Skip to content

Commit 46d3a58

Browse files
committed
Support customizing subscriptions
1 parent 4321aca commit 46d3a58

File tree

12 files changed

+201
-29
lines changed

12 files changed

+201
-29
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { BucketDescription, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
1+
import { BucketDescription, isValidPriority, RequestParameters, SqlSyncRules } from '@powersync/service-sync-rules';
22

33
import * as storage from '../storage/storage-index.js';
44
import * as util from '../util/util-index.js';
@@ -20,6 +20,7 @@ export interface BucketChecksumStateOptions {
2020
bucketStorage: BucketChecksumStateStorage;
2121
syncRules: SqlSyncRules;
2222
syncParams: RequestParameters;
23+
syncRequest: util.StreamingSyncRequest;
2324
logger?: Logger;
2425
initialBucketPositions?: { name: string; after: util.InternalOpId }[];
2526
}
@@ -70,6 +71,7 @@ export class BucketChecksumState {
7071
options.bucketStorage,
7172
options.syncRules,
7273
options.syncParams,
74+
options.syncRequest,
7375
this.logger
7476
);
7577
this.bucketDataPositions = new Map();
@@ -182,12 +184,12 @@ export class BucketChecksumState {
182184

183185
const updatedBucketDescriptions = diff.updatedBuckets.map((e) => ({
184186
...e,
185-
priority: bucketDescriptionMap.get(e.bucket)!.priority
187+
...bucketDescriptionMap.get(e.bucket)!
186188
}));
187189
bucketsToFetch = [...generateBucketsToFetch].map((b) => {
188190
return {
189-
bucket: b,
190-
priority: bucketDescriptionMap.get(b)!.priority
191+
...bucketDescriptionMap.get(b)!,
192+
bucket: b
191193
};
192194
});
193195

@@ -227,7 +229,7 @@ export class BucketChecksumState {
227229
write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined,
228230
buckets: [...checksumMap.values()].map((e) => ({
229231
...e,
230-
priority: bucketDescriptionMap.get(e.bucket)!.priority
232+
...bucketDescriptionMap.get(e.bucket)!
231233
}))
232234
}
233235
} satisfies util.StreamingSyncCheckpoint;
@@ -336,6 +338,7 @@ export class BucketParameterState {
336338
public readonly syncParams: RequestParameters;
337339
private readonly querier: BucketParameterQuerier;
338340
private readonly staticBuckets: Map<string, BucketDescription>;
341+
private readonly explicitStreamSubscriptions: Record<string, util.StreamSubscription>;
339342
private readonly logger: Logger;
340343
private cachedDynamicBuckets: BucketDescription[] | null = null;
341344
private cachedDynamicBucketSet: Set<string> | null = null;
@@ -347,6 +350,7 @@ export class BucketParameterState {
347350
bucketStorage: BucketChecksumStateStorage,
348351
syncRules: SqlSyncRules,
349352
syncParams: RequestParameters,
353+
request: util.StreamingSyncRequest,
350354
logger: Logger
351355
) {
352356
this.context = context;
@@ -355,11 +359,48 @@ export class BucketParameterState {
355359
this.syncParams = syncParams;
356360
this.logger = logger;
357361

358-
this.querier = syncRules.getBucketParameterQuerier(this.syncParams);
362+
const explicitStreamSubscriptions: Record<string, util.StreamSubscription> = {};
363+
const subscriptions = request.subscriptions;
364+
if (subscriptions) {
365+
for (const subscription of subscriptions.subscriptions) {
366+
explicitStreamSubscriptions[subscription.stream] = subscription;
367+
}
368+
}
369+
this.explicitStreamSubscriptions = explicitStreamSubscriptions;
370+
371+
this.querier = syncRules.getBucketParameterQuerier({
372+
globalParameters: this.syncParams,
373+
hasDefaultSubscriptions: subscriptions?.include_defaults ?? true,
374+
resolveSubscription(name) {
375+
const subscription = explicitStreamSubscriptions[name];
376+
if (subscription) {
377+
return subscription.parameters ?? {};
378+
} else {
379+
return null;
380+
}
381+
}
382+
});
359383
this.staticBuckets = new Map<string, BucketDescription>(this.querier.staticBuckets.map((b) => [b.bucket, b]));
360384
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
361385
}
362386

387+
/**
388+
* Overrides the `description` based on subscriptions from the client.
389+
*
390+
* In partiuclar, this can override the priority assigned to a bucket.
391+
*/
392+
overrideBucketDescription(description: BucketDescription): BucketDescription {
393+
const changedPriority = this.explicitStreamSubscriptions[description.definition]?.override_priority;
394+
if (changedPriority != null && isValidPriority(changedPriority)) {
395+
return {
396+
...description,
397+
priority: changedPriority
398+
};
399+
} else {
400+
return description;
401+
}
402+
}
403+
363404
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {
364405
const querier = this.querier;
365406
let update: CheckpointUpdate;

packages/service-core/src/sync/sync.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ async function* streamResponseInner(
100100
bucketStorage,
101101
syncRules,
102102
syncParams,
103+
syncRequest: params,
103104
initialBucketPositions: params.buckets?.map((bucket) => ({
104105
name: bucket.name,
105106
after: BigInt(bucket.after)

packages/service-core/src/util/protocol-types.ts

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,51 @@ export const BucketRequest = t.object({
1313

1414
export type BucketRequest = t.Decoded<typeof BucketRequest>;
1515

16+
/**
17+
* An explicit subscription to a defined sync stream made by the client.
18+
*/
19+
export const StreamSubscription = t.object({
20+
/**
21+
* The defined name of the stream as it appears in sync rules.
22+
*/
23+
stream: t.string,
24+
/**
25+
* An optional dictionary of parameters to pass to this specific stream.
26+
*/
27+
parameters: t.record(t.any).optional(),
28+
/**
29+
* Set when the client wishes to re-assign a different priority to this subscription.
30+
*
31+
* Streams and sync rules can also assign a default priority, but clients are allowed to override those. This can be
32+
* useful when the priority for partial syncs depends on e.g. the current page opened in a client.
33+
*/
34+
override_priority: t.number.optional()
35+
});
36+
37+
export type StreamSubscription = t.Decoded<typeof StreamSubscription>;
38+
39+
/**
40+
* An overview of all subscriptions as part of a streaming sync request.
41+
*/
42+
export const StreamSubscriptions = t.object({
43+
/**
44+
* Whether to sync default streams.
45+
*
46+
* When disabled,only
47+
*/
48+
include_defaults: t.boolean.optional(),
49+
50+
/**
51+
* An array of streams the client has subscribed to.
52+
*/
53+
subscriptions: t.array(StreamSubscription)
54+
});
55+
56+
export type StreamSubscriptions = t.Decoded<typeof StreamSubscriptions>;
57+
1658
export const StreamingSyncRequest = t.object({
1759
/**
18-
* Existing bucket states.
60+
* Existing client-side bucket states.
1961
*/
2062
buckets: t.array(BucketRequest).optional(),
2163

@@ -47,7 +89,12 @@ export const StreamingSyncRequest = t.object({
4789
/**
4890
* Unique client id.
4991
*/
50-
client_id: t.string.optional()
92+
client_id: t.string.optional(),
93+
94+
/**
95+
* If the client is aware of stream subscriptions, an array of streams the client is subscribing to.
96+
*/
97+
subscriptions: StreamSubscriptions.optional()
5198
});
5299

53100
export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;

packages/sync-rules/src/BucketDescription.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ export const isValidPriority = (i: number): i is BucketPriority => {
2020
};
2121

2222
export interface BucketDescription {
23+
/**
24+
* The name of the sync rule or stream definition from which the bucket is derived.
25+
*/
26+
definition: string;
2327
/**
2428
* The id of the bucket, which is derived from the name of the bucket's definition
2529
* in the sync rules as well as the values returned by the parameter queries.

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,23 @@ export interface QueryParseResult {
2929
errors: SqlRuleError[];
3030
}
3131

32+
export enum SqlBucketDescriptorType {
33+
SYNC_RULE,
34+
STREAM
35+
}
36+
3237
export class SqlBucketDescriptor {
3338
name: string;
3439
bucketParameters?: string[];
40+
type: SqlBucketDescriptorType;
41+
subscribedToByDefault: boolean;
3542

36-
constructor(name: string) {
43+
constructor(name: string, type: SqlBucketDescriptorType) {
3744
this.name = name;
45+
this.type = type;
46+
47+
// Sync-rule style buckets are subscribed to by default, streams are opt-in unless their definition says otherwise.
48+
this.subscribedToByDefault = type == SqlBucketDescriptorType.SYNC_RULE;
3849
}
3950

4051
/**
@@ -93,6 +104,7 @@ export class SqlBucketDescriptor {
93104
}
94105
}
95106
this.dataQueries.push(query.data);
107+
this.subscribedToByDefault = options.default ?? false;
96108

97109
return {
98110
parsed: true,

packages/sync-rules/src/SqlParameterQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,7 @@ export class SqlParameterQuery {
367367
}
368368

369369
return {
370+
definition: this.descriptorName,
370371
bucket: getBucketId(this.descriptorName, this.bucketParameters, result),
371372
priority: this.priority
372373
};

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 38 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import { SqlEventDescriptor } from './events/SqlEventDescriptor.js';
66
import { IdSequence } from './IdSequence.js';
77
import { validateSyncRulesSchema } from './json_schema.js';
88
import { SourceTableInterface } from './SourceTableInterface.js';
9-
import { QueryParseResult, SqlBucketDescriptor } from './SqlBucketDescriptor.js';
9+
import { QueryParseResult, SqlBucketDescriptor, SqlBucketDescriptorType } from './SqlBucketDescriptor.js';
1010
import { TablePattern } from './TablePattern.js';
1111
import {
1212
EvaluatedParameters,
@@ -40,6 +40,22 @@ export interface SyncRulesOptions {
4040
throwOnError?: boolean;
4141
}
4242

43+
export interface GetQuerierOptions {
44+
globalParameters: RequestParameters;
45+
/**
46+
* Whether the client is subscribing to default subscriptions (the default).
47+
*/
48+
hasDefaultSubscriptions: boolean;
49+
/**
50+
* For streams, this is invoked to check whether the client has requested a subscription to
51+
* the stream.
52+
*
53+
* @param name The name of the stream as it appears in the sync rule definitions.
54+
* @returns If a subscription is active, the stream parameters for that particular stream. Otherwise null.
55+
*/
56+
resolveSubscription: (name: string) => Record<string, any> | null;
57+
}
58+
4359
export class SqlSyncRules implements SyncRules {
4460
bucketDescriptors: SqlBucketDescriptor[] = [];
4561
eventDescriptors: SqlEventDescriptor[] = [];
@@ -144,7 +160,7 @@ export class SqlSyncRules implements SyncRules {
144160
const parameters = value.get('parameters', true) as unknown;
145161
const dataQueries = value.get('data', true) as unknown;
146162

147-
const descriptor = new SqlBucketDescriptor(key);
163+
const descriptor = new SqlBucketDescriptor(key, SqlBucketDescriptorType.SYNC_RULE);
148164

149165
if (parameters instanceof Scalar) {
150166
rules.withScalar(parameters, (q) => {
@@ -179,7 +195,7 @@ export class SqlSyncRules implements SyncRules {
179195
continue;
180196
}
181197

182-
const descriptor = new SqlBucketDescriptor(key);
198+
const descriptor = new SqlBucketDescriptor(key, SqlBucketDescriptorType.STREAM);
183199

184200
const accept_potentially_dangerous_queries =
185201
value.get('accept_potentially_dangerous_queries', true)?.value == true;
@@ -355,8 +371,25 @@ export class SqlSyncRules implements SyncRules {
355371
return { results, errors };
356372
}
357373

358-
getBucketParameterQuerier(parameters: RequestParameters): BucketParameterQuerier {
359-
const queriers = this.bucketDescriptors.map((query) => query.getBucketParameterQuerier(parameters));
374+
getBucketParameterQuerier(options: GetQuerierOptions): BucketParameterQuerier {
375+
const queriers: BucketParameterQuerier[] = [];
376+
for (const descriptor of this.bucketDescriptors) {
377+
let params = options.globalParameters;
378+
const subscription =
379+
descriptor.type == SqlBucketDescriptorType.STREAM ? options.resolveSubscription(descriptor.name) : null;
380+
381+
if (!descriptor.subscribedToByDefault && subscription == null) {
382+
// The client is not subscribing to this stream, so don't query buckets related to it.
383+
continue;
384+
}
385+
386+
if (subscription != null) {
387+
params = params.withAddedParameters(subscription);
388+
}
389+
390+
queriers.push(descriptor.getBucketParameterQuerier(params));
391+
}
392+
360393
return mergeBucketParameterQueriers(queriers);
361394
}
362395

packages/sync-rules/src/StaticSqlParameterQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ export class StaticSqlParameterQuery {
177177

178178
return [
179179
{
180+
definition: this.descriptorName,
180181
bucket: getBucketId(this.descriptorName, this.bucketParameters, result),
181182
priority: this.priority
182183
}

packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -249,6 +249,7 @@ export class TableValuedFunctionSqlParameterQuery {
249249
}
250250

251251
return {
252+
definition: this.descriptorName,
252253
bucket: getBucketId(this.descriptorName, this.bucketParameters, result),
253254
priority: this.priority
254255
};

packages/sync-rules/src/types.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,14 @@ export class RequestParameters implements ParameterValueSet {
135135
}
136136
throw new Error(`Unknown table: ${table}`);
137137
}
138+
139+
withAddedParameters(params: Record<string, any>): RequestParameters {
140+
const clone = structuredClone(this);
141+
clone.rawUserParameters = JSONBig.stringify(params);
142+
clone.userParameters = toSyncRulesParameters(params);
143+
144+
return clone;
145+
}
138146
}
139147

140148
/**

0 commit comments

Comments
 (0)