Skip to content

Commit edb1040

Browse files
committed
Avoid subscription terminology
1 parent e9d178d commit edb1040

File tree

2 files changed

+24
-24
lines changed

2 files changed

+24
-24
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ export class BucketChecksumState {
228228
bucketsToFetch = allBuckets;
229229
this.parameterState.syncRules.bucketDescriptors;
230230

231-
const subscriptions: util.SubscribedStream[] = [];
231+
const subscriptions: util.StreamDescription[] = [];
232232
for (const desc of this.parameterState.syncRules.bucketDescriptors) {
233233
if (desc.type == SqlBucketDescriptorType.STREAM && this.parameterState.isSubscribedToStream(desc)) {
234234
subscriptions.push({
@@ -246,7 +246,7 @@ export class BucketChecksumState {
246246
...e,
247247
...bucketDescriptionMap.get(e.bucket)!
248248
})),
249-
included_subscriptions: subscriptions
249+
streams: subscriptions
250250
}
251251
} satisfies util.StreamingSyncCheckpoint;
252252
}
@@ -355,7 +355,7 @@ export class BucketParameterState {
355355
private readonly querier: BucketParameterQuerier;
356356
private readonly staticBuckets: Map<string, BucketDescription>;
357357
private readonly includeDefaultStreams: boolean;
358-
private readonly explicitStreamSubscriptions: Record<string, util.StreamSubscription>;
358+
private readonly explicitlyOpenedStreams: Record<string, util.OpenedStream>;
359359
private readonly logger: Logger;
360360
private cachedDynamicBuckets: BucketDescription[] | null = null;
361361
private cachedDynamicBucketSet: Set<string> | null = null;
@@ -376,21 +376,21 @@ export class BucketParameterState {
376376
this.syncParams = syncParams;
377377
this.logger = logger;
378378

379-
const explicitStreamSubscriptions: Record<string, util.StreamSubscription> = {};
379+
const explicitlyOpenedStreams: Record<string, util.OpenedStream> = {};
380380
const subscriptions = request.subscriptions;
381381
if (subscriptions) {
382-
for (const subscription of subscriptions.subscriptions) {
383-
explicitStreamSubscriptions[subscription.stream] = subscription;
382+
for (const subscription of subscriptions.opened) {
383+
explicitlyOpenedStreams[subscription.stream] = subscription;
384384
}
385385
}
386386
this.includeDefaultStreams = subscriptions?.include_defaults ?? true;
387-
this.explicitStreamSubscriptions = explicitStreamSubscriptions;
387+
this.explicitlyOpenedStreams = explicitlyOpenedStreams;
388388

389389
this.querier = syncRules.getBucketParameterQuerier({
390390
globalParameters: this.syncParams,
391391
hasDefaultSubscriptions: this.includeDefaultStreams,
392392
resolveSubscription(name) {
393-
const subscription = explicitStreamSubscriptions[name];
393+
const subscription = explicitlyOpenedStreams[name];
394394
if (subscription) {
395395
return subscription.parameters ?? {};
396396
} else {
@@ -408,7 +408,7 @@ export class BucketParameterState {
408408
* In partiuclar, this can override the priority assigned to a bucket.
409409
*/
410410
overrideBucketDescription(description: BucketDescription): BucketDescription {
411-
const changedPriority = this.explicitStreamSubscriptions[description.definition]?.override_priority;
411+
const changedPriority = this.explicitlyOpenedStreams[description.definition]?.override_priority;
412412
if (changedPriority != null && isValidPriority(changedPriority)) {
413413
return {
414414
...description,
@@ -420,7 +420,7 @@ export class BucketParameterState {
420420
}
421421

422422
isSubscribedToStream(desc: SqlBucketDescriptor): boolean {
423-
return (desc.subscribedToByDefault && this.includeDefaultStreams) || desc.name in this.explicitStreamSubscriptions;
423+
return (desc.subscribedToByDefault && this.includeDefaultStreams) || desc.name in this.explicitlyOpenedStreams;
424424
}
425425

426426
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate> {

packages/service-core/src/util/protocol-types.ts

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,32 +14,32 @@ export const BucketRequest = t.object({
1414
export type BucketRequest = t.Decoded<typeof BucketRequest>;
1515

1616
/**
17-
* An explicit subscription to a defined sync stream made by the client.
17+
* A sync steam that a client has expressed interest in by explicitly opening it on the client side.
1818
*/
19-
export const StreamSubscription = t.object({
19+
export const OpenedStream = t.object({
2020
/**
21-
* The defined name of the stream as it appears in sync rules.
21+
* The defined name of the stream as it appears in sync stream definitions.
2222
*/
2323
stream: t.string,
2424
/**
2525
* An optional dictionary of parameters to pass to this specific stream.
2626
*/
2727
parameters: t.record(t.any).optional(),
2828
/**
29-
* Set when the client wishes to re-assign a different priority to this subscription.
29+
* Set when the client wishes to re-assign a different priority to this stream.
3030
*
3131
* Streams and sync rules can also assign a default priority, but clients are allowed to override those. This can be
3232
* useful when the priority for partial syncs depends on e.g. the current page opened in a client.
3333
*/
3434
override_priority: t.number.optional()
3535
});
3636

37-
export type StreamSubscription = t.Decoded<typeof StreamSubscription>;
37+
export type OpenedStream = t.Decoded<typeof OpenedStream>;
3838

3939
/**
40-
* An overview of all subscriptions as part of a streaming sync request.
40+
* An overview of all opened streams as part of a streaming sync request.
4141
*/
42-
export const StreamSubscriptions = t.object({
42+
export const OpenedStreams = t.object({
4343
/**
4444
* Whether to sync default streams.
4545
*
@@ -48,12 +48,12 @@ export const StreamSubscriptions = t.object({
4848
include_defaults: t.boolean.optional(),
4949

5050
/**
51-
* An array of streams the client has subscribed to.
51+
* An array of sync streams the client has opened explicitly.
5252
*/
53-
subscriptions: t.array(StreamSubscription)
53+
opened: t.array(OpenedStream)
5454
});
5555

56-
export type StreamSubscriptions = t.Decoded<typeof StreamSubscriptions>;
56+
export type StreamSubscriptions = t.Decoded<typeof OpenedStreams>;
5757

5858
export const StreamingSyncRequest = t.object({
5959
/**
@@ -92,9 +92,9 @@ export const StreamingSyncRequest = t.object({
9292
client_id: t.string.optional(),
9393

9494
/**
95-
* If the client is aware of stream subscriptions, an array of streams the client is subscribing to.
95+
* If the client is aware of streams, an array of streams the client has opened.
9696
*/
97-
subscriptions: StreamSubscriptions.optional()
97+
subscriptions: OpenedStreams.optional()
9898
});
9999

100100
export type StreamingSyncRequest = t.Decoded<typeof StreamingSyncRequest>;
@@ -146,7 +146,7 @@ export type StreamingSyncLine =
146146
*/
147147
export type ProtocolOpId = string;
148148

149-
export interface SubscribedStream {
149+
export interface StreamDescription {
150150
name: string;
151151
is_default: boolean;
152152
}
@@ -155,7 +155,7 @@ export interface Checkpoint {
155155
last_op_id: ProtocolOpId;
156156
write_checkpoint?: ProtocolOpId;
157157
buckets: BucketChecksumWithDescription[];
158-
included_subscriptions: SubscribedStream[];
158+
streams: StreamDescription[];
159159
}
160160

161161
export interface BucketState {

0 commit comments

Comments
 (0)