Skip to content

Commit a6cd6f9

Browse files
committed
Add postImage tests.
1 parent 9c3e7d6 commit a6cd6f9

File tree

2 files changed

+97
-5
lines changed

2 files changed

+97
-5
lines changed

modules/module-mongodb/test/src/change_stream.test.ts

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ import { putOp, removeOp } from '@core-tests/stream_utils.js';
22
import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
33
import { BucketStorageFactory } from '@powersync/service-core';
44
import * as crypto from 'crypto';
5-
import { describe, expect, test } from 'vitest';
5+
import { afterEach, beforeEach, describe, expect, test } from 'vitest';
66
import { ChangeStreamTestContext } from './change_stream_utils.js';
77
import * as mongo from 'mongodb';
88
import { setTimeout } from 'node:timers/promises';
@@ -58,8 +58,8 @@ bucket_definitions:
5858
]);
5959
});
6060

61-
test('no fullDocument available', async () => {
62-
await using context = await ChangeStreamTestContext.open(factory);
61+
test('updateLookup - no fullDocument available', async () => {
62+
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'updateLookup' });
6363
const { db, client } = context;
6464
await context.updateSyncRules(`
6565
bucket_definitions:
@@ -101,6 +101,97 @@ bucket_definitions:
101101
]);
102102
});
103103

104+
test('postImages - autoConfigure', async () => {
105+
// Similar to the above test, but with postImages enabled.
106+
// This resolves the consistency issue.
107+
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'autoConfigure' });
108+
const { db, client } = context;
109+
await context.updateSyncRules(`
110+
bucket_definitions:
111+
global:
112+
data:
113+
- SELECT _id as id, description, num FROM "test_data"`);
114+
115+
db.createCollection('test_data', {
116+
// enabled: false here, but autoConfigure will enable it.
117+
changeStreamPreAndPostImages: { enabled: false }
118+
});
119+
const collection = db.collection('test_data');
120+
121+
await context.replicateSnapshot();
122+
123+
context.startStreaming();
124+
125+
const session = client.startSession();
126+
let test_id: mongo.ObjectId | undefined;
127+
try {
128+
await session.withTransaction(async () => {
129+
const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
130+
test_id = result.insertedId;
131+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
132+
await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
133+
await collection.deleteOne({ _id: test_id }, { session });
134+
});
135+
} finally {
136+
await session.endSession();
137+
}
138+
139+
const data = await context.getBucketData('global[]');
140+
141+
expect(data).toMatchObject([
142+
putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
143+
// The postImage helps us get this data
144+
putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }),
145+
putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
146+
removeOp('test_data', test_id!.toHexString())
147+
]);
148+
});
149+
150+
test('postImages - on', async () => {
151+
// Similar to postImages - autoConfigure, but does not auto-configure.
152+
// changeStreamPreAndPostImages must be manually configured.
153+
await using context = await ChangeStreamTestContext.open(factory, { postImages: 'on' });
154+
const { db, client } = context;
155+
await context.updateSyncRules(`
156+
bucket_definitions:
157+
global:
158+
data:
159+
- SELECT _id as id, description, num FROM "test_data"`);
160+
161+
db.createCollection('test_data', {
162+
changeStreamPreAndPostImages: { enabled: true }
163+
});
164+
const collection = db.collection('test_data');
165+
166+
await context.replicateSnapshot();
167+
168+
context.startStreaming();
169+
170+
const session = client.startSession();
171+
let test_id: mongo.ObjectId | undefined;
172+
try {
173+
await session.withTransaction(async () => {
174+
const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session });
175+
test_id = result.insertedId;
176+
await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session });
177+
await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session });
178+
await collection.deleteOne({ _id: test_id }, { session });
179+
});
180+
} finally {
181+
await session.endSession();
182+
}
183+
184+
const data = await context.getBucketData('global[]');
185+
186+
expect(data).toMatchObject([
187+
putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }),
188+
// The postImage helps us get this data
189+
putOp('test_data', { id: test_id!.toHexString(), description: 'test2', num: 1152921504606846976n }),
190+
putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }),
191+
removeOp('test_data', test_id!.toHexString())
192+
]);
193+
});
194+
104195
test('replicating case sensitive table', async () => {
105196
await using context = await ChangeStreamTestContext.open(factory);
106197
const { db } = context;

modules/module-mongodb/test/src/change_stream_utils.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { MongoManager } from '@module/replication/MongoManager.js';
66
import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
77
import * as mongo from 'mongodb';
88
import { createCheckpoint } from '@module/replication/MongoRelation.js';
9+
import { NormalizedMongoConnectionConfig } from '@module/types/types.js';
910

1011
export class ChangeStreamTestContext {
1112
private _walStream?: ChangeStream;
@@ -19,9 +20,9 @@ export class ChangeStreamTestContext {
1920
*
2021
* This configures all the context, and tears it down afterwards.
2122
*/
22-
static async open(factory: () => Promise<BucketStorageFactory>) {
23+
static async open(factory: () => Promise<BucketStorageFactory>, options?: Partial<NormalizedMongoConnectionConfig>) {
2324
const f = await factory();
24-
const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS);
25+
const connectionManager = new MongoManager({ ...TEST_CONNECTION_OPTIONS, ...options });
2526

2627
await clearTestDb(connectionManager.db);
2728
return new ChangeStreamTestContext(f, connectionManager);

0 commit comments

Comments
 (0)