Skip to content

Commit e31c90e

Browse files
committed
sqlite: fix crash session extension callbacks with workers
1 parent 8756a5a commit e31c90e

File tree

5 files changed

+128
-22
lines changed

5 files changed

+128
-22
lines changed

src/node_sqlite.cc

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1647,26 +1647,28 @@ void Backup(const FunctionCallbackInfo<Value>& args) {
16471647
job->ScheduleBackup();
16481648
}
16491649

1650+
struct ConflictCallbackContext {
1651+
std::function<bool(std::string)> filterCallback;
1652+
std::function<int(int)> conflictCallback;
1653+
};
1654+
16501655
// the reason for using static functions here is that SQLite needs a
16511656
// function pointer
1652-
static std::function<int(int)> conflictCallback;
16531657

16541658
static int xConflict(void* pCtx, int eConflict, sqlite3_changeset_iter* pIter) {
1655-
if (!conflictCallback) return SQLITE_CHANGESET_ABORT;
1656-
return conflictCallback(eConflict);
1659+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1660+
if (!ctx->conflictCallback) return SQLITE_CHANGESET_ABORT;
1661+
return ctx->conflictCallback(eConflict);
16571662
}
16581663

1659-
static std::function<bool(std::string)> filterCallback;
1660-
16611664
static int xFilter(void* pCtx, const char* zTab) {
1662-
if (!filterCallback) return 1;
1663-
1664-
return filterCallback(zTab) ? 1 : 0;
1665+
auto ctx = static_cast<ConflictCallbackContext*>(pCtx);
1666+
if (!ctx->filterCallback) return 1;
1667+
return ctx->filterCallback(zTab) ? 1 : 0;
16651668
}
16661669

16671670
void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
1668-
conflictCallback = nullptr;
1669-
filterCallback = nullptr;
1671+
ConflictCallbackContext context;
16701672

16711673
DatabaseSync* db;
16721674
ASSIGN_OR_RETURN_UNWRAP(&db, args.This());
@@ -1702,7 +1704,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17021704
return;
17031705
}
17041706
Local<Function> conflictFunc = conflictValue.As<Function>();
1705-
conflictCallback = [env, conflictFunc](int conflictType) -> int {
1707+
context.conflictCallback = [env, conflictFunc](int conflictType) -> int {
17061708
Local<Value> argv[] = {Integer::New(env->isolate(), conflictType)};
17071709
TryCatch try_catch(env->isolate());
17081710
Local<Value> result =
@@ -1740,7 +1742,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17401742

17411743
Local<Function> filterFunc = filterValue.As<Function>();
17421744

1743-
filterCallback = [env, filterFunc](std::string item) -> bool {
1745+
context.filterCallback = [env, filterFunc](std::string item) -> bool {
17441746
// TODO(@jasnell): The use of ToLocalChecked here means that if
17451747
// the filter function throws an error the process will crash.
17461748
// The filterCallback should be updated to avoid the check and
@@ -1764,7 +1766,7 @@ void DatabaseSync::ApplyChangeset(const FunctionCallbackInfo<Value>& args) {
17641766
const_cast<void*>(static_cast<const void*>(buf.data())),
17651767
xFilter,
17661768
xConflict,
1767-
nullptr);
1769+
static_cast<void*>(&context));
17681770
if (r == SQLITE_OK) {
17691771
args.GetReturnValue().Set(true);
17701772
return;

test/parallel/test-sqlite-session.js

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,9 @@ const {
77
constants,
88
} = require('node:sqlite');
99
const { test, suite } = require('node:test');
10+
const { nextDb } = require('../sqlite/next-db.js');
11+
const { Worker } = require('worker_threads');
12+
const { once } = require('events');
1013

1114
/**
1215
* Convenience wrapper around assert.deepStrictEqual that sets a null
@@ -555,3 +558,74 @@ test('session supports ERM', (t) => {
555558
message: /session is not open/,
556559
});
557560
});
561+
562+
test('concurrent applyChangeset with workers', async (t) => {
563+
// Before adding this test, the callbacks were stored in static variables
564+
// this could result in a crash
565+
// this test is a regression test for that scenario
566+
567+
function modeToString(mode) {
568+
if (mode === constants.SQLITE_CHANGESET_ABORT) return 'SQLITE_CHANGESET_ABORT';
569+
if (mode === constants.SQLITE_CHANGESET_OMIT) return 'SQLITE_CHANGESET_OMIT';
570+
}
571+
572+
const dbPath = nextDb();
573+
const db1 = new DatabaseSync(dbPath);
574+
const db2 = new DatabaseSync(':memory:');
575+
const createTable = `
576+
CREATE TABLE data(
577+
key INTEGER PRIMARY KEY,
578+
value TEXT
579+
) STRICT`;
580+
db1.exec(createTable);
581+
db2.exec(createTable);
582+
db1.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'hello');
583+
db1.close();
584+
const session = db2.createSession();
585+
db2.prepare('INSERT INTO data (key, value) VALUES (?, ?)').run(1, 'world');
586+
const changeset = session.changeset(); // Changeset with conflict (for db1)
587+
588+
const iterations = 10;
589+
for (let i = 0; i < iterations; i++) {
590+
const workers = [];
591+
const expectedResults = new Map([
592+
[constants.SQLITE_CHANGESET_ABORT, false],
593+
[constants.SQLITE_CHANGESET_OMIT, true]]
594+
);
595+
596+
// Launch two workers (abort and omit modes)
597+
for (const mode of [constants.SQLITE_CHANGESET_ABORT, constants.SQLITE_CHANGESET_OMIT]) {
598+
const worker = new Worker(`${__dirname}/../sqlite/worker.js`, {
599+
workerData: {
600+
dbPath,
601+
changeset,
602+
mode
603+
},
604+
});
605+
workers.push(worker);
606+
}
607+
608+
const results = await Promise.all(workers.map(async (worker) => {
609+
const [message] = await once(worker, 'message');
610+
return message;
611+
}));
612+
613+
// Verify each result
614+
for (const res of results) {
615+
if (res.errorMessage) {
616+
if (res.errcode === 5) { // SQLITE_BUSY
617+
break; // ignore
618+
}
619+
t.assert.fail(`Worker error: ${res.error.message}`);
620+
}
621+
const expected = expectedResults.get(res.mode);
622+
t.assert.strictEqual(
623+
res.result,
624+
expected,
625+
`Iteration ${i}: Worker (${modeToString(res.mode)}) expected ${expected} but got ${res.result}`
626+
);
627+
}
628+
629+
workers.forEach((worker) => worker.terminate()); // Cleanup
630+
}
631+
});

test/parallel/test-sqlite.js

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,10 @@
11
'use strict';
22
const { spawnPromisified, skipIfSQLiteMissing } = require('../common');
33
skipIfSQLiteMissing();
4-
const tmpdir = require('../common/tmpdir');
5-
const { join } = require('node:path');
64
const { DatabaseSync, constants } = require('node:sqlite');
75
const { suite, test } = require('node:test');
86
const { pathToFileURL } = require('node:url');
9-
let cnt = 0;
10-
11-
tmpdir.refresh();
12-
13-
function nextDb() {
14-
return join(tmpdir.path, `database-${cnt++}.db`);
15-
}
7+
const { nextDb } = require('../sqlite/next-db.js');
168

179
suite('accessing the node:sqlite module', () => {
1810
test('cannot be accessed without the node: scheme', (t) => {

test/sqlite/next-db.js

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
'use strict';
2+
require('../../common');
3+
const tmpdir = require('../common/tmpdir');
4+
const { join } = require('node:path');
5+
6+
let cnt = 0;
7+
8+
tmpdir.refresh();
9+
10+
function nextDb() {
11+
return join(tmpdir.path, `database-${cnt++}.db`);
12+
}
13+
14+
module.exports = { nextDb };

test/sqlite/worker.js

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// This worker is used for one of the tests in test-sqlite-session.js
2+
3+
'use strict';
4+
require('../../common');
5+
const { parentPort, workerData } = require('worker_threads');
6+
const { DatabaseSync, constants } = require('node:sqlite');
7+
const { changeset, mode, dbPath } = workerData;
8+
9+
const db = new DatabaseSync(dbPath);
10+
11+
const options = {};
12+
if (mode !== constants.SQLITE_CHANGESET_ABORT && mode !== constants.SQLITE_CHANGESET_OMIT) {
13+
throw new Error('Unexpected value for mode');
14+
}
15+
options.onConflict = () => mode;
16+
17+
try {
18+
const result = db.applyChangeset(changeset, options);
19+
parentPort.postMessage({ mode, result, error: null });
20+
} catch (error) {
21+
parentPort.postMessage({ mode, result: null, errorMessage: error.message, errcode: error.errcode });
22+
} finally {
23+
db.close(); // Just to make sure it is closed ASAP
24+
}

0 commit comments

Comments
 (0)