diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index df43cc9bd..717c017d2 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -607,11 +607,20 @@ bucket_definitions: describe('streams', () => { let source: { -readonly [P in keyof BucketSource]: BucketSource[P] }; let storage: MockBucketChecksumStateStorage; - let staticBucketIds = ['stream|0[]']; - function checksumState(options?: Partial) { - const rules = new SqlSyncRules(''); - rules.bucketSources.push(source); + function checksumState(source: string | boolean, options?: Partial) { + if (typeof source == 'boolean') { + source = ` +streams: + stream: + auto_subscribe: ${source} + query: SELECT * FROM assets WHERE id IN ifnull(subscription.parameter('ids'), '["default"]'); +`; + } + + const rules = SqlSyncRules.fromYaml(source, { + defaultSchema: 'public' + }); return new BucketChecksumState({ syncContext, @@ -623,77 +632,15 @@ bucket_definitions: }); } - function createQuerier(ids: string[], subscription: number | null): BucketParameterQuerier { - return { - staticBuckets: ids.map((bucket) => ({ - definition: 'stream', - inclusion_reasons: subscription == null ? ['default'] : [{ subscription }], - bucket, - priority: 3 - })), - hasDynamicBuckets: false, - parameterQueryLookups: [], - queryDynamicBucketDescriptions: function (): never { - throw new Error('no dynamic buckets.'); - } - }; - } - beforeEach(() => { - // Currently using mocked streams before streams are actually implemented as parsable rules. - source = { - name: 'stream', - type: BucketSourceType.SYNC_STREAM, - subscribedToByDefault: false, - pushBucketParameterQueriers(result, options) { - // Create a fake querier that resolves the global stream["default"] bucket by default and allows extracting - // additional buckets from parameters. - const subscriptions = options.streams['stream'] ?? []; - if (!this.subscribedToByDefault && !subscriptions.length) { - return; - } - - let hasExplicitDefaultSubscription = false; - for (const subscription of subscriptions) { - try { - let subscriptionParameters = []; - - if (subscription.parameters != null) { - subscriptionParameters = JSON.parse(subscription.parameters['ids'] as string).map( - (e: string) => `stream["${e}"]` - ); - } else { - hasExplicitDefaultSubscription = true; - } - - result.queriers.push(createQuerier([...subscriptionParameters], subscription.opaque_id)); - } catch (e) { - result.errors.push({ - descriptor: 'stream', - subscription, - message: `Error evaluating bucket ids: ${e.message}` - }); - } - } - - // If the stream is subscribed to by default and there is no explicit subscription that would match the default - // subscription, also include the default querier. - if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) { - result.queriers.push(createQuerier(['stream["default"]'], null)); - } - } - } satisfies Partial as any; - storage = new MockBucketChecksumStateStorage(); - storage.updateTestChecksum({ bucket: 'stream["default"]', checksum: 1, count: 1 }); - storage.updateTestChecksum({ bucket: 'stream["a"]', checksum: 1, count: 1 }); - storage.updateTestChecksum({ bucket: 'stream["b"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'stream|0["default"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'stream|0["a"]', checksum: 1, count: 1 }); + storage.updateTestChecksum({ bucket: 'stream|0["b"]', checksum: 1, count: 1 }); }); test('includes defaults', async () => { - source.subscribedToByDefault = true; - const state = checksumState(); - + const state = checksumState(true); const line = await state.buildNextCheckpointLine({ base: storage.makeCheckpoint(1n), writeCheckpoint: null, @@ -703,7 +650,7 @@ bucket_definitions: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + { bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -713,8 +660,7 @@ bucket_definitions: }); test('can exclude defaults', async () => { - source.subscribedToByDefault = true; - const state = checksumState({ syncRequest: { streams: { include_defaults: false, subscriptions: [] } } }); + const state = checksumState(true, { syncRequest: { streams: { include_defaults: false, subscriptions: [] } } }); const line = await state.buildNextCheckpointLine({ base: storage.makeCheckpoint(1n), @@ -733,9 +679,7 @@ bucket_definitions: }); test('custom subscriptions', async () => { - source.subscribedToByDefault = true; - - const state = checksumState({ + const state = checksumState(true, { syncRequest: { streams: { subscriptions: [ @@ -755,9 +699,9 @@ bucket_definitions: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] }, - { bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } + { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] }, + { bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -767,7 +711,7 @@ bucket_definitions: }); test('overlap between custom subscriptions', async () => { - const state = checksumState({ + const state = checksumState(false, { syncRequest: { streams: { subscriptions: [ @@ -787,8 +731,8 @@ bucket_definitions: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] } + { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] } ], last_op_id: '1', write_checkpoint: undefined, @@ -798,8 +742,7 @@ bucket_definitions: }); test('overlap between default and custom subscription', async () => { - source.subscribedToByDefault = true; - const state = checksumState({ + const state = checksumState(true, { syncRequest: { streams: { subscriptions: [{ stream: 'stream', parameters: { ids: '["a", "default"]' }, override_priority: 1 }] @@ -816,9 +759,9 @@ bucket_definitions: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, { - bucket: 'stream["default"]', + bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 1, @@ -833,9 +776,7 @@ bucket_definitions: }); test('reports errors', async () => { - source.subscribedToByDefault = true; - - const state = checksumState({ + const state = checksumState(true, { syncRequest: { streams: { subscriptions: [ @@ -855,10 +796,10 @@ bucket_definitions: expect(line?.checkpointLine).toEqual({ checkpoint: { buckets: [ - { bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, - { bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, + { bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] }, { - bucket: 'stream["default"]', + bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3,