Skip to content

Commit 0b1867b

Browse files
committed
Refactor write checkpoints to make testing easier.
1 parent e683d05 commit 0b1867b

File tree

4 files changed

+48
-28
lines changed

4 files changed

+48
-28
lines changed

packages/service-core/src/routes/endpoints/checkpointing.ts

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -56,25 +56,14 @@ export const writeCheckpoint2 = routeDefinition({
5656

5757
const apiHandler = service_context.routerEngine!.getAPI();
5858

59-
const client_id = payload.params.client_id;
60-
const full_user_id = util.checkpointUserId(user_id, client_id);
61-
62-
const currentCheckpoint = await apiHandler.getReplicationHead();
63-
const {
64-
storageEngine: { activeBucketStorage }
65-
} = service_context;
66-
67-
const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent();
68-
if (!activeSyncRules) {
69-
throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`);
70-
}
71-
72-
using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules);
73-
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
74-
user_id: full_user_id,
75-
heads: { '1': currentCheckpoint }
59+
const { replicationHead, writeCheckpoint } = await util.createWriteCheckpoint({
60+
userId: user_id,
61+
clientId: payload.params.client_id,
62+
api: apiHandler,
63+
storage: service_context.storageEngine.activeBucketStorage
7664
});
77-
logger.info(`Write checkpoint 2: ${JSON.stringify({ currentCheckpoint, id: String(full_user_id) })}`);
65+
66+
logger.info(`Write checkpoint for ${user_id}/${payload.params.client_id}: ${writeCheckpoint} | ${replicationHead}`);
7867

7968
return {
8069
write_checkpoint: String(writeCheckpoint)
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
2+
import { RouteAPI } from '../api/RouteAPI.js';
3+
import { BucketStorageFactory } from '../storage/BucketStorage.js';
4+
5+
export interface CreateWriteCheckpointOptions {
6+
userId: string | undefined;
7+
clientId: string | undefined;
8+
api: RouteAPI;
9+
storage: BucketStorageFactory;
10+
}
11+
export async function createWriteCheckpoint(options: CreateWriteCheckpointOptions) {
12+
const full_user_id = checkpointUserId(options.userId, options.clientId);
13+
14+
const currentCheckpoint = await options.api.getReplicationHead();
15+
16+
const activeSyncRules = await options.storage.getActiveSyncRulesContent();
17+
if (!activeSyncRules) {
18+
throw new ServiceError(ErrorCode.PSYNC_S2302, `Cannot create Write Checkpoint since no sync rules are active.`);
19+
}
20+
21+
using syncBucketStorage = options.storage.getInstance(activeSyncRules);
22+
const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({
23+
user_id: full_user_id,
24+
heads: { '1': currentCheckpoint }
25+
});
26+
return {
27+
writeCheckpoint: String(writeCheckpoint),
28+
replicationHead: currentCheckpoint
29+
};
30+
}
31+
32+
export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
33+
if (user_id == null) {
34+
throw new Error('user_id is required');
35+
}
36+
if (client_id == null) {
37+
return user_id;
38+
}
39+
return `${user_id}/${client_id}`;
40+
}

packages/service-core/src/util/util-index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ export * from './Mutex.js';
55
export * from './protocol-types.js';
66
export * from './secs.js';
77
export * from './utils.js';
8+
export * from './checkpointing.js';
89

910
export * from './config.js';
1011
export * from './config/compound-config-collector.js';

packages/service-core/src/util/utils.ts

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -145,16 +145,6 @@ export function isCompleteRow(storeData: boolean, row: sync_rules.ToastableSqlit
145145
return !hasToastedValues(row);
146146
}
147147

148-
export function checkpointUserId(user_id: string | undefined, client_id: string | undefined) {
149-
if (user_id == null) {
150-
throw new Error('user_id is required');
151-
}
152-
if (client_id == null) {
153-
return user_id;
154-
}
155-
return `${user_id}/${client_id}`;
156-
}
157-
158148
/**
159149
* Reduce a bucket to the final state as stored on the client.
160150
*

0 commit comments

Comments
 (0)