diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 84165fb7c..61b376be0 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -90,6 +90,11 @@ export interface BucketStorageFactory */ getActiveCheckpoint(): Promise; + /** + * Yields the latest sync checkpoint. + */ + watchActiveCheckpoint(signal: AbortSignal): AsyncIterable; + /** * Yields the latest user write checkpoint whenever the sync checkpoint updates. */ @@ -118,6 +123,8 @@ export interface ActiveCheckpoint { hasSyncRules(): boolean; getBucketStorage(): Promise; + + syncRules: PersistedSyncRulesContent | null; } export interface StorageMetrics { diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index 9da62758c..46fd1e66b 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -78,6 +78,9 @@ export class MongoBucketStorage db: PowerSyncMongo, options: { slot_name_prefix: string; + /** + * Initial Write Checkpoint Mode + */ write_checkpoint_mode?: WriteCheckpointMode; } ) { @@ -93,6 +96,10 @@ export class MongoBucketStorage }); } + get writeCheckpointMode() { + return this.writeCheckpointAPI.writeCheckpointMode; + } + getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage { let { id, slot_name } = options; if ((typeof id as any) == 'bigint') { @@ -303,6 +310,10 @@ export class MongoBucketStorage return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints); } + setWriteCheckpointMode(mode: WriteCheckpointMode): void { + return this.writeCheckpointAPI.setWriteCheckpointMode(mode); + } + async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createCustomWriteCheckpoint(options); } @@ -425,14 +436,15 @@ export class MongoBucketStorage return null; } return (await this.storageCache.fetch(doc._id)) ?? null; - } - }; + }, + syncRules: doc ? new MongoPersistedSyncRulesContent(this.db, doc) : null + } satisfies ActiveCheckpoint; } /** * Instance-wide watch on the latest available checkpoint (op_id + lsn). */ - private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { + private async *_watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { const pipeline: mongo.Document[] = [ { $match: { @@ -445,7 +457,8 @@ export class MongoBucketStorage operationType: 1, 'fullDocument._id': 1, 'fullDocument.last_checkpoint': 1, - 'fullDocument.last_checkpoint_lsn': 1 + 'fullDocument.last_checkpoint_lsn': 1, + 'fullDocument.content': 1 } } ]; @@ -467,7 +480,8 @@ export class MongoBucketStorage projection: { _id: 1, last_checkpoint: 1, - last_checkpoint_lsn: 1 + last_checkpoint_lsn: 1, + content: 1 } } ); @@ -516,6 +530,7 @@ export class MongoBucketStorage if (doc == null) { continue; } + const op = this.makeActiveCheckpoint(doc); // Check for LSN / checkpoint changes - ignore other metadata changes if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) { @@ -527,9 +542,16 @@ export class MongoBucketStorage // Nothing is done here until a subscriber starts to iterate private readonly sharedIter = new sync.BroadcastIterable((signal) => { - return this.watchActiveCheckpoint(signal); + return this._watchActiveCheckpoint(signal); }); + /** + * Watch changes to the active sync rules and checkpoint. + */ + watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { + return wrapWithAbort(this.sharedIter, signal); + } + /** * User-specific watch on the latest checkpoint and/or write checkpoint. */ diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index 230db3153..aa66b6f28 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -17,11 +17,19 @@ export type MongoCheckpointAPIOptions = { export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { readonly db: PowerSyncMongo; - readonly mode: WriteCheckpointMode; + private _mode: WriteCheckpointMode; constructor(options: MongoCheckpointAPIOptions) { this.db = options.db; - this.mode = options.mode; + this._mode = options.mode; + } + + get writeCheckpointMode() { + return this._mode; + } + + setWriteCheckpointMode(mode: WriteCheckpointMode): void { + this._mode = mode; } async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise { @@ -29,12 +37,11 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { - if (this.mode !== WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"` - ); - } - + /** + * Allow creating custom checkpoints even if the current mode is not `custom`. + * There might be a state where the next sync rules rely on replicating custom + * write checkpoints, but the current active sync rules uses managed checkpoints. + */ const { checkpoint, user_id, sync_rules_id } = options; const doc = await this.db.custom_write_checkpoints.findOneAndUpdate( { @@ -52,9 +59,9 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise { - if (this.mode !== WriteCheckpointMode.MANAGED) { + if (this.writeCheckpointMode !== WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( - `Creating a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"` + `Attempting to create a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` ); } @@ -77,7 +84,7 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise { - switch (this.mode) { + switch (this.writeCheckpointMode) { case WriteCheckpointMode.CUSTOM: if (false == 'sync_rules_id' in filters) { throw new framework.errors.ValidationError(`Sync rules ID is required for custom Write Checkpoint filtering`); diff --git a/packages/service-core/src/storage/write-checkpoint.ts b/packages/service-core/src/storage/write-checkpoint.ts index 0b61fe0c1..3a9abd216 100644 --- a/packages/service-core/src/storage/write-checkpoint.ts +++ b/packages/service-core/src/storage/write-checkpoint.ts @@ -55,6 +55,10 @@ export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; export interface WriteCheckpointAPI { + readonly writeCheckpointMode: WriteCheckpointMode; + + setWriteCheckpointMode(mode: WriteCheckpointMode): void; + batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; diff --git a/packages/service-core/src/util/config/compound-config-collector.ts b/packages/service-core/src/util/config/compound-config-collector.ts index 78e8500cb..7c2b70c87 100644 --- a/packages/service-core/src/util/config/compound-config-collector.ts +++ b/packages/service-core/src/util/config/compound-config-collector.ts @@ -122,7 +122,8 @@ export class CompoundConfigCollector { }, // TODO maybe move this out of the connection or something // slot_name_prefix: connections[0]?.slot_name_prefix ?? 'powersync_' - slot_name_prefix: 'powersync_' + slot_name_prefix: 'powersync_', + parameters: baseConfig.parameters ?? {} }; return config; diff --git a/packages/service-core/src/util/config/types.ts b/packages/service-core/src/util/config/types.ts index e5f461e49..99829526d 100644 --- a/packages/service-core/src/util/config/types.ts +++ b/packages/service-core/src/util/config/types.ts @@ -64,4 +64,5 @@ export type ResolvedPowerSyncConfig = { /** Prefix for postgres replication slot names. May eventually be connection-specific. */ slot_name_prefix: string; + parameters: Record; }; diff --git a/packages/sync-rules/package.json b/packages/sync-rules/package.json index bb942b1e8..727188bb6 100644 --- a/packages/sync-rules/package.json +++ b/packages/sync-rules/package.json @@ -15,10 +15,9 @@ "type": "module", "scripts": { "clean": "rm -r ./dist && tsc -b --clean", - "build": "tsc -b", + "build:tsc": "tsc -b", + "build": "pnpm build:tsc && node scripts/compile-schema.js", "build:tests": "tsc -b test/tsconfig.json", - "compile:schema": "pnpm build && node scripts/compile-schema.js", - "postversion": "pnpm compile:schema", "test": "vitest" }, "dependencies": { diff --git a/packages/types/src/config/PowerSyncConfig.ts b/packages/types/src/config/PowerSyncConfig.ts index e9dff54f4..dbd33e1ae 100644 --- a/packages/types/src/config/PowerSyncConfig.ts +++ b/packages/types/src/config/PowerSyncConfig.ts @@ -135,7 +135,9 @@ export const powerSyncConfig = t.object({ disable_telemetry_sharing: t.boolean, internal_service_endpoint: t.string.optional() }) - .optional() + .optional(), + + parameters: t.record(t.number.or(t.string).or(t.boolean).or(t.Null)).optional() }); export type PowerSyncConfig = t.Decoded; diff --git a/service/Dockerfile b/service/Dockerfile index 715931b11..d6641555e 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -29,6 +29,7 @@ COPY packages/jpgwire/src packages/jpgwire/src/ COPY packages/jpgwire/ca packages/jpgwire/ca/ COPY packages/jsonbig/src packages/jsonbig/src/ COPY packages/sync-rules/src packages/sync-rules/src/ +COPY packages/sync-rules/scripts packages/sync-rules/scripts/ COPY packages/rsocket-router/src packages/rsocket-router/src/ COPY packages/types/src packages/types/src/