Merged
12 changes: 12 additions & 0 deletions .changeset/ninety-dancers-agree.md
@@ -0,0 +1,12 @@
---
'@powersync/service-module-postgres-storage': minor
'@powersync/service-module-mongodb-storage': minor
'@powersync/service-core-tests': minor
'@powersync/service-module-postgres': minor
'@powersync/service-module-mongodb': minor
'@powersync/service-core': minor
'@powersync/service-module-mysql': minor
'@powersync/lib-service-postgres': minor
---

Use bigint everywhere internally for OpId.
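For context: InternalOpId, ProtocolOpId and internalToExternalOpId are imported from @powersync/service-core throughout the diff below, but their definitions are not part of this view. A minimal TypeScript sketch of the assumed shapes, illustrative only:

// Assumed sketch: op ids are bigint internally, and remain decimal strings on the sync protocol.
export type InternalOpId = bigint;
export type ProtocolOpId = string;

// Convert an internal op id to its wire (protocol) representation.
export function internalToExternalOpId(id: InternalOpId): ProtocolOpId {
  return id.toString();
}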
5 changes: 5 additions & 0 deletions .changeset/rotten-pianos-film.md
@@ -0,0 +1,5 @@
---
'@powersync/service-module-mysql': patch
---

Fix race condition when stopping replication immediately after starting it.
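The module-mysql change itself is not included in the files shown below. Purely as a generic illustration of the kind of guard such a fix usually involves (hypothetical names, not the actual PowerSync code):

// Hypothetical sketch only: make stop() wait for, or pre-empt, a just-started replication loop.
class ReplicationJob {
  private abortController = new AbortController();
  private running: Promise<void> | null = null;

  start(): void {
    if (this.abortController.signal.aborted) {
      // stop() already ran before the loop started; don't start it now.
      return;
    }
    this.running = this.replicateLoop(this.abortController.signal);
  }

  async stop(): Promise<void> {
    this.abortController.abort();
    // Await any in-flight loop instead of racing its startup.
    await this.running;
  }

  private async replicateLoop(signal: AbortSignal): Promise<void> {
    while (!signal.aborted) {
      // ... consume replication events ...
      await new Promise((resolve) => setTimeout(resolve, 100));
    }
  }
}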
@@ -11,7 +11,7 @@ import {
ReplicationAssertionError,
ServiceError
} from '@powersync/lib-services-framework';
import { deserializeBson, SaveOperationTag, storage, utils } from '@powersync/service-core';
import { deserializeBson, InternalOpId, SaveOperationTag, storage, utils } from '@powersync/service-core';
import * as timers from 'node:timers/promises';
import { PowerSyncMongo } from './db.js';
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
@@ -39,7 +39,7 @@ export interface MongoBucketBatchOptions {
groupId: number;
slotName: string;
lastCheckpointLsn: string | null;
keepaliveOp: string | null;
keepaliveOp: InternalOpId | null;
noCheckpointBeforeLsn: string;
storeCurrentData: boolean;
/**
@@ -77,12 +77,12 @@ export class MongoBucketBatch

private no_checkpoint_before_lsn: string;

private persisted_op: bigint | null = null;
private persisted_op: InternalOpId | null = null;

/**
* For tests only - not for persistence logic.
*/
public last_flushed_op: bigint | null = null;
public last_flushed_op: InternalOpId | null = null;

constructor(options: MongoBucketBatchOptions) {
super();
@@ -98,9 +98,7 @@ export class MongoBucketBatch
this.skipExistingRows = options.skipExistingRows;
this.batch = new OperationBatch();

if (options.keepaliveOp) {
this.persisted_op = BigInt(options.keepaliveOp);
}
this.persisted_op = options.keepaliveOp ?? null;
}

addCustomWriteCheckpoint(checkpoint: storage.BatchedCustomWriteCheckpointOptions): void {
@@ -135,7 +133,7 @@ export class MongoBucketBatch
return null;
}

let last_op: bigint | null = null;
let last_op: InternalOpId | null = null;
let resumeBatch: OperationBatch | null = null;

await this.withReplicationTransaction(`Flushing ${batch.length} ops`, async (session, opSeq) => {
@@ -153,7 +151,7 @@

this.persisted_op = last_op;
this.last_flushed_op = last_op;
return { flushed_op: String(last_op) };
return { flushed_op: last_op };
}

private async replicateBatch(
@@ -776,22 +774,23 @@ export class MongoBucketBatch
async truncate(sourceTables: storage.SourceTable[]): Promise<storage.FlushedResult | null> {
await this.flush();

let last_op: bigint | null = null;
let last_op: InternalOpId | null = null;
for (let table of sourceTables) {
last_op = await this.truncateSingle(table);
}

if (last_op) {
this.persisted_op = last_op;
return {
flushed_op: last_op
};
} else {
return null;
}

return {
flushed_op: String(last_op!)
};
}

async truncateSingle(sourceTable: storage.SourceTable): Promise<bigint> {
let last_op: bigint | null = null;
async truncateSingle(sourceTable: storage.SourceTable): Promise<InternalOpId> {
let last_op: InternalOpId | null = null;

// To avoid too large transactions, we limit the amount of data we delete per transaction.
// Since we don't use the record data here, we don't have explicit size limits per batch.
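The rest of truncateSingle is collapsed in this view. As a rough sketch of the batching pattern the comment above describes (hypothetical helper name and batch size, not the file's actual code):

// Illustrative only: delete the table's data in bounded batches, each in its own
// transaction, and remember the last op id assigned to the resulting REMOVE ops.
let lastOp: InternalOpId | null = null;
const BATCH_LIMIT = 2000; // assumed limit
while (true) {
  // deleteTruncateBatch stands in for the real per-transaction delete logic.
  const result = await deleteTruncateBatch(sourceTable, BATCH_LIMIT);
  if (result.lastOp != null) {
    lastOp = result.lastOp;
  }
  if (result.deletedCount < BATCH_LIMIT) {
    break;
  }
}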
@@ -1,6 +1,6 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { logger, ReplicationAssertionError } from '@powersync/lib-services-framework';
import { storage, utils } from '@powersync/service-core';
import { InternalOpId, storage, utils } from '@powersync/service-core';

import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey } from './models.js';
@@ -12,7 +12,7 @@ interface CurrentBucketState {
/**
* Rows seen in the bucket, with the last op_id of each.
*/
seen: Map<string, bigint>;
seen: Map<string, InternalOpId>;
/**
* Estimated memory usage of the seen Map.
*/
@@ -21,7 +21,7 @@
/**
* Last (lowest) seen op_id that is not a PUT.
*/
lastNotPut: bigint | null;
lastNotPut: InternalOpId | null;

/**
* Number of REMOVE/MOVE operations seen since lastNotPut.
@@ -274,7 +274,7 @@ export class MongoCompactor {
* @param bucket bucket name
* @param op op_id of the last non-PUT operation, which will be converted to CLEAR.
*/
private async clearBucket(bucket: string, op: bigint) {
private async clearBucket(bucket: string, op: InternalOpId) {
const opFilter = {
_id: {
$gte: {
@@ -12,6 +12,9 @@ import {
CHECKPOINT_INVALIDATE_ALL,
CheckpointChanges,
GetCheckpointChangesOptions,
InternalOpId,
internalToExternalOpId,
ProtocolOpId,
ReplicationCheckpoint,
SourceTable,
storage,
@@ -119,7 +122,7 @@ export class MongoSyncBucketStorage
}
);
return {
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
checkpoint: doc?.last_checkpoint ?? 0n,
lsn: doc?.last_checkpoint_lsn ?? null
};
}
@@ -143,7 +146,7 @@
slotName: this.slot_name,
lastCheckpointLsn: checkpoint_lsn,
noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN,
keepaliveOp: doc?.keepalive_op ?? null,
keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null,
storeCurrentData: options.storeCurrentData,
skipExistingRows: options.skipExistingRows ?? false
});
@@ -152,7 +155,7 @@
await callback(batch);
await batch.flush();
if (batch.last_flushed_op) {
return { flushed_op: String(batch.last_flushed_op) };
return { flushed_op: batch.last_flushed_op };
} else {
return null;
}
@@ -249,7 +252,7 @@
return result!;
}

async getParameterSets(checkpoint: utils.OpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
const lookupFilter = lookups.map((lookup) => {
return storage.serializeLookup(lookup);
});
@@ -259,7 +262,7 @@
$match: {
'key.g': this.group_id,
lookup: { $in: lookupFilter },
_id: { $lte: BigInt(checkpoint) }
_id: { $lte: checkpoint }
}
},
{
@@ -284,23 +287,26 @@
}

async *getBucketDataBatch(
checkpoint: utils.OpId,
dataBuckets: Map<string, string>,
checkpoint: utils.InternalOpId,
dataBuckets: Map<string, InternalOpId>,
options?: storage.BucketDataBatchOptions
): AsyncIterable<storage.SyncBucketDataBatch> {
if (dataBuckets.size == 0) {
return;
}
let filters: mongo.Filter<BucketDataDocument>[] = [];

const end = checkpoint ? BigInt(checkpoint) : new bson.MaxKey();
if (checkpoint == null) {
throw new ServiceAssertionError('checkpoint is null');
}
const end = checkpoint;
for (let [name, start] of dataBuckets.entries()) {
filters.push({
_id: {
$gt: {
g: this.group_id,
b: name,
o: BigInt(start)
o: start
},
$lte: {
g: this.group_id,
@@ -347,15 +353,15 @@

let batchSize = 0;
let currentBatch: utils.SyncBucketData | null = null;
let targetOp: bigint | null = null;
let targetOp: InternalOpId | null = null;

// Ordered by _id, meaning buckets are grouped together
for (let rawData of data) {
const row = bson.deserialize(rawData, storage.BSON_DESERIALIZE_INTERNAL_OPTIONS) as BucketDataDocument;
const bucket = row._id.b;

if (currentBatch == null || currentBatch.bucket != bucket || batchSize >= sizeLimit) {
let start: string | undefined = undefined;
let start: ProtocolOpId | undefined = undefined;
if (currentBatch != null) {
if (currentBatch.bucket == bucket) {
currentBatch.has_more = true;
@@ -369,9 +375,12 @@
targetOp = null;
}

start ??= dataBuckets.get(bucket);
if (start == null) {
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
const startOpId = dataBuckets.get(bucket);
if (startOpId == null) {
throw new ServiceAssertionError(`data for unexpected bucket: ${bucket}`);
}
start = internalToExternalOpId(startOpId);
}
currentBatch = {
bucket,
@@ -406,7 +415,7 @@
}
}

async getChecksums(checkpoint: utils.OpId, buckets: string[]): Promise<utils.ChecksumMap> {
async getChecksums(checkpoint: utils.InternalOpId, buckets: string[]): Promise<utils.ChecksumMap> {
return this.checksumCache.getChecksumMap(checkpoint, buckets);
}

Expand Down Expand Up @@ -638,7 +647,7 @@ export class MongoSyncBucketStorage

private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) {
return {
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
checkpoint: doc?.last_checkpoint ?? 0n,
lsn: doc?.last_checkpoint_lsn ?? null
};
}
@@ -755,7 +764,7 @@
*/
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.StorageCheckpointUpdate> {
const { user_id, signal } = options;
let lastCheckpoint: utils.OpId | null = null;
let lastCheckpoint: utils.InternalOpId | null = null;
let lastWriteCheckpoint: bigint | null = null;

const iter = wrapWithAbort(this.sharedIter, signal);
@@ -4,7 +4,7 @@ import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules
import * as bson from 'bson';

import { logger } from '@powersync/lib-services-framework';
import { storage, utils } from '@powersync/service-core';
import { InternalOpId, storage, utils } from '@powersync/service-core';
import { currentBucketKey } from './MongoBucketBatch.js';
import { MongoIdSequence } from './MongoIdSequence.js';
import { PowerSyncMongo } from './db.js';
@@ -52,7 +52,7 @@ export class PersistedBatch {
/**
* For debug logging only.
*/
debugLastOpId: bigint | null = null;
debugLastOpId: InternalOpId | null = null;

/**
* Very rough estimate of transaction size.
@@ -74,7 +74,7 @@ export async function readSingleBatch<T>(cursor: mongo.FindCursor<T>): Promise<{
export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry {
if (row.op == 'PUT' || row.op == 'REMOVE') {
return {
op_id: utils.timestampToOpId(row._id.o),
op_id: utils.internalToExternalOpId(row._id.o),
op: row.op,
object_type: row.table,
object_id: row.row_id,
@@ -86,7 +86,7 @@ export function mapOpEntry(row: BucketDataDocument): utils.OplogEntry {
// MOVE, CLEAR

return {
op_id: utils.timestampToOpId(row._id.o),
op_id: utils.internalToExternalOpId(row._id.o),
op: row.op,
checksum: Number(row.checksum)
};
6 changes: 3 additions & 3 deletions modules/module-mongodb-storage/test/src/storage_sync.test.ts
@@ -74,7 +74,7 @@ describe('sync - mongodb', () => {
const options: storage.BucketDataBatchOptions = {};

const batch1 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', '0']]), options)
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), options)
);
expect(test_utils.getBatchData(batch1)).toEqual([
{ op_id: '1', op: 'PUT', object_id: 'test1', checksum: 2871785649 },
@@ -87,7 +87,7 @@
});

const batch2 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch1[0].batch.next_after]]), options)
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch1[0].batch.next_after)]]), options)
);
expect(test_utils.getBatchData(batch2)).toEqual([
{ op_id: '3', op: 'PUT', object_id: 'large2', checksum: 1607205872 }
@@ -99,7 +99,7 @@
});

const batch3 = await test_utils.fromAsync(
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', batch2[0].batch.next_after]]), options)
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', BigInt(batch2[0].batch.next_after)]]), options)
);
expect(test_utils.getBatchData(batch3)).toEqual([
{ op_id: '4', op: 'PUT', object_id: 'test3', checksum: 1359888332 }