Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 33 additions & 92 deletions packages/service-core/test/src/sync/BucketChecksumState.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -607,11 +607,20 @@ bucket_definitions:
describe('streams', () => {
let source: { -readonly [P in keyof BucketSource]: BucketSource[P] };
let storage: MockBucketChecksumStateStorage;
let staticBucketIds = ['stream|0[]'];

function checksumState(options?: Partial<BucketChecksumStateOptions>) {
const rules = new SqlSyncRules('');
rules.bucketSources.push(source);
function checksumState(source: string | boolean, options?: Partial<BucketChecksumStateOptions>) {
if (typeof source == 'boolean') {
source = `
streams:
stream:
auto_subscribe: ${source}
query: SELECT * FROM assets WHERE id IN ifnull(subscription.parameter('ids'), '["default"]');
`;
}

const rules = SqlSyncRules.fromYaml(source, {
defaultSchema: 'public'
});

return new BucketChecksumState({
syncContext,
Expand All @@ -623,77 +632,15 @@ bucket_definitions:
});
}

function createQuerier(ids: string[], subscription: number | null): BucketParameterQuerier {
return {
staticBuckets: ids.map((bucket) => ({
definition: 'stream',
inclusion_reasons: subscription == null ? ['default'] : [{ subscription }],
bucket,
priority: 3
})),
hasDynamicBuckets: false,
parameterQueryLookups: [],
queryDynamicBucketDescriptions: function (): never {
throw new Error('no dynamic buckets.');
}
};
}

beforeEach(() => {
// Currently using mocked streams before streams are actually implemented as parsable rules.
source = {
name: 'stream',
type: BucketSourceType.SYNC_STREAM,
subscribedToByDefault: false,
pushBucketParameterQueriers(result, options) {
// Create a fake querier that resolves the global stream["default"] bucket by default and allows extracting
// additional buckets from parameters.
const subscriptions = options.streams['stream'] ?? [];
if (!this.subscribedToByDefault && !subscriptions.length) {
return;
}

let hasExplicitDefaultSubscription = false;
for (const subscription of subscriptions) {
try {
let subscriptionParameters = [];

if (subscription.parameters != null) {
subscriptionParameters = JSON.parse(subscription.parameters['ids'] as string).map(
(e: string) => `stream["${e}"]`
);
} else {
hasExplicitDefaultSubscription = true;
}

result.queriers.push(createQuerier([...subscriptionParameters], subscription.opaque_id));
} catch (e) {
result.errors.push({
descriptor: 'stream',
subscription,
message: `Error evaluating bucket ids: ${e.message}`
});
}
}

// If the stream is subscribed to by default and there is no explicit subscription that would match the default
// subscription, also include the default querier.
if (this.subscribedToByDefault && !hasExplicitDefaultSubscription) {
result.queriers.push(createQuerier(['stream["default"]'], null));
}
}
} satisfies Partial<BucketSource> as any;

storage = new MockBucketChecksumStateStorage();
storage.updateTestChecksum({ bucket: 'stream["default"]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'stream["a"]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'stream["b"]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'stream|0["default"]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'stream|0["a"]', checksum: 1, count: 1 });
storage.updateTestChecksum({ bucket: 'stream|0["b"]', checksum: 1, count: 1 });
});

test('includes defaults', async () => {
source.subscribedToByDefault = true;
const state = checksumState();

const state = checksumState(true);
const line = await state.buildNextCheckpointLine({
base: storage.makeCheckpoint(1n),
writeCheckpoint: null,
Expand All @@ -703,7 +650,7 @@ bucket_definitions:
expect(line?.checkpointLine).toEqual({
checkpoint: {
buckets: [
{ bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
{ bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
],
last_op_id: '1',
write_checkpoint: undefined,
Expand All @@ -713,8 +660,7 @@ bucket_definitions:
});

test('can exclude defaults', async () => {
source.subscribedToByDefault = true;
const state = checksumState({ syncRequest: { streams: { include_defaults: false, subscriptions: [] } } });
const state = checksumState(true, { syncRequest: { streams: { include_defaults: false, subscriptions: [] } } });

const line = await state.buildNextCheckpointLine({
base: storage.makeCheckpoint(1n),
Expand All @@ -733,9 +679,7 @@ bucket_definitions:
});

test('custom subscriptions', async () => {
source.subscribedToByDefault = true;

const state = checksumState({
const state = checksumState(true, {
syncRequest: {
streams: {
subscriptions: [
Expand All @@ -755,9 +699,9 @@ bucket_definitions:
expect(line?.checkpointLine).toEqual({
checkpoint: {
buckets: [
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] },
{ bucket: 'stream["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 1 }] },
{ bucket: 'stream|0["default"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ default: 0 }] }
],
last_op_id: '1',
write_checkpoint: undefined,
Expand All @@ -767,7 +711,7 @@ bucket_definitions:
});

test('overlap between custom subscriptions', async () => {
const state = checksumState({
const state = checksumState(false, {
syncRequest: {
streams: {
subscriptions: [
Expand All @@ -787,8 +731,8 @@ bucket_definitions:
expect(line?.checkpointLine).toEqual({
checkpoint: {
buckets: [
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] }
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 3, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }, { sub: 1 }] }
],
last_op_id: '1',
write_checkpoint: undefined,
Expand All @@ -798,8 +742,7 @@ bucket_definitions:
});

test('overlap between default and custom subscription', async () => {
source.subscribedToByDefault = true;
const state = checksumState({
const state = checksumState(true, {
syncRequest: {
streams: {
subscriptions: [{ stream: 'stream', parameters: { ids: '["a", "default"]' }, override_priority: 1 }]
Expand All @@ -816,9 +759,9 @@ bucket_definitions:
expect(line?.checkpointLine).toEqual({
checkpoint: {
buckets: [
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{
bucket: 'stream["default"]',
bucket: 'stream|0["default"]',
checksum: 1,
count: 1,
priority: 1,
Expand All @@ -833,9 +776,7 @@ bucket_definitions:
});

test('reports errors', async () => {
source.subscribedToByDefault = true;

const state = checksumState({
const state = checksumState(true, {
syncRequest: {
streams: {
subscriptions: [
Expand All @@ -855,10 +796,10 @@ bucket_definitions:
expect(line?.checkpointLine).toEqual({
checkpoint: {
buckets: [
{ bucket: 'stream["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream|0["a"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{ bucket: 'stream|0["b"]', checksum: 1, count: 1, priority: 1, subscriptions: [{ sub: 0 }] },
{
bucket: 'stream["default"]',
bucket: 'stream|0["default"]',
checksum: 1,
count: 1,
priority: 3,
Expand Down