Skip to content

Commit e9d178d

Browse files
committed
Include query streams in checkpoint message
1 parent 46d3a58 commit e9d178d

File tree

3 files changed

+44
-13
lines changed

3 files changed

+44
-13
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import { JSONBig } from '@powersync/service-jsonbig';
1414
import { BucketParameterQuerier } from '@powersync/service-sync-rules/src/BucketParameterQuerier.js';
1515
import { SyncContext } from './SyncContext.js';
1616
import { getIntersection, hasIntersection } from './util.js';
17+
import { SqlBucketDescriptor, SqlBucketDescriptorType } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js';
1718

1819
export interface BucketChecksumStateOptions {
1920
syncContext: SyncContext;
@@ -102,7 +103,9 @@ export class BucketChecksumState {
102103
const { buckets: allBuckets, updatedBuckets } = update;
103104

104105
/** Set of all buckets in this checkpoint. */
105-
const bucketDescriptionMap = new Map(allBuckets.map((b) => [b.bucket, b]));
106+
const bucketDescriptionMap = new Map(
107+
allBuckets.map((b) => [b.bucket, this.parameterState.overrideBucketDescription(b)])
108+
);
106109

107110
if (bucketDescriptionMap.size > this.context.maxBuckets) {
108111
throw new ServiceError(
@@ -223,14 +226,27 @@ export class BucketChecksumState {
223226
this.logger.info(message, { checkpoint: base.checkpoint, user_id: user_id, buckets: allBuckets.length });
224227
};
225228
bucketsToFetch = allBuckets;
229+
this.parameterState.syncRules.bucketDescriptors;
230+
231+
const subscriptions: util.SubscribedStream[] = [];
232+
for (const desc of this.parameterState.syncRules.bucketDescriptors) {
233+
if (desc.type == SqlBucketDescriptorType.STREAM && this.parameterState.isSubscribedToStream(desc)) {
234+
subscriptions.push({
235+
name: desc.name,
236+
is_default: desc.subscribedToByDefault
237+
});
238+
}
239+
}
240+
226241
checkpointLine = {
227242
checkpoint: {
228243
last_op_id: util.internalToExternalOpId(base.checkpoint),
229244
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
230245
buckets: [...checksumMap.values()].map((e) => ({
231246
...e,
232247
...bucketDescriptionMap.get(e.bucket)!
233-
}))
248+
})),
249+
included_subscriptions: subscriptions
234250
}
235251
} satisfies util.StreamingSyncCheckpoint;
236252
}
@@ -338,6 +354,7 @@ export class BucketParameterState {
338354
public readonly syncParams: RequestParameters;
339355
private readonly querier: BucketParameterQuerier;
340356
private readonly staticBuckets: Map<string, BucketDescription>;
357+
private readonly includeDefaultStreams: boolean;
341358
private readonly explicitStreamSubscriptions: Record<string, util.StreamSubscription>;
342359
private readonly logger: Logger;
343360
private cachedDynamicBuckets: BucketDescription[] | null = null;
@@ -366,11 +383,12 @@ export class BucketParameterState {
366383
explicitStreamSubscriptions[subscription.stream] = subscription;
367384
}
368385
}
386+
this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
369387
this.explicitStreamSubscriptions = explicitStreamSubscriptions;
370388

371389
this.querier = syncRules.getBucketParameterQuerier({
372390
globalParameters: this.syncParams,
373-
hasDefaultSubscriptions: subscriptions?.include_defaults ?? true,
391+
hasDefaultSubscriptions: this.includeDefaultStreams,
374392
resolveSubscription(name) {
375393
const subscription = explicitStreamSubscriptions[name];
376394
if (subscription) {
@@ -401,6 +419,10 @@ export class BucketParameterState {
401419
}
402420
}
403421

422+
isSubscribedToStream(desc: SqlBucketDescriptor): boolean {
423+
return (desc.subscribedToByDefault && this.includeDefaultStreams) || desc.name in this.explicitStreamSubscriptions;
424+
}
425+
404426
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
405427
const querier = this.querier;
406428
let update: CheckpointUpdate;

packages/service-core/src/util/protocol-types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,16 @@ export type StreamingSyncLine =
146146
*/
147147
export type ProtocolOpId = string;
148148

149+
export interface SubscribedStream {
150+
name: string;
151+
is_default: boolean;
152+
}
153+
149154
export interface Checkpoint {
150155
last_op_id: ProtocolOpId;
151156
write_checkpoint?: ProtocolOpId;
152157
buckets: BucketChecksumWithDescription[];
158+
included_subscriptions: SubscribedStream[];
153159
}
154160

155161
export interface BucketState {

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
import { isScalar, LineCounter, parseDocument, Scalar, YAMLMap, YAMLSeq } from 'yaml';
2-
import { BucketPriority, isValidPriority } from './BucketDescription.js';
2+
import { isValidPriority } from './BucketDescription.js';
33
import { BucketParameterQuerier, mergeBucketParameterQueriers } from './BucketParameterQuerier.js';
44
import { SqlRuleError, SyncRulesErrors, YamlError } from './errors.js';
55
import { SqlEventDescriptor } from './events/SqlEventDescriptor.js';
6-
import { IdSequence } from './IdSequence.js';
76
import { validateSyncRulesSchema } from './json_schema.js';
87
import { SourceTableInterface } from './SourceTableInterface.js';
98
import { QueryParseResult, SqlBucketDescriptor, SqlBucketDescriptorType } from './SqlBucketDescriptor.js';
@@ -375,16 +374,20 @@ export class SqlSyncRules implements SyncRules {
375374
const queriers: BucketParameterQuerier[] = [];
376375
for (const descriptor of this.bucketDescriptors) {
377376
let params = options.globalParameters;
378-
const subscription =
379-
descriptor.type == SqlBucketDescriptorType.STREAM ? options.resolveSubscription(descriptor.name) : null;
380377

381-
if (!descriptor.subscribedToByDefault && subscription == null) {
382-
// The client is not subscribing to this stream, so don't query buckets related to it.
383-
continue;
384-
}
378+
if (descriptor.type == SqlBucketDescriptorType.STREAM) {
379+
const subscription = options.resolveSubscription(descriptor.name);
380+
381+
if (!descriptor.subscribedToByDefault && subscription == null) {
382+
// The client is not subscribing to this stream, so don't query buckets related to it.
383+
continue;
384+
}
385+
386+
if (subscription != null) {
387+
params = params.withAddedParameters(subscription);
388+
}
385389

386-
if (subscription != null) {
387-
params = params.withAddedParameters(subscription);
390+
queriers.push(descriptor.getBucketParameterQuerier(params));
388391
}
389392

390393
queriers.push(descriptor.getBucketParameterQuerier(params));

0 commit comments

Comments
 (0)