sqlite: fix crash session extension callbacks with workers

PR-URL: https://github.com/nodejs/node/pull/59848
Reviewed-By: Anna Henningsen <anna@addaleax.net>
Reviewed-By: Zeyu "Alex" Yang <himself65@outlook.com>
Reviewed-By: Edy Silva <edigleyssonsilva@gmail.com>
This commit is contained in:
Bart Louwers 2025-09-23 16:41:39 +02:00 committed by GitHub
parent 24ded11b66
commit d2ff9daf58
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 135 additions and 26 deletions

View File

@ -1668,26 +1668,28 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
job->ScheduleBackup();
}
struct ConflictCallbackContext {
std::function<bool(std::string_view)> filterCallback;
std::function<int(int)> conflictCallback;
};
// the reason for using static functions here is that SQLite needs a
// function pointer
static std::function<int(int)> conflictCallback;
static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
return conflictCallback(eConflict);
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT;
return ctx->conflictCallback(eConflict);
}
static std::function<bool(std::string)> filterCallback;
static int xFilter(void* pCtx, const char* zTab) {
if (!filterCallback) return 1;
return filterCallback(zTab) ? 1 : 0;
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
if (!ctx->filterCallback) return 1;
return ctx->filterCallback(zTab) ? 1 : 0;
}
void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
conflictCallback = nullptr;
filterCallback = nullptr;
ConflictCallbackContext context;
DatabaseSync* db;
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
@ -1723,7 +1725,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
return;
}
Local<Function> conflictFunc = conflictValue.As<Function>();
conflictCallback = [env, conflictFunc](int conflictType) -> int {
context.conflictCallback = [env, conflictFunc](int conflictType) -> int {
Local<Value> argv[] = {Integer::New(env->isolate(), conflictType)};
TryCatch try_catch(env->isolate());
Local<Value> result =
@ -1761,15 +1763,18 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
Local<Function> filterFunc = filterValue.As<Function>();
filterCallback = [env, filterFunc](std::string item) -> bool {
context.filterCallback = [env,
filterFunc](std::string_view item) -> bool {
// TODO(@jasnell): The use of ToLocalChecked here means that if
// the filter function throws an error the process will crash.
// The filterCallback should be updated to avoid the check and
// propagate the error correctly.
Local<Value> argv[] = {String::NewFromUtf8(env->isolate(),
item.c_str(),
NewStringType::kNormal)
.ToLocalChecked()};
Local<Value> argv[] = {
String::NewFromUtf8(env->isolate(),
item.data(),
NewStringType::kNormal,
static_cast<int>(item.size()))
.ToLocalChecked()};
Local<Value> result =
filterFunc->Call(env->context(), Null(env->isolate()), 1, argv)
.ToLocalChecked();
@ -1785,7 +1790,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
const_cast<void*>(static_cast<const void*>(buf.data())),
xFilter,
xConflict,
nullptr);
static_cast<void*>(&context));
if (r == SQLITE_OK) {
args.GetReturnValue().Set(true);
return;

View File

@ -7,6 +7,9 @@ const {
constants,
} = require('node:sqlite');
const { test, suite } = require('node:test');
const { nextDb } = require('../sqlite/next-db.js');
const { Worker } = require('worker_threads');
const { once } = require('events');
/**
* Convenience wrapper around assert.deepStrictEqual that sets a null
@ -555,3 +558,74 @@ test('session supports ERM', (t) => {
message: /session is not open/,
});
});
test('concurrent applyChangeset with workers', async (t) => {
// Before adding this test, the callbacks were stored in static variables
// this could result in a crash
// this test is a regression test for that scenario
function modeToString(mode) {
if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT';
if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT';
}
const dbPath = nextDb();
const db1 = new DatabaseSync(dbPath);
const db2 = new DatabaseSync(':memory:');
const createTable = `
CREATE TABLE data(
key INTEGER PRIMARY KEY,
value TEXT
) STRICT`;
db1.exec(createTable);
db2.exec(createTable);
db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello');
db1.close();
const session = db2.createSession();
db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world');
const changeset = session.changeset(); // Changeset with conflict (for db1)
const iterations = 10;
for (let i = 0; i < iterations; i++) {
const workers = [];
const expectedResults = new Map([
[constants.SQLITE_CHANGESET_ABORT, false],
[constants.SQLITE_CHANGESET_OMIT, true]]
);
// Launch two workers (abort and omit modes)
for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) {
const worker = new Worker(`${__dirname}/../sqlite/worker.js`, {
workerData: {
dbPath,
changeset,
mode
},
});
workers.push(worker);
}
const results = await Promise.all(workers.map(async (worker) => {
const [message] = await once(worker, 'message');
return message;
}));
// Verify each result
for (const res of results) {
if (res.errorMessage) {
if (res.errcode === 5) { // SQLITE_BUSY
break; // ignore
}
t.assert.fail(`Worker error: ${res.error.message}`);
}
const expected = expectedResults.get(res.mode);
t.assert.strictEqual(
res.result,
expected,
`Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}`
);
}
workers.forEach((worker) => worker.terminate()); // Cleanup
}
});

View File

@ -1,18 +1,10 @@
'use strict';
const { spawnPromisified, skipIfSQLiteMissing } = require('../common');
skipIfSQLiteMissing();
const tmpdir = require('../common/tmpdir');
const { join } = require('node:path');
const { DatabaseSync, constants } = require('node:sqlite');
const { suite, test } = require('node:test');
const { pathToFileURL } = require('node:url');
let cnt = 0;
tmpdir.refresh();
function nextDb() {
return join(tmpdir.path, `database-${cnt++}.db`);
}
const { nextDb } = require('../sqlite/next-db.js');
suite('accessing the node:sqlite module', () => {
test('cannot be accessed without the node: scheme', (t) => {

14
test/sqlite/next-db.js Normal file
View File

@ -0,0 +1,14 @@
'use strict';
require('../common');
const tmpdir = require('../common/tmpdir');
const { join } = require('node:path');
let cnt = 0;
tmpdir.refresh();
function nextDb() {
return join(tmpdir.path, `database-${cnt++}.db`);
}
module.exports = { nextDb };

24
test/sqlite/worker.js Normal file
View File

@ -0,0 +1,24 @@
// This worker is used for one of the tests in test-sqlite-session.js
'use strict';
require('../common');
const { parentPort, workerData } = require('worker_threads');
const { DatabaseSync, constants } = require('node:sqlite');
const { changeset, mode, dbPath } = workerData;
const db = new DatabaseSync(dbPath);
const options = {};
if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) {
throw new Error('Unexpected value for mode');
}
options.onConflict = () => mode;
try {
const result = db.applyChangeset(changeset, options);
parentPort.postMessage({ mode, result, error: null });
} catch (error) {
parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode });
} finally {
db.close(); // Just to make sure it is closed ASAP
}