Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions packages/service-core/src/storage/BucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ export interface BucketStorageFactory
*/
getActiveCheckpoint(): Promise<ActiveCheckpoint>;

/**
* Yields the latest sync checkpoint.
*/
watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<ActiveCheckpoint>;

/**
* Yields the latest user write checkpoint whenever the sync checkpoint updates.
*/
Expand Down Expand Up @@ -118,6 +123,8 @@ export interface ActiveCheckpoint {
hasSyncRules(): boolean;

getBucketStorage(): Promise<SyncRulesBucketStorage | null>;

syncRules: PersistedSyncRulesContent | null;
}

export interface StorageMetrics {
Expand Down
34 changes: 28 additions & 6 deletions packages/service-core/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,9 @@ export class MongoBucketStorage
db: PowerSyncMongo,
options: {
slot_name_prefix: string;
/**
* Initial Write Checkpoint Mode
*/
write_checkpoint_mode?: WriteCheckpointMode;
}
) {
Expand All @@ -93,6 +96,10 @@ export class MongoBucketStorage
});
}

get writeCheckpointMode() {
return this.writeCheckpointAPI.writeCheckpointMode;
}

getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage {
let { id, slot_name } = options;
if ((typeof id as any) == 'bigint') {
Expand Down Expand Up @@ -303,6 +310,10 @@ export class MongoBucketStorage
return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints);
}

setWriteCheckpointMode(mode: WriteCheckpointMode): void {
return this.writeCheckpointAPI.setWriteCheckpointMode(mode);
}

async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise<bigint> {
return this.writeCheckpointAPI.createCustomWriteCheckpoint(options);
}
Expand Down Expand Up @@ -425,14 +436,15 @@ export class MongoBucketStorage
return null;
}
return (await this.storageCache.fetch(doc._id)) ?? null;
}
};
},
syncRules: doc ? new MongoPersistedSyncRulesContent(this.db, doc) : null
} satisfies ActiveCheckpoint;
}

/**
* Instance-wide watch on the latest available checkpoint (op_id + lsn).
*/
private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<ActiveCheckpoint> {
private async *_watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<ActiveCheckpoint> {
const pipeline: mongo.Document[] = [
{
$match: {
Expand All @@ -445,7 +457,8 @@ export class MongoBucketStorage
operationType: 1,
'fullDocument._id': 1,
'fullDocument.last_checkpoint': 1,
'fullDocument.last_checkpoint_lsn': 1
'fullDocument.last_checkpoint_lsn': 1,
'fullDocument.content': 1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the potential to add a lot of overhead - the rest of the change stream is optimized to get checkpoint updates with as little latency as feasible. It also shouldn't be needed - the content of a sync rules document never change.

}
}
];
Expand All @@ -467,7 +480,8 @@ export class MongoBucketStorage
projection: {
_id: 1,
last_checkpoint: 1,
last_checkpoint_lsn: 1
last_checkpoint_lsn: 1,
content: 1
}
}
);
Expand Down Expand Up @@ -516,6 +530,7 @@ export class MongoBucketStorage
if (doc == null) {
continue;
}

const op = this.makeActiveCheckpoint(doc);
// Check for LSN / checkpoint changes - ignore other metadata changes
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
Expand All @@ -527,9 +542,16 @@ export class MongoBucketStorage

// Nothing is done here until a subscriber starts to iterate
private readonly sharedIter = new sync.BroadcastIterable((signal) => {
return this.watchActiveCheckpoint(signal);
return this._watchActiveCheckpoint(signal);
});

/**
* Watch changes to the active sync rules and checkpoint.
*/
watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<ActiveCheckpoint> {
return wrapWithAbort(this.sharedIter, signal);
}

/**
* User-specific watch on the latest checkpoint and/or write checkpoint.
*/
Expand Down
29 changes: 18 additions & 11 deletions packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,31 @@ export type MongoCheckpointAPIOptions = {

export class MongoWriteCheckpointAPI implements WriteCheckpointAPI {
readonly db: PowerSyncMongo;
readonly mode: WriteCheckpointMode;
private _mode: WriteCheckpointMode;

constructor(options: MongoCheckpointAPIOptions) {
this.db = options.db;
this.mode = options.mode;
this._mode = options.mode;
}

get writeCheckpointMode() {
return this._mode;
}

setWriteCheckpointMode(mode: WriteCheckpointMode): void {
this._mode = mode;
}

async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void> {
return batchCreateCustomWriteCheckpoints(this.db, checkpoints);
}

async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise<bigint> {
if (this.mode !== WriteCheckpointMode.CUSTOM) {
throw new framework.errors.ValidationError(
`Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"`
);
}

/**
* Allow creating custom checkpoints even if the current mode is not `custom`.
* There might be a state where the next sync rules rely on replicating custom
* write checkpoints, but the current active sync rules uses managed checkpoints.
*/
const { checkpoint, user_id, sync_rules_id } = options;
const doc = await this.db.custom_write_checkpoints.findOneAndUpdate(
{
Expand All @@ -52,9 +59,9 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI {
}

async createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise<bigint> {
if (this.mode !== WriteCheckpointMode.MANAGED) {
if (this.writeCheckpointMode !== WriteCheckpointMode.MANAGED) {
throw new framework.errors.ValidationError(
`Creating a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"`
`Attempting to create a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"`
);
}

Expand All @@ -77,7 +84,7 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI {
}

async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise<bigint | null> {
switch (this.mode) {
switch (this.writeCheckpointMode) {
case WriteCheckpointMode.CUSTOM:
if (false == 'sync_rules_id' in filters) {
throw new framework.errors.ValidationError(`Sync rules ID is required for custom Write Checkpoint filtering`);
Expand Down
4 changes: 4 additions & 0 deletions packages/service-core/src/storage/write-checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters;
export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters;

export interface WriteCheckpointAPI {
readonly writeCheckpointMode: WriteCheckpointMode;

setWriteCheckpointMode(mode: WriteCheckpointMode): void;

batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void>;

createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise<bigint>;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,8 @@ export class CompoundConfigCollector {
},
// TODO maybe move this out of the connection or something
// slot_name_prefix: connections[0]?.slot_name_prefix ?? 'powersync_'
slot_name_prefix: 'powersync_'
slot_name_prefix: 'powersync_',
parameters: baseConfig.parameters ?? {}
};

return config;
Expand Down
1 change: 1 addition & 0 deletions packages/service-core/src/util/config/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,5 @@ export type ResolvedPowerSyncConfig = {

/** Prefix for postgres replication slot names. May eventually be connection-specific. */
slot_name_prefix: string;
parameters: Record<string, number | string | boolean | null>;
};
5 changes: 2 additions & 3 deletions packages/sync-rules/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@
"type": "module",
"scripts": {
"clean": "rm -r ./dist && tsc -b --clean",
"build": "tsc -b",
"build:tsc": "tsc -b",
"build": "pnpm build:tsc && node scripts/compile-schema.js",
"build:tests": "tsc -b test/tsconfig.json",
"compile:schema": "pnpm build && node scripts/compile-schema.js",
"postversion": "pnpm compile:schema",
"test": "vitest"
},
"dependencies": {
Expand Down
4 changes: 3 additions & 1 deletion packages/types/src/config/PowerSyncConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,9 @@ export const powerSyncConfig = t.object({
disable_telemetry_sharing: t.boolean,
internal_service_endpoint: t.string.optional()
})
.optional()
.optional(),

parameters: t.record(t.number.or(t.string).or(t.boolean).or(t.Null)).optional()
});

export type PowerSyncConfig = t.Decoded<typeof powerSyncConfig>;
Expand Down
1 change: 1 addition & 0 deletions service/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ COPY packages/jpgwire/src packages/jpgwire/src/
COPY packages/jpgwire/ca packages/jpgwire/ca/
COPY packages/jsonbig/src packages/jsonbig/src/
COPY packages/sync-rules/src packages/sync-rules/src/
COPY packages/sync-rules/scripts packages/sync-rules/scripts/
COPY packages/rsocket-router/src packages/rsocket-router/src/
COPY packages/types/src packages/types/src/

Expand Down
Loading