Skip to content

Commit 698467c

Browse files
authored
Use BigInt for OpId internally (#209)
* Use bigint internally for op ids. * Fix some tests. * Fix checksum query issue. * Fix some MySQL tests, and a race condition for aborting streaming. * Add changeset. * Fix merge conflict.
1 parent 45d2640 commit 698467c

File tree

31 files changed

+413
-336
lines changed

31 files changed

+413
-336
lines changed

.changeset/ninety-dancers-agree.md

Lines changed: 12 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,12 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mysql': minor
9+
'@powersync/lib-service-postgres': minor
10+
---
11+
12+
Use bigint everywhere internally for OpId.

.changeset/rotten-pianos-film.md

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-module-mysql': patch
3+
---
4+
5+
Fix race condition when stopping replication immediately after starting it.

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 15 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ import {
1111
ReplicationAssertionError,
1212
ServiceError
1313
} from '@powersync/lib-services-framework';
14-
import { deserializeBson, SaveOperationTag, storage, utils } from '@powersync/service-core';
14+
import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core';
1515
import * as timers from 'node:timers/promises';
1616
import { PowerSyncMongo } from './db.js';
1717
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
@@ -39,7 +39,7 @@ export interface MongoBucketBatchOptions {
3939
groupId: number;
4040
slotName: string;
4141
lastCheckpointLsn: string | null;
42-
keepaliveOp: string | null;
42+
keepaliveOp: InternalOpId | null;
4343
noCheckpointBeforeLsn: string;
4444
storeCurrentData: boolean;
4545
/**
@@ -77,12 +77,12 @@ export class MongoBucketBatch
7777

7878
private no_checkpoint_before_lsn: string;
7979

80-
private persisted_op: bigint | null = null;
80+
private persisted_op: InternalOpId | null = null;
8181

8282
/**
8383
* For tests only - not for persistence logic.
8484
*/
85-
public last_flushed_op: bigint | null = null;
85+
public last_flushed_op: InternalOpId | null = null;
8686

8787
constructor(options: MongoBucketBatchOptions) {
8888
super();
@@ -98,9 +98,7 @@ export class MongoBucketBatch
9898
this.skipExistingRows = options.skipExistingRows;
9999
this.batch = new OperationBatch();
100100

101-
if (options.keepaliveOp) {
102-
this.persisted_op = BigInt(options.keepaliveOp);
103-
}
101+
this.persisted_op = options.keepaliveOp ?? null;
104102
}
105103

106104
addCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): void {
@@ -135,7 +133,7 @@ export class MongoBucketBatch
135133
return null;
136134
}
137135

138-
let last_op: bigint | null = null;
136+
let last_op: InternalOpId | null = null;
139137
let resumeBatch: OperationBatch | null = null;
140138

141139
await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
@@ -153,7 +151,7 @@ export class MongoBucketBatch
153151

154152
this.persisted_op = last_op;
155153
this.last_flushed_op = last_op;
156-
return { flushed_op: String(last_op) };
154+
return { flushed_op: last_op };
157155
}
158156

159157
private async replicateBatch(
@@ -776,22 +774,23 @@ export class MongoBucketBatch
776774
async truncate(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
777775
await this.flush();
778776

779-
let last_op: bigint | null = null;
777+
let last_op: InternalOpId | null = null;
780778
for (let table of sourceTables) {
781779
last_op = await this.truncateSingle(table);
782780
}
783781

784782
if (last_op) {
785783
this.persisted_op = last_op;
784+
return {
785+
flushed_op: last_op
786+
};
787+
} else {
788+
return null;
786789
}
787-
788-
return {
789-
flushed_op: String(last_op!)
790-
};
791790
}
792791

793-
async truncateSingle(sourceTable: storage.SourceTable): Promise<bigint> {
794-
let last_op: bigint | null = null;
792+
async truncateSingle(sourceTable: storage.SourceTable): Promise<InternalOpId> {
793+
let last_op: InternalOpId | null = null;
795794

796795
// To avoid too large transactions, we limit the amount of data we delete per transaction.
797796
// Since we don't use the record data here, we don't have explicit size limits per batch.

modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
22
import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework';
3-
import { storage, utils } from '@powersync/service-core';
3+
import { InternalOpId, storage, utils } from '@powersync/service-core';
44

55
import { PowerSyncMongo } from './db.js';
66
import { BucketDataDocument, BucketDataKey } from './models.js';
@@ -12,7 +12,7 @@ interface CurrentBucketState {
1212
/**
1313
* Rows seen in the bucket, with the last op_id of each.
1414
*/
15-
seen: Map<string, bigint>;
15+
seen: Map<string, InternalOpId>;
1616
/**
1717
* Estimated memory usage of the seen Map.
1818
*/
@@ -21,7 +21,7 @@ interface CurrentBucketState {
2121
/**
2222
* Last (lowest) seen op_id that is not a PUT.
2323
*/
24-
lastNotPut: bigint | null;
24+
lastNotPut: InternalOpId | null;
2525

2626
/**
2727
* Number of REMOVE/MOVE operations seen since lastNotPut.
@@ -274,7 +274,7 @@ export class MongoCompactor {
274274
* @param bucket bucket name
275275
* @param op op_id of the last non-PUT operation, which will be converted to CLEAR.
276276
*/
277-
private async clearBucket(bucket: string, op: bigint) {
277+
private async clearBucket(bucket: string, op: InternalOpId) {
278278
const opFilter = {
279279
_id: {
280280
$gte: {

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 25 additions & 16 deletions
Original file line number | Diff line number | Diff line change
@@ -12,6 +12,9 @@ import {
1212
CHECKPOINT_INVALIDATE_ALL,
1313
CheckpointChanges,
1414
GetCheckpointChangesOptions,
15+
InternalOpId,
16+
internalToExternalOpId,
17+
ProtocolOpId,
1518
ReplicationCheckpoint,
1619
SourceTable,
1720
storage,
@@ -119,7 +122,7 @@ export class MongoSyncBucketStorage
119122
}
120123
);
121124
return {
122-
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
125+
checkpoint: doc?.last_checkpoint ?? 0n,
123126
lsn: doc?.last_checkpoint_lsn ?? null
124127
};
125128
}
@@ -143,7 +146,7 @@ export class MongoSyncBucketStorage
143146
slotName: this.slot_name,
144147
lastCheckpointLsn: checkpoint_lsn,
145148
noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN,
146-
keepaliveOp: doc?.keepalive_op ?? null,
149+
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null,
147150
storeCurrentData: options.storeCurrentData,
148151
skipExistingRows: options.skipExistingRows ?? false
149152
});
@@ -152,7 +155,7 @@ export class MongoSyncBucketStorage
152155
await callback(batch);
153156
await batch.flush();
154157
if (batch.last_flushed_op) {
155-
return { flushed_op: String(batch.last_flushed_op) };
158+
return { flushed_op: batch.last_flushed_op };
156159
} else {
157160
return null;
158161
}
@@ -249,7 +252,7 @@ export class MongoSyncBucketStorage
249252
return result!;
250253
}
251254

252-
async getParameterSets(checkpoint: utils.OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
255+
async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
253256
const lookupFilter = lookups.map((lookup) => {
254257
return storage.serializeLookup(lookup);
255258
});
@@ -259,7 +262,7 @@ export class MongoSyncBucketStorage
259262
$match: {
260263
'key.g': this.group_id,
261264
lookup: { $in: lookupFilter },
262-
_id: { $lte: BigInt(checkpoint) }
265+
_id: { $lte: checkpoint }
263266
}
264267
},
265268
{
@@ -284,23 +287,26 @@ export class MongoSyncBucketStorage
284287
}
285288

286289
async *getBucketDataBatch(
287-
checkpoint: utils.OpId,
288-
dataBuckets: Map<string, string>,
290+
checkpoint: utils.InternalOpId,
291+
dataBuckets: Map<string, InternalOpId>,
289292
options?: storage.BucketDataBatchOptions
290293
): AsyncIterable<storage.SyncBucketDataBatch> {
291294
if (dataBuckets.size == 0) {
292295
return;
293296
}
294297
let filters: mongo.Filter<BucketDataDocument>[] = [];
295298

296-
const end = checkpoint ? BigInt(checkpoint) : new bson.MaxKey();
299+
if (checkpoint == null) {
300+
throw new ServiceAssertionError('checkpoint is null');
301+
}
302+
const end = checkpoint;
297303
for (let [name, start] of dataBuckets.entries()) {
298304
filters.push({
299305
_id: {
300306
$gt: {
301307
g: this.group_id,
302308
b: name,
303-
o: BigInt(start)
309+
o: start
304310
},
305311
$lte: {
306312
g: this.group_id,
@@ -347,15 +353,15 @@ export class MongoSyncBucketStorage
347353

348354
let batchSize = 0;
349355
let currentBatch: utils.SyncBucketData | null = null;
350-
let targetOp: bigint | null = null;
356+
let targetOp: InternalOpId | null = null;
351357

352358
// Ordered by _id, meaning buckets are grouped together
353359
for (let rawData of data) {
354360
const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
355361
const bucket = row._id.b;
356362

357363
if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
358-
let start: string | undefined = undefined;
364+
let start: ProtocolOpId | undefined = undefined;
359365
if (currentBatch != null) {
360366
if (currentBatch.bucket == bucket) {
361367
currentBatch.has_more = true;
@@ -369,9 +375,12 @@ export class MongoSyncBucketStorage
369375
targetOp = null;
370376
}
371377

372-
start ??= dataBuckets.get(bucket);
373378
if (start == null) {
374-
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
379+
const startOpId = dataBuckets.get(bucket);
380+
if (startOpId == null) {
381+
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
382+
}
383+
start = internalToExternalOpId(startOpId);
375384
}
376385
currentBatch = {
377386
bucket,
@@ -406,7 +415,7 @@ export class MongoSyncBucketStorage
406415
}
407416
}
408417

409-
async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise<utils.ChecksumMap> {
418+
async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise<utils.ChecksumMap> {
410419
return this.checksumCache.getChecksumMap(checkpoint, buckets);
411420
}
412421

@@ -638,7 +647,7 @@ export class MongoSyncBucketStorage
638647

639648
private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) {
640649
return {
641-
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
650+
checkpoint: doc?.last_checkpoint ?? 0n,
642651
lsn: doc?.last_checkpoint_lsn ?? null
643652
};
644653
}
@@ -755,7 +764,7 @@ export class MongoSyncBucketStorage
755764
*/
756765
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
757766
const { user_id, signal } = options;
758-
let lastCheckpoint: utils.OpId | null = null;
767+
let lastCheckpoint: utils.InternalOpId | null = null;
759768
let lastWriteCheckpoint: bigint | null = null;
760769

761770
const iter = wrapWithAbort(this.sharedIter, signal);

modules/module-mongodb-storage/src/storage/implementation/PersistedBatch.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,7 +4,7 @@ import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules
44
import * as bson from 'bson';
55

66
import { logger } from '@powersync/lib-services-framework';
7-
import { storage, utils } from '@powersync/service-core';
7+
import { InternalOpId, storage, utils } from '@powersync/service-core';
88
import { currentBucketKey } from './MongoBucketBatch.js';
99
import { MongoIdSequence } from './MongoIdSequence.js';
1010
import { PowerSyncMongo } from './db.js';
@@ -52,7 +52,7 @@ export class PersistedBatch {
5252
/**
5353
* For debug logging only.
5454
*/
55-
debugLastOpId: bigint | null = null;
55+
debugLastOpId: InternalOpId | null = null;
5656

5757
/**
5858
* Very rough estimate of transaction size.

modules/module-mongodb-storage/src/storage/implementation/util.ts

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ export async function readSingleBatch<T>(cursor: mongo.FindCursor<T>): Promise<{
7474
export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry {
7575
if (row.op == 'PUT' || row.op == 'REMOVE') {
7676
return {
77-
op_id: utils.timestampToOpId(row._id.o),
77+
op_id: utils.internalToExternalOpId(row._id.o),
7878
op: row.op,
7979
object_type: row.table,
8080
object_id: row.row_id,
@@ -86,7 +86,7 @@ export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry {
8686
// MOVE, CLEAR
8787

8888
return {
89-
op_id: utils.timestampToOpId(row._id.o),
89+
op_id: utils.internalToExternalOpId(row._id.o),
9090
op: row.op,
9191
checksum: Number(row.checksum)
9292
};

modules/module-mongodb-storage/test/src/storage_sync.test.ts

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ describe('sync - mongodb', () => {
7474
const options: storage.BucketDataBatchOptions = {};
7575

7676
const batch1 = await test_utils.fromAsync(
77-
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), options)
77+
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), options)
7878
);
7979
expect(test_utils.getBatchData(batch1)).toEqual([
8080
{ op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 },
@@ -87,7 +87,7 @@ describe('sync - mongodb', () => {
8787
});
8888

8989
const batch2 = await test_utils.fromAsync(
90-
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1[0].batch.next_after]]), options)
90+
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
9191
);
9292
expect(test_utils.getBatchData(batch2)).toEqual([
9393
{ op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -99,7 +99,7 @@ describe('sync - mongodb', () => {
9999
});
100100

101101
const batch3 = await test_utils.fromAsync(
102-
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2[0].batch.next_after]]), options)
102+
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
103103
);
104104
expect(test_utils.getBatchData(batch3)).toEqual([
105105
{ op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }

0 commit comments

Comments (0)