Skip to content

Commit dfbdf09

Browse files
committed
Use an explicit ParameterLookup class for better typing.
1 parent 8f430db commit dfbdf09

File tree

11 files changed

+86
-48
lines changed

11 files changed

+86
-48
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@ import {
1414
InternalOpId,
1515
internalToExternalOpId,
1616
ProtocolOpId,
17-
getLookupBucketDefinitionName,
1817
ReplicationCheckpoint,
1918
storage,
2019
utils,
2120
WatchWriteCheckpointOptions,
2221
CHECKPOINT_INVALIDATE_ALL,
2322
deserializeParameterLookup
2423
} from '@powersync/service-core';
25-
import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
24+
import { SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
2625
import * as bson from 'bson';
2726
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
2827
import { LRUCache } from 'lru-cache';
@@ -158,7 +157,7 @@ export class MongoSyncBucketStorage
158157

159158
await callback(batch);
160159
await batch.flush();
161-
if (batch.last_flushed_op) {
160+
if (batch.last_flushed_op != null) {
162161
return { flushed_op: batch.last_flushed_op };
163162
} else {
164163
return null;
@@ -256,7 +255,7 @@ export class MongoSyncBucketStorage
256255
return result!;
257256
}
258257

259-
async getParameterSets(checkpoint: utils.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
258+
async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
260259
const lookupFilter = lookups.map((lookup) => {
261260
return storage.serializeLookup(lookup);
262261
});

modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import * as lib_postgres from '@powersync/lib-service-postgres';
2-
import { ReplicationAssertionError } from '@powersync/lib-services-framework';
32
import {
43
BroadcastIterable,
54
CHECKPOINT_INVALIDATE_ALL,
@@ -14,23 +13,22 @@ import {
1413
} from '@powersync/service-core';
1514
import { JSONBig } from '@powersync/service-jsonbig';
1615
import * as sync_rules from '@powersync/service-sync-rules';
16+
import * as timers from 'timers/promises';
1717
import * as uuid from 'uuid';
1818
import { BIGINT_MAX } from '../types/codecs.js';
1919
import { models, RequiredOperationBatchLimits } from '../types/types.js';
2020
import { replicaIdToSubkey } from '../utils/bson.js';
2121
import { mapOpEntry } from '../utils/bucket-data.js';
22-
import * as timers from 'timers/promises';
2322

2423
import * as framework from '@powersync/lib-services-framework';
2524
import { StatementParam } from '@powersync/service-jpgwire';
25+
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
2626
import { SourceTableDecoded, StoredRelationId } from '../types/models/SourceTable.js';
2727
import { pick } from '../utils/ts-codec.js';
2828
import { PostgresBucketBatch } from './batch/PostgresBucketBatch.js';
2929
import { PostgresWriteCheckpointAPI } from './checkpoints/PostgresWriteCheckpointAPI.js';
3030
import { PostgresBucketStorageFactory } from './PostgresBucketStorageFactory.js';
3131
import { PostgresCompactor } from './PostgresCompactor.js';
32-
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
33-
import { Decoded } from 'ts-codec';
3432

3533
export type PostgresSyncRulesStorageOptions = {
3634
factory: PostgresBucketStorageFactory;
@@ -354,7 +352,7 @@ export class PostgresSyncRulesStorage
354352

355353
async getParameterSets(
356354
checkpoint: utils.InternalOpId,
357-
lookups: sync_rules.SqliteJsonValue[][]
355+
lookups: sync_rules.ParameterLookup[]
358356
): Promise<sync_rules.SqliteJsonRow[]> {
359357
const rows = await this.db.sql`
360358
SELECT DISTINCT

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 31 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { getUuidReplicaIdentityBson, OplogEntry, storage } from '@powersync/service-core';
2-
import { RequestParameters } from '@powersync/service-sync-rules';
2+
import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules';
33
import { expect, test } from 'vitest';
44
import * as test_utils from '../test-utils/test-utils-index.js';
55

@@ -65,7 +65,9 @@ bucket_definitions:
6565
});
6666
});
6767

68-
const parameters = await bucketStorage.getParameterSets(result!.flushed_op, [['mybucket', '1', 'user1']]);
68+
const parameters = await bucketStorage.getParameterSets(result!.flushed_op, [
69+
ParameterLookup.normalized('mybucket', '1', ['user1'])
70+
]);
6971
expect(parameters).toEqual([
7072
{
7173
group_id: 'group1a'
@@ -110,15 +112,19 @@ bucket_definitions:
110112
});
111113
});
112114

113-
const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [['mybucket', '1', 'user1']]);
115+
const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [
116+
ParameterLookup.normalized('mybucket', '1', ['user1'])
117+
]);
114118
expect(parameters).toEqual([
115119
{
116120
group_id: 'group2'
117121
}
118122
]);
119123

120124
// Use the checkpoint to get older data if relevant
121-
const parameters2 = await bucketStorage.getParameterSets(result1!.flushed_op, [['mybucket', '1', 'user1']]);
125+
const parameters2 = await bucketStorage.getParameterSets(result1!.flushed_op, [
126+
ParameterLookup.normalized('mybucket', '1', ['user1'])
127+
]);
122128
expect(parameters2).toEqual([
123129
{
124130
group_id: 'group1'
@@ -183,8 +189,8 @@ bucket_definitions:
183189
// There removal operation for the association of `list2`::`todo2` should not interfere with the new
184190
// association of `list1`::`todo2`
185191
const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [
186-
['mybucket', '1', 'list1'],
187-
['mybucket', '1', 'list2']
192+
ParameterLookup.normalized('mybucket', '1', ['list1']),
193+
ParameterLookup.normalized('mybucket', '1', ['list2'])
188194
]);
189195

190196
expect(parameters.sort((a, b) => (a.todo_id as string).localeCompare(b.todo_id as string))).toEqual([
@@ -230,11 +236,17 @@ bucket_definitions:
230236

231237
const checkpoint = result!.flushed_op;
232238

233-
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314n, 314, 3.14]]);
239+
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [
240+
ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14])
241+
]);
234242
expect(parameters1).toEqual([TEST_PARAMS]);
235-
const parameters2 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314, 314n, 3.14]]);
243+
const parameters2 = await bucketStorage.getParameterSets(checkpoint, [
244+
ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14])
245+
]);
236246
expect(parameters2).toEqual([TEST_PARAMS]);
237-
const parameters3 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 314n, 314, 3]]);
247+
const parameters3 = await bucketStorage.getParameterSets(checkpoint, [
248+
ParameterLookup.normalized('mybucket', '1', [314n, 314, 3])
249+
]);
238250
expect(parameters3).toEqual([]);
239251
});
240252

@@ -286,7 +298,9 @@ bucket_definitions:
286298

287299
const checkpoint = result!.flushed_op;
288300

289-
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 1152921504606846976n]]);
301+
const parameters1 = await bucketStorage.getParameterSets(checkpoint, [
302+
ParameterLookup.normalized('mybucket', '1', [1152921504606846976n])
303+
]);
290304
expect(parameters1).toEqual([TEST_PARAMS]);
291305
});
292306

@@ -387,7 +401,7 @@ bucket_definitions:
387401
const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
388402

389403
const lookups = q1.getLookups(parameters);
390-
expect(lookups).toEqual([['by_workspace', '1', 'u1']]);
404+
expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]);
391405

392406
const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups);
393407
expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]);
@@ -457,7 +471,7 @@ bucket_definitions:
457471
const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
458472

459473
const lookups = q1.getLookups(parameters);
460-
expect(lookups).toEqual([['by_public_workspace', '1']]);
474+
expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]);
461475

462476
const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups);
463477
parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
@@ -546,15 +560,15 @@ bucket_definitions:
546560
// Test intermediate values - could be moved to sync_rules.test.ts
547561
const q1 = sync_rules.bucket_descriptors[0].parameter_queries[0];
548562
const lookups1 = q1.getLookups(parameters);
549-
expect(lookups1).toEqual([['by_workspace', '1']]);
563+
expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]);
550564

551565
const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1);
552566
parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
553567
expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]);
554568

555569
const q2 = sync_rules.bucket_descriptors[0].parameter_queries[1];
556570
const lookups2 = q2.getLookups(parameters);
557-
expect(lookups2).toEqual([['by_workspace', '2', 'u1']]);
571+
expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]);
558572

559573
const parameter_sets2 = await bucketStorage.getParameterSets(checkpoint, lookups2);
560574
parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b)));
@@ -861,7 +875,9 @@ bucket_definitions:
861875

862876
const { checkpoint } = await bucketStorage.getCheckpoint();
863877

864-
const parameters = await bucketStorage.getParameterSets(checkpoint, [['mybucket', '1', 'user1']]);
878+
const parameters = await bucketStorage.getParameterSets(checkpoint, [
879+
ParameterLookup.normalized('mybucket', '1', ['user1'])
880+
]);
865881
expect(parameters).toEqual([]);
866882
});
867883

packages/service-core/src/storage/SyncRulesBucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { ObserverClient } from '@powersync/lib-services-framework';
2-
import { SqlSyncRules, SqliteJsonRow, SqliteJsonValue } from '@powersync/service-sync-rules';
2+
import { ParameterLookup, SqlSyncRules, SqliteJsonRow } from '@powersync/service-sync-rules';
33
import * as util from '../util/util-index.js';
44
import { BucketStorageBatch, FlushedResult } from './BucketStorageBatch.js';
55
import { BucketStorageFactory } from './BucketStorageFactory.js';
@@ -71,7 +71,7 @@ export interface SyncRulesBucketStorage
7171
/**
7272
* Used to resolve "dynamic" parameter queries.
7373
*/
74-
getParameterSets(checkpoint: util.InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]>;
74+
getParameterSets(checkpoint: util.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
7575

7676
getCheckpointChanges(options: GetCheckpointChangesOptions): Promise<CheckpointChanges>;
7777

packages/service-core/src/storage/bson.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as bson from 'bson';
22

3-
import { SqliteJsonValue } from '@powersync/service-sync-rules';
3+
import { ParameterLookup, SqliteJsonValue } from '@powersync/service-sync-rules';
44
import { ReplicaId } from './BucketStorageBatch.js';
55

66
type NodeBuffer = Buffer<ArrayBuffer>;
@@ -24,11 +24,11 @@ export const BSON_DESERIALIZE_DATA_OPTIONS: bson.DeserializeOptions = {
2424
* Lookup serialization must be number-agnostic. I.e. normalize numbers, instead of preserving numbers.
2525
* @param lookup
2626
*/
27-
export const serializeLookupBuffer = (lookup: SqliteJsonValue[]): NodeBuffer => {
28-
return bson.serialize({ l: lookup }) as NodeBuffer;
27+
export const serializeLookupBuffer = (lookup: ParameterLookup): NodeBuffer => {
28+
return bson.serialize({ l: lookup.values }) as NodeBuffer;
2929
};
3030

31-
export const serializeLookup = (lookup: SqliteJsonValue[]) => {
31+
export const serializeLookup = (lookup: ParameterLookup) => {
3232
return new bson.Binary(serializeLookupBuffer(lookup));
3333
};
3434

packages/service-core/src/sync/BucketChecksumState.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -288,7 +288,7 @@ export class BucketParameterState {
288288

289289
this.querier = syncRules.getBucketParameterQuerier(this.syncParams);
290290
this.staticBuckets = new Map<string, BucketDescription>(this.querier.staticBuckets.map((b) => [b.bucket, b]));
291-
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l)));
291+
this.lookups = new Set<string>(this.querier.parameterQueryLookups.map((l) => JSONBig.stringify(l.values)));
292292
}
293293

294294
async getCheckpointUpdate(checkpoint: storage.StorageCheckpointUpdate): Promise<CheckpointUpdate | null> {

packages/service-core/test/src/sync/BucketChecksumState.test.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
WatchFilterEvent
1010
} from '@/index.js';
1111
import { JSONBig } from '@powersync/service-jsonbig';
12-
import { RequestParameters, SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service-sync-rules';
12+
import { RequestParameters, SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
1313
import { describe, expect, test } from 'vitest';
1414

1515
describe('BucketChecksumState', () => {
@@ -475,10 +475,10 @@ bucket_definitions:
475475

476476
storage.getParameterSets = async (
477477
checkpoint: InternalOpId,
478-
lookups: SqliteJsonValue[][]
478+
lookups: ParameterLookup[]
479479
): Promise<SqliteJsonRow[]> => {
480480
expect(checkpoint).toEqual(1n);
481-
expect(lookups).toEqual([['by_project', '1', 'u1']]);
481+
expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
482482
return [{ id: 1 }, { id: 2 }];
483483
};
484484

@@ -520,10 +520,10 @@ bucket_definitions:
520520

521521
storage.getParameterSets = async (
522522
checkpoint: InternalOpId,
523-
lookups: SqliteJsonValue[][]
523+
lookups: ParameterLookup[]
524524
): Promise<SqliteJsonRow[]> => {
525525
expect(checkpoint).toEqual(2n);
526-
expect(lookups).toEqual([['by_project', '1', 'u1']]);
526+
expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
527527
return [{ id: 1 }, { id: 2 }, { id: 3 }];
528528
};
529529

@@ -581,7 +581,7 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {
581581
);
582582
}
583583

584-
async getParameterSets(checkpoint: InternalOpId, lookups: SqliteJsonValue[][]): Promise<SqliteJsonRow[]> {
584+
async getParameterSets(checkpoint: InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
585585
throw new Error('Method not implemented.');
586586
}
587587
}

packages/sync-rules/src/BucketParameterQuerier.ts

Lines changed: 25 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { BucketDescription } from './BucketDescription.js';
22
import { RequestParameters, SqliteJsonRow, SqliteJsonValue } from './types.js';
3+
import { normalizeParameterValue } from './utils.js';
34

45
/**
56
* Represents a set of parameter queries for a specific request.
@@ -23,7 +24,7 @@ export interface BucketParameterQuerier {
2324
*/
2425
readonly hasDynamicBuckets: boolean;
2526

26-
readonly parameterQueryLookups: SqliteJsonValue[][];
27+
readonly parameterQueryLookups: ParameterLookup[];
2728

2829
/**
2930
* These buckets depend on parameter storage, and needs to be retrieved dynamically for each checkpoint.
@@ -39,7 +40,7 @@ export interface BucketParameterQuerier {
3940
}
4041

4142
export interface ParameterLookupSource {
42-
getParameterSets: (lookups: SqliteJsonValue[][]) => Promise<SqliteJsonRow[]>;
43+
getParameterSets: (lookups: ParameterLookup[]) => Promise<SqliteJsonRow[]>;
4344
}
4445

4546
export interface QueryBucketDescriptorOptions extends ParameterLookupSource {
@@ -63,3 +64,25 @@ export function mergeBucketParameterQueriers(queriers: BucketParameterQuerier[])
6364
}
6465
};
6566
}
67+
68+
/**
69+
* Represents an equality filter from a parameter query.
70+
*
71+
* Other query types are not supported yet.
72+
*/
73+
export class ParameterLookup {
74+
// bucket definition name, parameter query index, ...lookup values
75+
readonly values: SqliteJsonValue[];
76+
77+
static normalized(bucketDefinitionName: string, queryIndex: string, values: SqliteJsonValue[]): ParameterLookup {
78+
return new ParameterLookup([bucketDefinitionName, queryIndex, ...values.map(normalizeParameterValue)]);
79+
}
80+
81+
/**
82+
*
83+
* @param values must be pre-normalized (any integer converted into bigint)
84+
*/
85+
constructor(values: SqliteJsonValue[]) {
86+
this.values = values;
87+
}
88+
}

0 commit comments

Comments
 (0)