Skip to content

Commit 826545c

Browse files
committed
Translate resolved buckets
1 parent 490c7ae commit 826545c

File tree

4 files changed

+151
-44
lines changed

4 files changed

+151
-44
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 58 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,12 @@
1-
import { BucketDescription, isValidPriority, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
1+
import {
2+
BucketDescription,
3+
BucketPriority,
4+
isValidPriority,
5+
RequestedStream,
6+
RequestParameters,
7+
ResolvedBucket,
8+
SqlSyncRules
9+
} from '@powersync/service-sync-rules';
210

311
import * as storage from '../storage/storage-index.js';
412
import * as util from '../util/util-index.js';
@@ -355,7 +363,9 @@ export class BucketParameterState {
355363
private readonly querier: BucketParameterQuerier;
356364
private readonly staticBuckets: Map<string, BucketDescription>;
357365
private readonly includeDefaultStreams: boolean;
358-
private readonly explicitlyOpenedStreams: Record<string, util.OpenedStream>;
366+
// Indexed by the client-side id
367+
private readonly explicitStreamSubscriptions: Record<string, util.RequestedStreamSubscription>;
368+
private readonly subscribedStreamNames: Set<string>;
359369
private readonly logger: Logger;
360370
private cachedDynamicBuckets: BucketDescription[] | null = null;
361371
private cachedDynamicBucketSet: Set<string> | null = null;
@@ -376,51 +386,73 @@ export class BucketParameterState {
376386
this.syncParams = syncParams;
377387
this.logger = logger;
378388

379-
const explicitlyOpenedStreams: Record<string, util.OpenedStream> = {};
389+
const idToStreamSubscription: Record<string, util.RequestedStreamSubscription> = {};
390+
const streamsByName: Record<string, RequestedStream[]> = {};
380391
const subscriptions = request.subscriptions;
381392
if (subscriptions) {
382393
for (const subscription of subscriptions.opened) {
383-
explicitlyOpenedStreams[subscription.stream] = subscription;
394+
idToStreamSubscription[subscription.stream] = subscription;
395+
396+
const syncRuleStream: RequestedStream = {
397+
parameters: subscription.parameters ?? {},
398+
opaque_id: subscription.client_id
399+
};
400+
if (Object.hasOwn(streamsByName, subscription.stream)) {
401+
streamsByName[subscription.stream].push(syncRuleStream);
402+
} else {
403+
streamsByName[subscription.stream] = [syncRuleStream];
404+
}
384405
}
385406
}
386407
this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
387-
this.explicitlyOpenedStreams = explicitlyOpenedStreams;
408+
this.explicitStreamSubscriptions = idToStreamSubscription;
388409

389410
this.querier = syncRules.getBucketParameterQuerier({
390411
globalParameters: this.syncParams,
391412
hasDefaultStreams: this.includeDefaultStreams,
392-
resolveOpenedStream(name) {
393-
const subscription = explicitlyOpenedStreams[name];
394-
if (subscription) {
395-
return subscription.parameters ?? {};
396-
} else {
397-
return null;
398-
}
399-
}
413+
streams: streamsByName
400414
});
401415
this.staticBuckets = new Map<string, BucketDescription>(this.querier.staticBuckets.map((b) => [b.bucket, b]));
402416
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
417+
this.subscribedStreamNames = new Set(Object.keys(streamsByName));
403418
}
404419

405420
/**
406-
* Overrides the `description` based on subscriptions from the client.
407-
*
408-
* In partiuclar, this can override the priority assigned to a bucket.
421+
* Translates an internal sync-rules {@link ResolvedBucket} instance to the public
422+
* {@link util.ClientBucketDescription}.
409423
*/
410-
overrideBucketDescription(description: BucketDescription): BucketDescription {
411-
const changedPriority = this.explicitlyOpenedStreams[description.definition]?.override_priority;
412-
if (changedPriority != null && isValidPriority(changedPriority)) {
413-
return {
414-
...description,
415-
priority: changedPriority
416-
};
417-
} else {
418-
return description;
424+
translateResolvedBucket(description: ResolvedBucket): util.ClientBucketDescription {
425+
// Assign
426+
let priorityOverride: BucketPriority | null = null;
427+
for (const reason of description.inclusion_reasons) {
428+
if (reason != 'default') {
429+
const requestedPriority = this.explicitStreamSubscriptions[reason.subscription]?.override_priority;
430+
if (requestedPriority != null) {
431+
if (priorityOverride == null) {
432+
priorityOverride = requestedPriority as BucketPriority;
433+
} else {
434+
priorityOverride = Math.min(requestedPriority, priorityOverride) as BucketPriority;
435+
}
436+
}
437+
}
419438
}
439+
440+
return {
441+
definition: description.definition,
442+
bucket: description.bucket,
443+
priority: priorityOverride ?? description.priority,
444+
subscriptions: description.inclusion_reasons.map((reason) => {
445+
if (reason == 'default') {
446+
return { def: description.definition };
447+
} else {
448+
return { sub: reason.subscription };
449+
}
450+
})
451+
};
420452
}
421453

422454
isSubscribedToStream(desc: SqlBucketDescriptor): boolean {
423-
return (desc.subscribedToByDefault && this.includeDefaultStreams) || desc.name in this.explicitlyOpenedStreams;
455+
return (desc.subscribedToByDefault && this.includeDefaultStreams) || this.subscribedStreamNames.has(desc.name);
424456
}
425457

426458
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {

packages/service-core/src/util/protocol-types.ts

Lines changed: 44 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,17 @@ export type BucketRequest = t.Decoded<typeof BucketRequest>;
1616
/**
1717
* A sync steam that a client has expressed interest in by explicitly opening it on the client side.
1818
*/
19-
export const OpenedStream = t.object({
19+
export const RequestedStreamSubscription = t.object({
2020
/**
2121
* The defined name of the stream as it appears in sync stream definitions.
2222
*/
2323
stream: t.string,
24+
/**
25+
* An opaque textual identifier assigned to this request by the client.
26+
*
27+
* Wh
28+
*/
29+
client_id: t.string,
2430
/**
2531
* An optional dictionary of parameters to pass to this specific stream.
2632
*/
@@ -34,26 +40,26 @@ export const OpenedStream = t.object({
3440
override_priority: t.number.optional()
3541
});
3642

37-
export type OpenedStream = t.Decoded<typeof OpenedStream>;
43+
export type RequestedStreamSubscription = t.Decoded<typeof RequestedStreamSubscription>;
3844

3945
/**
40-
* An overview of all opened streams as part of a streaming sync request.
46+
* An overview of all subscribed streams as part of a streaming sync request.
4147
*/
42-
export const OpenedStreams = t.object({
48+
export const StreamSubscriptionRequest = t.object({
4349
/**
4450
* Whether to sync default streams.
4551
*
46-
* When disabled,only
52+
* When disabled, only explicitly-opened subscriptions are included.
4753
*/
4854
include_defaults: t.boolean.optional(),
4955

5056
/**
5157
* An array of sync streams the client has opened explicitly.
5258
*/
53-
opened: t.array(OpenedStream)
59+
opened: t.array(RequestedStreamSubscription)
5460
});
5561

56-
export type StreamSubscriptions = t.Decoded<typeof OpenedStreams>;
62+
export type StreamSubscriptionRequest = t.Decoded<typeof StreamSubscriptionRequest>;
5763

5864
export const StreamingSyncRequest = t.object({
5965
/**
@@ -94,7 +100,7 @@ export const StreamingSyncRequest = t.object({
94100
/**
95101
* If the client is aware of streams, an array of streams the client has opened.
96102
*/
97-
subscriptions: OpenedStreams.optional()
103+
subscriptions: StreamSubscriptionRequest.optional()
98104
});
99105

100106
export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;
@@ -107,7 +113,7 @@ export interface StreamingSyncCheckpointDiff {
107113
checkpoint_diff: {
108114
last_op_id: ProtocolOpId;
109115
write_checkpoint?: ProtocolOpId;
110-
updated_buckets: BucketChecksumWithDescription[];
116+
updated_buckets: CheckpointBucket[];
111117
removed_buckets: string[];
112118
};
113119
}
@@ -154,7 +160,7 @@ export interface StreamDescription {
154160
export interface Checkpoint {
155161
last_op_id: ProtocolOpId;
156162
write_checkpoint?: ProtocolOpId;
157-
buckets: BucketChecksumWithDescription[];
163+
buckets: CheckpointBucket[];
158164
streams: StreamDescription[];
159165
}
160166

@@ -211,4 +217,31 @@ export interface BucketChecksum {
211217
count: number;
212218
}
213219

214-
export interface BucketChecksumWithDescription extends BucketChecksum, BucketDescription {}
220+
/**
221+
* The reason a particular bucket is included in a checkpoint.
222+
*
223+
* This information allows clients to associate individual buckets with sync streams they're subscribed to. Having that
224+
* association is useful because it enables clients to track progress for individual sync streams.
225+
*/
226+
export type BucketSubscriptionReason = BucketDerivedFromDefaultStream | BucketDerivedFromExplicitSubscription;
227+
228+
/**
229+
* A bucket has been included in a checkpoint because it's part of a default stream.
230+
*
231+
* The string is the name of the stream definition.
232+
*/
233+
export type BucketDerivedFromDefaultStream = { def: string };
234+
235+
/**
236+
* The bucket has been included in a checkpoint because it's part of a stream that a client has explicitly subscribed
237+
* to.
238+
*
239+
* The string is the client id associated with the subscription in {@link RequestedStreamSubscription}.
240+
*/
241+
export type BucketDerivedFromExplicitSubscription = { sub: string };
242+
243+
export interface ClientBucketDescription extends BucketDescription {
244+
subscriptions: BucketSubscriptionReason[];
245+
}
246+
247+
export interface CheckpointBucket extends BucketChecksum, ClientBucketDescription {}

packages/sync-rules/src/BucketDescription.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ export interface BucketDescription {
2424
* The name of the sync rule or stream definition from which the bucket is derived.
2525
*/
2626
definition: string;
27+
2728
/**
2829
* The id of the bucket, which is derived from the name of the bucket's definition
2930
* in the sync rules as well as the values returned by the parameter queries.
@@ -34,3 +35,15 @@ export interface BucketDescription {
3435
*/
3536
priority: BucketPriority;
3637
}
38+
39+
/**
40+
* A bucket that was resolved to a specific request including stream subscriptions.
41+
*
42+
* This includes information on why the bucket has been included in a checkpoint subset
43+
* shown to clients.
44+
*/
45+
export interface ResolvedBucket extends BucketDescription {
46+
inclusion_reasons: BucketInclusionReason[];
47+
}
48+
49+
export type BucketInclusionReason = 'default' | { subscription: string };

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 36 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
QueryParseOptions,
2121
RequestParameters,
2222
SourceSchema,
23+
SqliteJsonRow,
2324
SqliteRow,
2425
StreamParseOptions,
2526
SyncRules
@@ -39,6 +40,21 @@ export interface SyncRulesOptions {
3940
throwOnError?: boolean;
4041
}
4142

43+
export interface RequestedStream {
44+
/**
45+
* The parameters for the explicit stream subscription.
46+
*
47+
* Unlike {@link GetQuerierOptions.globalParameters}, these parameters are only applied to the particular stream.
48+
*/
49+
parameters: SqliteJsonRow | null;
50+
51+
/**
52+
* An opaque id of the stream subscription, used to associate buckets with the stream subscriptions that have caused
53+
* them to be included.
54+
*/
55+
opaque_id: string;
56+
}
57+
4258
export interface GetQuerierOptions {
4359
globalParameters: RequestParameters;
4460
/**
@@ -48,13 +64,14 @@ export interface GetQuerierOptions {
4864
*/
4965
hasDefaultStreams: boolean;
5066
/**
67+
*
5168
* For streams, this is invoked to check whether the client has opened the relevant stream.
5269
*
5370
* @param name The name of the stream as it appears in the sync rule definitions.
5471
* @returns If the strema has been opened by the client, the stream parameters for that particular stream. Otherwise
5572
* null.
5673
*/
57-
resolveOpenedStream: (name: string) => Record<string, any> | null;
74+
streams: Record<string, RequestedStream[]>;
5875
}
5976

6077
export class SqlSyncRules implements SyncRules {
@@ -378,21 +395,33 @@ export class SqlSyncRules implements SyncRules {
378395
let params = options.globalParameters;
379396

380397
if (descriptor.type == SqlBucketDescriptorType.STREAM) {
381-
const opened = options.resolveOpenedStream(descriptor.name);
398+
const subscriptions = options.streams[descriptor.name] ?? [];
382399

383-
if (!descriptor.subscribedToByDefault && opened == null) {
400+
if (!descriptor.subscribedToByDefault && subscriptions.length) {
384401
// The client is not subscribing to this stream, so don't query buckets related to it.
385402
continue;
386403
}
387404

388-
if (opened != null) {
389-
params = params.withAddedStreamParameters(opened);
405+
let hasExplicitDefaultSubscription = false;
406+
for (const subscription of subscriptions) {
407+
let subscriptionParams = params;
408+
if (subscription.parameters != null) {
409+
subscriptionParams = params.withAddedStreamParameters(subscription.parameters);
410+
} else {
411+
hasExplicitDefaultSubscription = true;
412+
}
413+
414+
queriers.push(descriptor.getBucketParameterQuerier(subscriptionParams));
390415
}
391416

417+
// If the stream is subscribed to by default and there is no explicit subscription that would match the default
418+
// subscription, also include the default querier.
419+
if (descriptor.subscribedToByDefault && !hasExplicitDefaultSubscription) {
420+
queriers.push(descriptor.getBucketParameterQuerier(params));
421+
}
422+
} else {
392423
queriers.push(descriptor.getBucketParameterQuerier(params));
393424
}
394-
395-
queriers.push(descriptor.getBucketParameterQuerier(params));
396425
}
397426

398427
return mergeBucketParameterQueriers(queriers);

0 commit comments

Comments
 (0)