Skip to content

Commit 0b9d0e7

Browse files
committed
Test and fix wildcard collections.
1 parent a6cd6f9 commit 0b9d0e7

File tree

2 files changed

+47
-11
lines changed

2 files changed

+47
-11
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,10 @@ export class ChangeStream {
244244
}
245245

246246
if (tablePattern.isWildcard) {
247-
$refilters.push({ db: tablePattern.schema, coll: new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) });
247+
$refilters.push({
248+
'ns.db': tablePattern.schema,
249+
'ns.coll': new RegExp('^' + escapeRegExp(tablePattern.tablePrefix))
250+
});
248251
} else {
249252
$inFilters.push({
250253
db: tablePattern.schema,
@@ -284,6 +287,8 @@ export class ChangeStream {
284287
throw new Error(`Aborted initial replication`);
285288
}
286289

290+
at += 1;
291+
287292
const record = constructAfterRecord(document);
288293

289294
// This auto-flushes when the batch reaches its size limit
@@ -303,6 +308,7 @@ export class ChangeStream {
303308
}
304309

305310
await batch.flush();
311+
logger.info(`Replicated ${at} documents for ${table.qualifiedName}`);
306312
}
307313

308314
private async getRelation(
@@ -503,7 +509,6 @@ export class ChangeStream {
503509
// Configuration happens during snapshot
504510
fullDocument = 'required';
505511
}
506-
console.log({ fullDocument });
507512

508513
const streamOptions: mongo.ChangeStreamOptions = {
509514
startAtOperationTime: startAfter,

modules/module-mongodb/test/src/change_stream.test.ts

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,10 @@ import { putOp, removeOp } from '@core-tests/stream_utils.js';
22
import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
33
import { BucketStorageFactory } from '@powersync/service-core';
44
import * as crypto from 'crypto';
5-
import { afterEach, beforeEach, describe, expect, test } from 'vitest';
6-
import { ChangeStreamTestContext } from './change_stream_utils.js';
75
import * as mongo from 'mongodb';
86
import { setTimeout } from 'node:timers/promises';
7+
import { describe, expect, test } from 'vitest';
8+
import { ChangeStreamTestContext } from './change_stream_utils.js';
99

1010
type StorageFactory = () => Promise<BucketStorageFactory>;
1111

@@ -30,8 +30,8 @@ bucket_definitions:
3030
data:
3131
- SELECT _id as id, description, num FROM "test_data"`);
3232

33-
db.createCollection('test_data', {
34-
changeStreamPreAndPostImages: { enabled: true }
33+
await db.createCollection('test_data', {
34+
changeStreamPreAndPostImages: { enabled: false }
3535
});
3636
const collection = db.collection('test_data');
3737

@@ -58,6 +58,38 @@ bucket_definitions:
5858
]);
5959
});
6060

61+
test('replicating wildcard', async () => {
62+
await using context = await ChangeStreamTestContext.open(factory);
63+
const { db } = context;
64+
await context.updateSyncRules(`
65+
bucket_definitions:
66+
global:
67+
data:
68+
- SELECT _id as id, description, num FROM "test_%"`);
69+
70+
await db.createCollection('test_data', {
71+
changeStreamPreAndPostImages: { enabled: false }
72+
});
73+
const collection = db.collection('test_data');
74+
75+
const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n });
76+
const test_id = result.insertedId;
77+
78+
await context.replicateSnapshot();
79+
80+
context.startStreaming();
81+
82+
await setTimeout(30);
83+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } });
84+
85+
const data = await context.getBucketData('global[]');
86+
87+
expect(data).toMatchObject([
88+
putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }),
89+
putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n })
90+
]);
91+
});
92+
6193
test('updateLookup - no fullDocument available', async () => {
6294
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'updateLookup' });
6395
const { db, client } = context;
@@ -67,13 +99,12 @@ bucket_definitions:
6799
data:
68100
- SELECT _id as id, description, num FROM "test_data"`);
69101

70-
db.createCollection('test_data', {
102+
await db.createCollection('test_data', {
71103
changeStreamPreAndPostImages: { enabled: false }
72104
});
73105
const collection = db.collection('test_data');
74106

75107
await context.replicateSnapshot();
76-
77108
context.startStreaming();
78109

79110
const session = client.startSession();
@@ -112,7 +143,7 @@ bucket_definitions:
112143
data:
113144
- SELECT _id as id, description, num FROM "test_data"`);
114145

115-
db.createCollection('test_data', {
146+
await db.createCollection('test_data', {
116147
// enabled: false here, but autoConfigure will enable it.
117148
changeStreamPreAndPostImages: { enabled: false }
118149
});
@@ -158,7 +189,7 @@ bucket_definitions:
158189
data:
159190
- SELECT _id as id, description, num FROM "test_data"`);
160191

161-
db.createCollection('test_data', {
192+
await db.createCollection('test_data', {
162193
changeStreamPreAndPostImages: { enabled: true }
163194
});
164195
const collection = db.collection('test_data');
@@ -355,7 +386,7 @@ bucket_definitions:
355386
});
356387
});
357388

358-
test('table not in sync rules', async () => {
389+
test('collection not in sync rules', async () => {
359390
await using context = await ChangeStreamTestContext.open(factory);
360391
const { db } = context;
361392
await context.updateSyncRules(BASIC_SYNC_RULES);

0 commit comments

Comments
 (0)