Skip to content

Commit 490c7ae

Browse files
committed
Rename more stream subscription references
1 parent edb1040 commit 490c7ae

File tree

6 files changed

+56
-46
lines changed

6 files changed

+56
-46
lines changed

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -388,8 +388,8 @@ export class BucketParameterState {
388388

389389
this.querier = syncRules.getBucketParameterQuerier({
390390
globalParameters: this.syncParams,
391-
hasDefaultSubscriptions: this.includeDefaultStreams,
392-
resolveSubscription(name) {
391+
hasDefaultStreams: this.includeDefaultStreams,
392+
resolveOpenedStream(name) {
393393
const subscription = explicitlyOpenedStreams[name];
394394
if (subscription) {
395395
return subscription.parameters ?? {};

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,17 +42,19 @@ export interface SyncRulesOptions {
4242
export interface GetQuerierOptions {
4343
globalParameters: RequestParameters;
4444
/**
45-
* Whether the client is subscribing to default subscriptions (the default).
45+
* Whether the client is subscribing to default query streams.
46+
*
47+
* Client do this by default, but can disable the behavior if needed.
4648
*/
47-
hasDefaultSubscriptions: boolean;
49+
hasDefaultStreams: boolean;
4850
/**
49-
* For streams, this is invoked to check whether the client has requested a subscription to
50-
* the stream.
51+
* For streams, this is invoked to check whether the client has opened the relevant stream.
5152
*
5253
* @param name The name of the stream as it appears in the sync rule definitions.
53-
* @returns If a subscription is active, the stream parameters for that particular stream. Otherwise null.
54+
* @returns If the strema has been opened by the client, the stream parameters for that particular stream. Otherwise
55+
* null.
5456
*/
55-
resolveSubscription: (name: string) => Record<string, any> | null;
57+
resolveOpenedStream: (name: string) => Record<string, any> | null;
5658
}
5759

5860
export class SqlSyncRules implements SyncRules {
@@ -376,15 +378,15 @@ export class SqlSyncRules implements SyncRules {
376378
let params = options.globalParameters;
377379

378380
if (descriptor.type == SqlBucketDescriptorType.STREAM) {
379-
const subscription = options.resolveSubscription(descriptor.name);
381+
const opened = options.resolveOpenedStream(descriptor.name);
380382

381-
if (!descriptor.subscribedToByDefault && subscription == null) {
383+
if (!descriptor.subscribedToByDefault && opened == null) {
382384
// The client is not subscribing to this stream, so don't query buckets related to it.
383385
continue;
384386
}
385387

386-
if (subscription != null) {
387-
params = params.withAddedParameters(subscription);
388+
if (opened != null) {
389+
params = params.withAddedStreamParameters(opened);
388390
}
389391

390392
queriers.push(descriptor.getBucketParameterQuerier(params));

packages/sync-rules/src/TableValuedFunctionSqlParameterQuery.ts

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -222,9 +222,7 @@ export class TableValuedFunctionSqlParameterQuery {
222222

223223
private getIndividualBucketDescription(row: SqliteRow, parameters: RequestParameters): BucketDescription | null {
224224
const mergedParams: ParameterValueSet = {
225-
rawTokenPayload: parameters.rawTokenPayload,
226-
rawUserParameters: parameters.rawUserParameters,
227-
userId: parameters.userId,
225+
...parameters,
228226
lookup: (table, column) => {
229227
if (table == this.callTableName) {
230228
return row[column]!;

packages/sync-rules/src/request_functions.ts

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,19 @@ export interface SqlParameterFunction {
1313
documentation: string;
1414
}
1515

16-
const parametersFunction = (name: string): SqlParameterFunction => {
17-
return {
18-
debugName: name,
19-
call(parameters: ParameterValueSet) {
20-
return parameters.rawUserParameters;
21-
},
22-
getReturnType() {
23-
return ExpressionType.TEXT;
24-
},
25-
detail: 'Unauthenticated request parameters as JSON',
26-
documentation:
27-
'Returns parameters passed by the client as a JSON string. These parameters are not authenticated - any value can be passed in by the client.',
28-
usesAuthenticatedRequestParameters: false,
29-
usesUnauthenticatedRequestParameters: true
30-
};
16+
const request_parameters: SqlParameterFunction = {
17+
debugName: 'request.parameters',
18+
call(parameters: ParameterValueSet) {
19+
return parameters.rawUserParameters;
20+
},
21+
getReturnType() {
22+
return ExpressionType.TEXT;
23+
},
24+
detail: 'Unauthenticated request parameters as JSON',
25+
documentation:
26+
'Returns parameters passed by the client as a JSON string. These parameters are not authenticated - any value can be passed in by the client.',
27+
usesAuthenticatedRequestParameters: false,
28+
usesUnauthenticatedRequestParameters: true
3129
};
3230

3331
const request_jwt: SqlParameterFunction = {
@@ -58,16 +56,25 @@ const request_user_id: SqlParameterFunction = {
5856
usesUnauthenticatedRequestParameters: false
5957
};
6058

61-
export const REQUEST_FUNCTIONS_WITHOUT_PARAMETERS: Record<string, SqlParameterFunction> = {
59+
export const REQUEST_FUNCTIONS: Record<string, SqlParameterFunction> = {
60+
parameters: request_parameters,
6261
jwt: request_jwt,
6362
user_id: request_user_id
6463
};
6564

66-
export const REQUEST_FUNCTIONS: Record<string, SqlParameterFunction> = {
67-
parameters: parametersFunction('request.parameters'),
68-
...REQUEST_FUNCTIONS_WITHOUT_PARAMETERS
69-
};
70-
7165
export const QUERY_FUNCTIONS: Record<string, SqlParameterFunction> = {
72-
params: parametersFunction('query.params')
66+
params: {
67+
debugName: 'stream.params',
68+
call(parameters: ParameterValueSet) {
69+
return parameters.rawUserParameters;
70+
},
71+
getReturnType() {
72+
return ExpressionType.TEXT;
73+
},
74+
detail: 'Unauthenticated stream parameters as JSON',
75+
documentation:
76+
'Returns stream passed by the client when opening the stream. These parameters are not authenticated - any value can be passed in by the client.',
77+
usesAuthenticatedRequestParameters: false,
78+
usesUnauthenticatedRequestParameters: true
79+
}
7380
};

packages/sync-rules/src/sql_filters.ts

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -427,18 +427,14 @@ export class SqlTools {
427427
return this.error(`${schema} schema is not available in data queries`, expr);
428428
}
429429

430-
if (fn == 'parameters' && this.supportsStreamInputs) {
431-
return this.error(`'request.parameters()' is unavailable on streams - use 'stream.params()' instead.`, expr);
432-
}
433-
434430
if (expr.args.length > 0) {
435431
return this.error(`Function '${schema}.${fn}' does not take arguments`, expr);
436432
}
437433

438434
if (fn in REQUEST_FUNCTIONS) {
439435
const fnImpl = REQUEST_FUNCTIONS[fn];
440436
return {
441-
key: 'request.parameters()',
437+
key: `stream.${fn}()`,
442438
lookupParameterValue(parameters) {
443439
return fnImpl.call(parameters);
444440
},
@@ -456,7 +452,7 @@ export class SqlTools {
456452
if (fn in QUERY_FUNCTIONS) {
457453
const fnImpl = QUERY_FUNCTIONS[fn];
458454
return {
459-
key: 'stream.params()',
455+
key: `stream.${fn}()`,
460456
lookupParameterValue(parameters) {
461457
return fnImpl.call(parameters);
462458
},

packages/sync-rules/src/types.ts

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ export interface ParameterValueSet {
8585
*/
8686
rawUserParameters: string;
8787

88+
/**
89+
* For streams, the raw JSON string of stream parameters.
90+
*/
91+
rawStreamParameters: string | null;
92+
8893
/**
8994
* JSON string of raw request parameters.
9095
*/
@@ -102,6 +107,8 @@ export class RequestParameters implements ParameterValueSet {
102107
*/
103108
rawUserParameters: string;
104109

110+
rawStreamParameters: string | null;
111+
105112
/**
106113
* JSON string of raw request parameters.
107114
*/
@@ -125,6 +132,7 @@ export class RequestParameters implements ParameterValueSet {
125132

126133
this.rawUserParameters = JSONBig.stringify(clientParameters);
127134
this.userParameters = toSyncRulesParameters(clientParameters);
135+
this.rawStreamParameters = null;
128136
}
129137

130138
lookup(table: string, column: string): SqliteJsonValue {
@@ -136,10 +144,9 @@ export class RequestParameters implements ParameterValueSet {
136144
throw new Error(`Unknown table: ${table}`);
137145
}
138146

139-
withAddedParameters(params: Record<string, any>): RequestParameters {
147+
withAddedStreamParameters(params: Record<string, any>): RequestParameters {
140148
const clone = structuredClone(this);
141-
clone.rawUserParameters = JSONBig.stringify(params);
142-
clone.userParameters = toSyncRulesParameters(params);
149+
clone.rawStreamParameters = JSONBig.stringify(params);
143150

144151
return clone;
145152
}

0 commit comments

Comments
 (0)