Skip to content

Commit 2ccd6af

Browse files
[Modules] Move Write Checkpoint APIs (#110)
1 parent 5aa2322 commit 2ccd6af

File tree

17 files changed

+138
-113
lines changed

17 files changed

+138
-113
lines changed

.changeset/slow-stingrays-kiss.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': minor
3+
---
4+
5+
Moved Write Checkpoint APIs to SyncBucketStorage

packages/service-core/src/routes/endpoints/checkpointing.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { logger, router, schema } from '@powersync/lib-services-framework';
22
import * as t from 'ts-codec';
33

4+
import * as framework from '@powersync/lib-services-framework';
45
import * as util from '../../util/util-index.js';
56
import { authUser } from '../auth.js';
67
import { routeDefinition } from '../router.js';
@@ -63,7 +64,13 @@ export const writeCheckpoint2 = routeDefinition({
6364
storageEngine: { activeBucketStorage }
6465
} = service_context;
6566

66-
const writeCheckpoint = await activeBucketStorage.createManagedWriteCheckpoint({
67+
const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
68+
if (!activeSyncRules) {
69+
throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
70+
}
71+
72+
using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
73+
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
6774
user_id: full_user_id,
6875
heads: { '1': currentCheckpoint }
6976
});

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,16 +12,15 @@ import * as util from '../util/util-index.js';
1212
import { ReplicationEventPayload } from './ReplicationEventPayload.js';
1313
import { SourceEntityDescriptor } from './SourceEntity.js';
1414
import { SourceTable } from './SourceTable.js';
15-
import { BatchedCustomWriteCheckpointOptions, ReplicaId, WriteCheckpointAPI } from './storage-index.js';
15+
import { BatchedCustomWriteCheckpointOptions, ReplicaId } from './storage-index.js';
16+
import { SyncStorageWriteCheckpointAPI } from './WriteCheckpointAPI.js';
1617

1718
export interface BucketStorageFactoryListener extends DisposableListener {
1819
syncStorageCreated: (storage: SyncRulesBucketStorage) => void;
1920
replicationEvent: (event: ReplicationEventPayload) => void;
2021
}
2122

22-
export interface BucketStorageFactory
23-
extends DisposableObserverClient<BucketStorageFactoryListener>,
24-
WriteCheckpointAPI {
23+
export interface BucketStorageFactory extends DisposableObserverClient<BucketStorageFactoryListener> {
2524
/**
2625
* Update sync rules from configuration, if changed.
2726
*/
@@ -206,7 +205,9 @@ export interface SyncRulesBucketStorageListener extends DisposableListener {
206205
batchStarted: (batch: BucketStorageBatch) => void;
207206
}
208207

209-
export interface SyncRulesBucketStorage extends DisposableObserverClient<SyncRulesBucketStorageListener> {
208+
export interface SyncRulesBucketStorage
209+
extends DisposableObserverClient<SyncRulesBucketStorageListener>,
210+
SyncStorageWriteCheckpointAPI {
210211
readonly group_id: number;
211212
readonly slot_name: string;
212213

packages/service-core/src/storage/MongoBucketStorage.ts

Lines changed: 6 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,7 @@ import { PowerSyncMongo, PowerSyncMongoOptions } from './mongo/db.js';
2525
import { SyncRuleDocument, SyncRuleState } from './mongo/models.js';
2626
import { MongoPersistedSyncRulesContent } from './mongo/MongoPersistedSyncRulesContent.js';
2727
import { MongoSyncBucketStorage } from './mongo/MongoSyncBucketStorage.js';
28-
import { MongoWriteCheckpointAPI } from './mongo/MongoWriteCheckpointAPI.js';
2928
import { generateSlotName } from './mongo/util.js';
30-
import {
31-
CustomWriteCheckpointOptions,
32-
DEFAULT_WRITE_CHECKPOINT_MODE,
33-
LastWriteCheckpointFilters,
34-
ManagedWriteCheckpointOptions,
35-
WriteCheckpointAPI,
36-
WriteCheckpointMode
37-
} from './write-checkpoint.js';
3829

3930
export interface MongoBucketStorageOptions extends PowerSyncMongoOptions {}
4031

@@ -47,10 +38,6 @@ export class MongoBucketStorage
4738
// TODO: This is still Postgres specific and needs to be reworked
4839
public readonly slot_name_prefix: string;
4940

50-
readonly write_checkpoint_mode: WriteCheckpointMode;
51-
52-
protected readonly writeCheckpointAPI: WriteCheckpointAPI;
53-
5441
private readonly storageCache = new LRUCache<number, MongoSyncBucketStorage>({
5542
max: 3,
5643
fetchMethod: async (id) => {
@@ -78,19 +65,13 @@ export class MongoBucketStorage
7865
db: PowerSyncMongo,
7966
options: {
8067
slot_name_prefix: string;
81-
write_checkpoint_mode?: WriteCheckpointMode;
8268
}
8369
) {
8470
super();
8571
this.client = db.client;
8672
this.db = db;
8773
this.session = this.client.startSession();
8874
this.slot_name_prefix = options.slot_name_prefix;
89-
this.write_checkpoint_mode = options.write_checkpoint_mode ?? DEFAULT_WRITE_CHECKPOINT_MODE;
90-
this.writeCheckpointAPI = new MongoWriteCheckpointAPI({
91-
db,
92-
mode: this.write_checkpoint_mode
93-
});
9475
}
9576

9677
getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage {
@@ -299,22 +280,6 @@ export class MongoBucketStorage
299280
});
300281
}
301282

302-
async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void> {
303-
return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints);
304-
}
305-
306-
async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise<bigint> {
307-
return this.writeCheckpointAPI.createCustomWriteCheckpoint(options);
308-
}
309-
310-
async createManagedWriteCheckpoint(options: ManagedWriteCheckpointOptions): Promise<bigint> {
311-
return this.writeCheckpointAPI.createManagedWriteCheckpoint(options);
312-
}
313-
314-
async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise<bigint | null> {
315-
return this.writeCheckpointAPI.lastWriteCheckpoint(filters);
316-
}
317-
318283
async getActiveCheckpoint(): Promise<ActiveCheckpoint> {
319284
const doc = await this.db.sync_rules.findOne(
320285
{
@@ -426,7 +391,7 @@ export class MongoBucketStorage
426391
}
427392
return (await this.storageCache.fetch(doc._id)) ?? null;
428393
}
429-
};
394+
} satisfies ActiveCheckpoint;
430395
}
431396

432397
/**
@@ -516,6 +481,7 @@ export class MongoBucketStorage
516481
if (doc == null) {
517482
continue;
518483
}
484+
519485
const op = this.makeActiveCheckpoint(doc);
520486
// Check for LSN / checkpoint changes - ignore other metadata changes
521487
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
@@ -546,12 +512,14 @@ export class MongoBucketStorage
546512
// 1. checkpoint (op_id) changes.
547513
// 2. write checkpoint changes for the specific user
548514
const bucketStorage = await cp.getBucketStorage();
515+
if (!bucketStorage) {
516+
continue;
517+
}
549518

550519
const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};
551520

552-
const currentWriteCheckpoint = await this.lastWriteCheckpoint({
521+
const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({
553522
user_id,
554-
sync_rules_id: bucketStorage?.group_id,
555523
heads: {
556524
...lsnFilters
557525
}

packages/service-core/src/storage/StorageEngine.ts

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,12 @@
11
import { DisposableListener, DisposableObserver, logger } from '@powersync/lib-services-framework';
22
import { ResolvedPowerSyncConfig } from '../util/util-index.js';
33
import { BucketStorageFactory } from './BucketStorage.js';
4-
import { ActiveStorage, BucketStorageProvider, StorageSettings } from './StorageProvider.js';
5-
import { DEFAULT_WRITE_CHECKPOINT_MODE } from './write-checkpoint.js';
4+
import { ActiveStorage, BucketStorageProvider } from './StorageProvider.js';
65

76
export type StorageEngineOptions = {
87
configuration: ResolvedPowerSyncConfig;
98
};
109

11-
export const DEFAULT_STORAGE_SETTINGS: StorageSettings = {
12-
writeCheckpointMode: DEFAULT_WRITE_CHECKPOINT_MODE
13-
};
14-
1510
export interface StorageEngineListener extends DisposableListener {
1611
storageActivated: (storage: BucketStorageFactory) => void;
1712
}
@@ -20,11 +15,9 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
2015
// TODO: This will need to revisited when we actually support multiple storage providers.
2116
private storageProviders: Map<string, BucketStorageProvider> = new Map();
2217
private currentActiveStorage: ActiveStorage | null = null;
23-
private _activeSettings: StorageSettings;
2418

2519
constructor(private options: StorageEngineOptions) {
2620
super();
27-
this._activeSettings = DEFAULT_STORAGE_SETTINGS;
2821
}
2922

3023
get activeBucketStorage(): BucketStorageFactory {
@@ -39,20 +32,6 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
3932
return this.currentActiveStorage;
4033
}
4134

42-
get activeSettings(): StorageSettings {
43-
return { ...this._activeSettings };
44-
}
45-
46-
updateSettings(settings: Partial<StorageSettings>) {
47-
if (this.currentActiveStorage) {
48-
throw new Error(`Storage is already active, settings cannot be modified.`);
49-
}
50-
this._activeSettings = {
51-
...this._activeSettings,
52-
...settings
53-
};
54-
}
55-
5635
/**
5736
* Register a provider which generates a {@link BucketStorageFactory}
5837
* given the matching config specified in the loaded {@link ResolvedPowerSyncConfig}
@@ -65,8 +44,7 @@ export class StorageEngine extends DisposableObserver<StorageEngineListener> {
6544
logger.info('Starting Storage Engine...');
6645
const { configuration } = this.options;
6746
this.currentActiveStorage = await this.storageProviders.get(configuration.storage.type)!.getStorage({
68-
resolvedConfig: configuration,
69-
...this.activeSettings
47+
resolvedConfig: configuration
7048
});
7149
this.iterateListeners((cb) => cb.storageActivated?.(this.activeBucketStorage));
7250
logger.info(`Successfully activated storage: ${configuration.storage.type}.`);

packages/service-core/src/storage/StorageProvider.ts

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import * as util from '../util/util-index.js';
22
import { BucketStorageFactory } from './BucketStorage.js';
3-
import { WriteCheckpointMode } from './write-checkpoint.js';
43

54
export interface ActiveStorage {
65
storage: BucketStorageFactory;
@@ -12,14 +11,7 @@ export interface ActiveStorage {
1211
tearDown(): Promise<boolean>;
1312
}
1413

15-
/**
16-
* Settings which can be modified by various modules in their initialization.
17-
*/
18-
export interface StorageSettings {
19-
writeCheckpointMode: WriteCheckpointMode;
20-
}
21-
22-
export interface GetStorageOptions extends StorageSettings {
14+
export interface GetStorageOptions {
2315
// TODO: This should just be the storage config. Update once the slot name prefix coupling has been removed from the storage
2416
resolvedConfig: util.ResolvedPowerSyncConfig;
2517
}

packages/service-core/src/storage/write-checkpoint.ts renamed to packages/service-core/src/storage/WriteCheckpointAPI.ts

Lines changed: 31 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ export enum WriteCheckpointMode {
33
* Raw mappings of `user_id` to `write_checkpoint`s should
44
* be supplied for each set of sync rules.
55
*/
6-
CUSTOM = 'manual',
6+
CUSTOM = 'custom',
77
/**
88
* Write checkpoints are stored as a mapping of `user_id` plus
99
* replication HEAD (lsn in Postgres) to an automatically generated
@@ -26,19 +26,19 @@ export interface CustomWriteCheckpointFilters extends BaseWriteCheckpointIdentif
2626
sync_rules_id: number;
2727
}
2828

29-
export interface CustomWriteCheckpointOptions extends CustomWriteCheckpointFilters {
29+
export interface BatchedCustomWriteCheckpointOptions extends BaseWriteCheckpointIdentifier {
3030
/**
3131
* A supplied incrementing Write Checkpoint number
3232
*/
3333
checkpoint: bigint;
3434
}
3535

36-
/**
37-
* Options for creating a custom Write Checkpoint in a batch.
38-
* A {@link BucketStorageBatch} is already associated with a Sync Rules instance.
39-
* The `sync_rules_id` is not required here.
40-
*/
41-
export type BatchedCustomWriteCheckpointOptions = Omit<CustomWriteCheckpointOptions, 'sync_rules_id'>;
36+
export interface CustomWriteCheckpointOptions extends BatchedCustomWriteCheckpointOptions {
37+
/**
38+
* Sync rules which were active when this checkpoint was created.
39+
*/
40+
sync_rules_id: number;
41+
}
4242

4343
/**
4444
* Managed Write Checkpoints are a mapping of User ID to replication HEAD
@@ -52,15 +52,33 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti
5252

5353
export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters;
5454

55+
export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters;
5556
export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters;
5657

57-
export interface WriteCheckpointAPI {
58-
batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void>;
59-
60-
createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise<bigint>;
61-
58+
export interface BaseWriteCheckpointAPI {
59+
readonly writeCheckpointMode: WriteCheckpointMode;
60+
setWriteCheckpointMode(mode: WriteCheckpointMode): void;
6261
createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise<bigint>;
62+
}
63+
64+
/**
65+
* Write Checkpoint API to be used in conjunction with a {@link SyncRulesBucketStorage}.
66+
* This storage corresponds with a set of sync rules. These APIs don't require specifying a
67+
* sync rules id.
68+
*/
69+
export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI {
70+
batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise<void>;
71+
createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise<bigint>;
72+
lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise<bigint | null>;
73+
}
6374

75+
/**
76+
* Write Checkpoint API which is interfaced directly with the storage layer. This requires
77+
* sync rules identifiers for custom write checkpoints.
78+
*/
79+
export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI {
80+
batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise<void>;
81+
createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise<bigint>;
6482
lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise<bigint | null>;
6583
}
6684

packages/service-core/src/storage/mongo/MongoBucketBatch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ import {
1212
SaveOptions
1313
} from '../BucketStorage.js';
1414
import { SourceTable } from '../SourceTable.js';
15-
import { CustomWriteCheckpointOptions } from '../write-checkpoint.js';
15+
import { BatchedCustomWriteCheckpointOptions, CustomWriteCheckpointOptions } from '../WriteCheckpointAPI.js';
1616
import { PowerSyncMongo } from './db.js';
1717
import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js';
1818
import { MongoIdSequence } from './MongoIdSequence.js';
@@ -83,7 +83,7 @@ export class MongoBucketBatch extends DisposableObserver<BucketBatchStorageListe
8383
this.sync_rules = sync_rules;
8484
}
8585

86-
addCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): void {
86+
addCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): void {
8787
this.write_checkpoint_batch.push({
8888
...checkpoint,
8989
sync_rules_id: this.group_id

packages/service-core/src/storage/mongo/MongoStorageProvider.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,7 @@ export class MongoStorageProvider implements BucketStorageProvider {
1919
return {
2020
storage: new MongoBucketStorage(database, {
2121
// TODO currently need the entire resolved config due to this
22-
slot_name_prefix: resolvedConfig.slot_name_prefix,
23-
write_checkpoint_mode: options.writeCheckpointMode
22+
slot_name_prefix: resolvedConfig.slot_name_prefix
2423
}),
2524
shutDown: () => client.close(),
2625
tearDown: () => {

0 commit comments

Comments
 (0)