Skip to content

Commit 0978ffc

Browse files
committed
Migrate subkey format
1 parent 973fd1c commit 0978ffc

File tree

6 files changed

+143
-11
lines changed

6 files changed

+143
-11
lines changed

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,14 +65,16 @@ export interface BucketStorageListener extends BaseListener {
6565

6666
export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener>, Disposable {
6767
init(): Promise<void>;
68-
saveSyncData(batch: SyncDataBatch): Promise<void>;
68+
saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean): Promise<void>;
6969
removeBuckets(buckets: string[]): Promise<void>;
7070
setTargetCheckpoint(checkpoint: Checkpoint): Promise<void>;
7171

7272
startSession(): void;
7373

7474
getBucketStates(): Promise<BucketState[]>;
7575
getBucketOperationProgress(): Promise<BucketOperationProgress>;
76+
hasMigratedSubkeys(): Promise<boolean>;
77+
migrateToFixedSubkeys(): Promise<void>;
7678

7779
syncLocalDatabase(
7880
checkpoint: Checkpoint,

packages/common/src/client/sync/bucket/OplogEntry.ts

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ export interface OplogEntryJSON {
88
object_type?: string;
99
op_id: string;
1010
op: OpTypeJSON;
11-
subkey?: string | object;
11+
subkey?: string;
1212
}
1313

1414
export class OplogEntry {
@@ -17,7 +17,7 @@ export class OplogEntry {
1717
row.op_id,
1818
OpType.fromJSON(row.op),
1919
row.checksum,
20-
typeof row.subkey == 'string' ? row.subkey : JSON.stringify(row.subkey),
20+
row.subkey,
2121
row.object_type,
2222
row.object_id,
2323
row.data
@@ -28,21 +28,23 @@ export class OplogEntry {
2828
public op_id: OpId,
2929
public op: OpType,
3030
public checksum: number,
31-
public subkey: string,
31+
public subkey?: string,
3232
public object_type?: string,
3333
public object_id?: string,
3434
public data?: string
3535
) {}
3636

37-
toJSON(): OplogEntryJSON {
37+
toJSON(fixedKeyEncoding = false): OplogEntryJSON {
3838
return {
3939
op_id: this.op_id,
4040
op: this.op.toJSON(),
4141
object_type: this.object_type,
4242
object_id: this.object_id,
4343
checksum: this.checksum,
4444
data: this.data,
45-
subkey: JSON.stringify(this.subkey)
45+
// Older versions of the JS SDK used to always JSON.stringify here. That has always been wrong,
46+
// but we need to migrate gradually to not break existing databases.
47+
subkey: fixedKeyEncoding ? this.subkey : JSON.stringify(this.subkey)
4648
};
4749
}
4850
}

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,13 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
9999
return Object.fromEntries(rows.map((r) => [r.name, { atLast: r.count_at_last, sinceLast: r.count_since_last }]));
100100
}
101101

102-
async saveSyncData(batch: SyncDataBatch) {
102+
async saveSyncData(batch: SyncDataBatch, fixedKeyFormat: boolean) {
103103
await this.writeTransaction(async (tx) => {
104104
let count = 0;
105105
for (const b of batch.buckets) {
106106
const result = await tx.execute('INSERT INTO powersync_operations(op, data) VALUES(?, ?)', [
107107
'save',
108-
JSON.stringify({ buckets: [b.toJSON()] })
108+
JSON.stringify({ buckets: [b.toJSON(fixedKeyFormat)] })
109109
]);
110110
this.logger.debug('saveSyncData', JSON.stringify(result));
111111
count += b.data.length;
@@ -420,6 +420,25 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
420420
return raw;
421421
});
422422
}
423+
424+
async hasMigratedSubkeys(): Promise<boolean> {
425+
const { r } = await this.db.get<{ r: number }>('SELECT EXISTS(SELECT * FROM ps_kv WHERE key = ?) as r', [
426+
SqliteBucketStorage._subkeyMigrationKey
427+
]);
428+
return r != 0;
429+
}
430+
431+
async migrateToFixedSubkeys(): Promise<void> {
432+
await this.writeTransaction(async (tx) => {
433+
await tx.execute('UPDATE ps_oplog SET key = powersync_remove_duplicate_key_encoding(key);');
434+
await tx.execute('INSERT OR REPLACE INTO ps_kv (key, value) VALUES (?, ?);', [
435+
SqliteBucketStorage._subkeyMigrationKey,
436+
'1'
437+
]);
438+
});
439+
}
440+
441+
static _subkeyMigrationKey = 'powersync_js_migrated_subkeys';
423442
}
424443

425444
function hasMatchingPriority(priority: number, bucket: BucketChecksum) {

packages/common/src/client/sync/bucket/SyncDataBucket.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,13 @@ export class SyncDataBucket {
3737
public next_after?: OpId
3838
) {}
3939

40-
toJSON(): SyncDataBucketJSON {
40+
toJSON(fixedKeyEncoding = false): SyncDataBucketJSON {
4141
return {
4242
bucket: this.bucket,
4343
has_more: this.has_more,
4444
after: this.after,
4545
next_after: this.next_after,
46-
data: this.data.map((entry) => entry.toJSON())
46+
data: this.data.map((entry) => entry.toJSON(fixedKeyEncoding))
4747
};
4848
}
4949
}

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,6 +558,31 @@ The next upload iteration will be delayed.`);
558558
return [req, localDescriptions];
559559
}
560560

561+
/**
562+
* Older versions of the JS SDK used to encode subkeys as JSON in {@link OplogEntry.toJSON}.
563+
* Because subkeys are always strings, this leads to quotes being added around them in `ps_oplog`.
564+
* While this is not a problem as long as it's done consistently, it causes issues when a database
565+
* created by the JS SDK is used with other SDKs, or (more likely) when the new Rust sync client
566+
* is enabled.
567+
*
568+
* So, we add a migration from the old key format (with quotes) to the new one (no quotes). The
569+
* migration is only triggered when necessary (for now). The function returns whether the new format
570+
* should be used, so that the JS SDK is able to write to updated databases.
571+
*
572+
* @param requireFixedKeyFormat Whether we require the new format or also support the old one.
573+
* The Rust client requires the new subkey format.
574+
* @returns Whether the database is now using the new, fixed subkey format.
575+
*/
576+
private async requireKeyFormat(requireFixedKeyFormat: boolean): Promise<boolean> {
577+
const hasMigrated = await this.options.adapter.hasMigratedSubkeys();
578+
if (requireFixedKeyFormat && !hasMigrated) {
579+
await this.options.adapter.migrateToFixedSubkeys();
580+
return true;
581+
} else {
582+
return hasMigrated;
583+
}
584+
}
585+
561586
protected async streamingSyncIteration(signal: AbortSignal, options?: PowerSyncConnectionOptions): Promise<void> {
562587
await this.obtainLock({
563588
type: LockType.SYNC,
@@ -571,6 +596,7 @@ The next upload iteration will be delayed.`);
571596
if (resolvedOptions.clientImplementation == SyncClientImplementation.JAVASCRIPT) {
572597
await this.legacyStreamingSyncIteration(signal, resolvedOptions);
573598
} else {
599+
await this.requireKeyFormat(true);
574600
await this.rustSyncIteration(signal, resolvedOptions);
575601
}
576602
}
@@ -588,6 +614,7 @@ The next upload iteration will be delayed.`);
588614
let appliedCheckpoint: Checkpoint | null = null;
589615

590616
const clientId = await this.options.adapter.getClientId();
617+
const usingFixedKeyFormat = await this.requireKeyFormat(false);
591618

592619
this.logger.debug('Requesting stream from server');
593620

@@ -746,7 +773,7 @@ The next upload iteration will be delayed.`);
746773
downloadProgress: updatedProgress
747774
}
748775
});
749-
await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] });
776+
await this.options.adapter.saveSyncData({ buckets: [SyncDataBucket.fromRow(data)] }, usingFixedKeyFormat);
750777
} else if (isStreamingKeepalive(line)) {
751778
const remaining_seconds = line.token_expires_in;
752779
if (remaining_seconds == 0) {

packages/node/tests/sync.test.ts

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,88 @@ describe('Sync', () => {
2222
describe('rust client', () => {
2323
defineSyncTests(SyncClientImplementation.RUST);
2424
});
25+
26+
mockSyncServiceTest('can migrate between sync implementations', async ({ syncService }) => {
27+
function addData(id: string) {
28+
syncService.pushLine({
29+
data: {
30+
bucket: 'a',
31+
data: [
32+
{
33+
checksum: 0,
34+
op_id: id,
35+
op: 'PUT',
36+
object_id: id,
37+
object_type: 'lists',
38+
subkey: `subkey_${id}`,
39+
data: '{}'
40+
}
41+
]
42+
}
43+
});
44+
}
45+
const checkpoint = {
46+
checkpoint: {
47+
last_op_id: '3',
48+
buckets: [bucket('a', 3)]
49+
}
50+
};
51+
52+
let database = await syncService.createDatabase();
53+
database.connect(new TestConnector(), {
54+
clientImplementation: SyncClientImplementation.JAVASCRIPT,
55+
connectionMethod: SyncStreamConnectionMethod.HTTP
56+
});
57+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
58+
syncService.pushLine(checkpoint);
59+
addData('1');
60+
61+
await vi.waitFor(async () => {
62+
expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(1);
63+
});
64+
await database.disconnect();
65+
// The JavaScript client encodes subkeys to JSON when it shouldn't...
66+
expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([
67+
expect.objectContaining({ key: 'lists/1/"subkey_1"' })
68+
]);
69+
70+
// Connecting again with the new client should fix the format
71+
database.connect(new TestConnector(), {
72+
clientImplementation: SyncClientImplementation.RUST,
73+
connectionMethod: SyncStreamConnectionMethod.HTTP
74+
});
75+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
76+
syncService.pushLine(checkpoint);
77+
addData('2');
78+
await vi.waitFor(async () => {
79+
expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(2);
80+
});
81+
await database.disconnect();
82+
expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([
83+
// Existing entry should be fixed too!
84+
expect.objectContaining({ key: 'lists/1/subkey_1' }),
85+
expect.objectContaining({ key: 'lists/2/subkey_2' })
86+
]);
87+
88+
// Finally, connecting with JS again should keep the fixed subkey format.
89+
database.connect(new TestConnector(), {
90+
clientImplementation: SyncClientImplementation.RUST,
91+
connectionMethod: SyncStreamConnectionMethod.HTTP
92+
});
93+
await vi.waitFor(() => expect(syncService.connectedListeners).toHaveLength(1));
94+
syncService.pushLine(checkpoint);
95+
addData('3');
96+
await vi.waitFor(async () => {
97+
expect(await database.getAll('SELECT * FROM ps_oplog')).toHaveLength(3);
98+
});
99+
await database.disconnect();
100+
expect(await database.getAll('SELECT * FROM ps_oplog')).toEqual([
101+
// Existing entry should be fixed too!
102+
expect.objectContaining({ key: 'lists/1/subkey_1' }),
103+
expect.objectContaining({ key: 'lists/2/subkey_2' }),
104+
expect.objectContaining({ key: 'lists/3/subkey_3' })
105+
]);
106+
});
25107
});
26108

27109
function defineSyncTests(impl: SyncClientImplementation) {

0 commit comments

Comments
 (0)