Skip to content

Commit c5ca6e7

Browse files
committed
Validate changeStreamPreAndPostImages on existing collections.
1 parent 267ec11 commit c5ca6e7

File tree

2 files changed

+231
-66
lines changed

2 files changed

+231
-66
lines changed

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 51 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
3434
await this.client.close();
3535
}
3636

37+
async [Symbol.asyncDispose]() {
38+
await this.shutdown();
39+
}
40+
3741
async getSourceConfig(): Promise<service_types.configFile.ResolvedDataSourceConfig> {
3842
return this.config;
3943
}
@@ -77,6 +81,28 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
7781
sqlSyncRules: sync_rules.SqlSyncRules
7882
): Promise<api.PatternResult[]> {
7983
let result: api.PatternResult[] = [];
84+
85+
const validatePostImages = (schema: string, collection: mongo.CollectionInfo): service_types.ReplicationError[] => {
86+
if (this.config.postImages == types.PostImagesOption.OFF) {
87+
return [];
88+
} else if (!collection.options?.changeStreamPreAndPostImages?.enabled) {
89+
if (this.config.postImages == types.PostImagesOption.READ_ONLY) {
90+
return [
91+
{ level: 'fatal', message: `changeStreamPreAndPostImages not enabled on ${schema}.${collection.name}` }
92+
];
93+
} else {
94+
return [
95+
{
96+
level: 'warning',
97+
message: `changeStreamPreAndPostImages not enabled on ${schema}.${collection.name}, will be enabled automatically`
98+
}
99+
];
100+
}
101+
} else {
102+
return [];
103+
}
104+
};
105+
80106
for (let tablePattern of tablePatterns) {
81107
const schema = tablePattern.schema;
82108

@@ -101,7 +127,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
101127
{
102128
name: nameFilter
103129
},
104-
{ nameOnly: true }
130+
{ nameOnly: false }
105131
)
106132
.toArray();
107133

@@ -117,6 +143,12 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
117143
[],
118144
true
119145
);
146+
let errors: service_types.ReplicationError[] = [];
147+
if (collection.type == 'view') {
148+
errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
149+
} else {
150+
errors.push(...validatePostImages(schema, collection));
151+
}
120152
const syncData = sqlSyncRules.tableSyncsData(sourceTable);
121153
const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);
122154
patternResult.tables.push({
@@ -125,7 +157,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
125157
replication_id: ['_id'],
126158
data_queries: syncData,
127159
parameter_queries: syncParameters,
128-
errors: []
160+
errors: errors
129161
});
130162
}
131163
} else {
@@ -141,26 +173,25 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
141173

142174
const syncData = sqlSyncRules.tableSyncsData(sourceTable);
143175
const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);
176+
const collection = collections[0];
144177

145-
if (collections.length == 1) {
146-
patternResult.table = {
147-
schema,
148-
name: tablePattern.name,
149-
replication_id: ['_id'],
150-
data_queries: syncData,
151-
parameter_queries: syncParameters,
152-
errors: []
153-
};
154-
} else {
155-
patternResult.table = {
156-
schema,
157-
name: tablePattern.name,
158-
replication_id: ['_id'],
159-
data_queries: syncData,
160-
parameter_queries: syncParameters,
161-
errors: [{ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` }]
162-
};
178+
let errors: service_types.ReplicationError[] = [];
179+
if (collections.length != 1) {
180+
errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` });
181+
} else if (collection.type == 'view') {
182+
errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
183+
} else if (!collection.options?.changeStreamPreAndPostImages?.enabled) {
184+
errors.push(...validatePostImages(schema, collection));
163185
}
186+
187+
patternResult.table = {
188+
schema,
189+
name: tablePattern.name,
190+
replication_id: ['_id'],
191+
data_queries: syncData,
192+
parameter_queries: syncParameters,
193+
errors
194+
};
164195
}
165196
}
166197
return result;

modules/module-mongodb/test/src/mongo_test.test.ts

Lines changed: 180 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js';
22
import { ChangeStream } from '@module/replication/ChangeStream.js';
33
import { constructAfterRecord } from '@module/replication/MongoRelation.js';
4-
import { SqliteRow } from '@powersync/service-sync-rules';
4+
import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';
55
import * as mongo from 'mongodb';
66
import { describe, expect, test } from 'vitest';
77
import { clearTestDb, connectMongoData, TEST_CONNECTION_OPTIONS } from './util.js';
8+
import { PostImagesOption } from '@module/types/types.js';
89

910
describe('mongo data types', () => {
1011
async function setupTable(db: mongo.Db) {
@@ -245,58 +246,191 @@ describe('mongo data types', () => {
245246
});
246247

247248
test('connection schema', async () => {
248-
const adapter = new MongoRouteAPIAdapter({
249+
await using adapter = new MongoRouteAPIAdapter({
249250
type: 'mongodb',
250251
...TEST_CONNECTION_OPTIONS
251252
});
252-
try {
253-
const db = adapter.db;
254-
await clearTestDb(db);
253+
const db = adapter.db;
254+
await clearTestDb(db);
255255

256-
const collection = db.collection('test_data');
257-
await setupTable(db);
258-
await insert(collection);
256+
const collection = db.collection('test_data');
257+
await setupTable(db);
258+
await insert(collection);
259+
260+
const schema = await adapter.getConnectionSchema();
261+
const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0];
262+
expect(dbSchema).not.toBeNull();
263+
expect(dbSchema.tables).toMatchObject([
264+
{
265+
name: 'test_data',
266+
columns: [
267+
{ name: '_id', sqlite_type: 4, internal_type: 'Integer' },
268+
{ name: 'bool', sqlite_type: 4, internal_type: 'Boolean' },
269+
{ name: 'bytea', sqlite_type: 1, internal_type: 'Binary' },
270+
{ name: 'date', sqlite_type: 2, internal_type: 'Date' },
271+
{ name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' },
272+
{ name: 'float', sqlite_type: 8, internal_type: 'Double' },
273+
{ name: 'int2', sqlite_type: 4, internal_type: 'Integer' },
274+
{ name: 'int4', sqlite_type: 4, internal_type: 'Integer' },
275+
{ name: 'int8', sqlite_type: 4, internal_type: 'Long' },
276+
// We can fix these later
277+
{ name: 'js', sqlite_type: 2, internal_type: 'Object' },
278+
{ name: 'js2', sqlite_type: 2, internal_type: 'Object' },
279+
{ name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' },
280+
{ name: 'minKey', sqlite_type: 0, internal_type: 'MinKey' },
281+
{ name: 'nested', sqlite_type: 2, internal_type: 'Object' },
282+
{ name: 'null', sqlite_type: 0, internal_type: 'Null' },
283+
{ name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' },
284+
// We can fix these later
285+
{ name: 'pointer', sqlite_type: 2, internal_type: 'Object' },
286+
{ name: 'pointer2', sqlite_type: 2, internal_type: 'Object' },
287+
{ name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' },
288+
// Can fix this later
289+
{ name: 'symbol', sqlite_type: 2, internal_type: 'String' },
290+
{ name: 'text', sqlite_type: 2, internal_type: 'String' },
291+
{ name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' },
292+
{ name: 'undefined', sqlite_type: 0, internal_type: 'Null' },
293+
{ name: 'uuid', sqlite_type: 2, internal_type: 'UUID' }
294+
]
295+
}
296+
]);
297+
});
298+
299+
test('validate postImages', async () => {
300+
await using adapter = new MongoRouteAPIAdapter({
301+
type: 'mongodb',
302+
...TEST_CONNECTION_OPTIONS,
303+
postImages: PostImagesOption.READ_ONLY
304+
});
305+
const db = adapter.db;
306+
await clearTestDb(db);
307+
308+
const collection = db.collection('test_data');
309+
await setupTable(db);
310+
await insert(collection);
311+
312+
const rules = SqlSyncRules.fromYaml(
313+
`
314+
bucket_definitions:
315+
global:
316+
data:
317+
- select _id as id, * from test_data
259318
260-
const schema = await adapter.getConnectionSchema();
261-
const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0];
262-
expect(dbSchema).not.toBeNull();
263-
expect(dbSchema.tables).toMatchObject([
319+
`,
320+
{
321+
...adapter.getParseSyncRulesOptions(),
322+
// No schema-based validation at this point
323+
schema: undefined
324+
}
325+
);
326+
const source_table_patterns = rules.getSourceTables();
327+
const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
328+
329+
const result = results[0];
330+
expect(result).not.toBeNull();
331+
expect(result.table).toMatchObject({
332+
schema: 'powersync_test_data',
333+
name: 'test_data',
334+
replication_id: ['_id'],
335+
data_queries: true,
336+
parameter_queries: false,
337+
errors: [
264338
{
265-
name: 'test_data',
266-
columns: [
267-
{ name: '_id', sqlite_type: 4, internal_type: 'Integer' },
268-
{ name: 'bool', sqlite_type: 4, internal_type: 'Boolean' },
269-
{ name: 'bytea', sqlite_type: 1, internal_type: 'Binary' },
270-
{ name: 'date', sqlite_type: 2, internal_type: 'Date' },
271-
{ name: 'decimal', sqlite_type: 2, internal_type: 'Decimal' },
272-
{ name: 'float', sqlite_type: 8, internal_type: 'Double' },
273-
{ name: 'int2', sqlite_type: 4, internal_type: 'Integer' },
274-
{ name: 'int4', sqlite_type: 4, internal_type: 'Integer' },
275-
{ name: 'int8', sqlite_type: 4, internal_type: 'Long' },
276-
// We can fix these later
277-
{ name: 'js', sqlite_type: 2, internal_type: 'Object' },
278-
{ name: 'js2', sqlite_type: 2, internal_type: 'Object' },
279-
{ name: 'maxKey', sqlite_type: 0, internal_type: 'MaxKey' },
280-
{ name: 'minKey', sqlite_type: 0, internal_type: 'MinKey' },
281-
{ name: 'nested', sqlite_type: 2, internal_type: 'Object' },
282-
{ name: 'null', sqlite_type: 0, internal_type: 'Null' },
283-
{ name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' },
284-
// We can fix these later
285-
{ name: 'pointer', sqlite_type: 2, internal_type: 'Object' },
286-
{ name: 'pointer2', sqlite_type: 2, internal_type: 'Object' },
287-
{ name: 'regexp', sqlite_type: 2, internal_type: 'RegExp' },
288-
// Can fix this later
289-
{ name: 'symbol', sqlite_type: 2, internal_type: 'String' },
290-
{ name: 'text', sqlite_type: 2, internal_type: 'String' },
291-
{ name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' },
292-
{ name: 'undefined', sqlite_type: 0, internal_type: 'Null' },
293-
{ name: 'uuid', sqlite_type: 2, internal_type: 'UUID' }
294-
]
339+
level: 'fatal',
340+
message: 'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data'
295341
}
296-
]);
297-
} finally {
298-
await adapter.shutdown();
299-
}
342+
]
343+
});
344+
});
345+
346+
test('validate postImages - auto-configure', async () => {
347+
await using adapter = new MongoRouteAPIAdapter({
348+
type: 'mongodb',
349+
...TEST_CONNECTION_OPTIONS,
350+
postImages: PostImagesOption.AUTO_CONFIGURE
351+
});
352+
const db = adapter.db;
353+
await clearTestDb(db);
354+
355+
const collection = db.collection('test_data');
356+
await setupTable(db);
357+
await insert(collection);
358+
359+
const rules = SqlSyncRules.fromYaml(
360+
`
361+
bucket_definitions:
362+
global:
363+
data:
364+
- select _id as id, * from test_data
365+
366+
`,
367+
{
368+
...adapter.getParseSyncRulesOptions(),
369+
// No schema-based validation at this point
370+
schema: undefined
371+
}
372+
);
373+
const source_table_patterns = rules.getSourceTables();
374+
const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
375+
376+
const result = results[0];
377+
expect(result).not.toBeNull();
378+
expect(result.table).toMatchObject({
379+
schema: 'powersync_test_data',
380+
name: 'test_data',
381+
replication_id: ['_id'],
382+
data_queries: true,
383+
parameter_queries: false,
384+
errors: [
385+
{
386+
level: 'warning',
387+
message:
388+
'changeStreamPreAndPostImages not enabled on powersync_test_data.test_data, will be enabled automatically'
389+
}
390+
]
391+
});
392+
});
393+
394+
test('validate postImages - off', async () => {
395+
await using adapter = new MongoRouteAPIAdapter({
396+
type: 'mongodb',
397+
...TEST_CONNECTION_OPTIONS,
398+
postImages: PostImagesOption.OFF
399+
});
400+
const db = adapter.db;
401+
await clearTestDb(db);
402+
403+
const collection = db.collection('test_data');
404+
await setupTable(db);
405+
await insert(collection);
406+
407+
const rules = SqlSyncRules.fromYaml(
408+
`
409+
bucket_definitions:
410+
global:
411+
data:
412+
- select _id as id, * from test_data
413+
414+
`,
415+
{
416+
...adapter.getParseSyncRulesOptions(),
417+
// No schema-based validation at this point
418+
schema: undefined
419+
}
420+
);
421+
const source_table_patterns = rules.getSourceTables();
422+
const results = await adapter.getDebugTablesInfo(source_table_patterns, rules);
423+
424+
const result = results[0];
425+
expect(result).not.toBeNull();
426+
expect(result.table).toMatchObject({
427+
schema: 'powersync_test_data',
428+
name: 'test_data',
429+
replication_id: ['_id'],
430+
data_queries: true,
431+
parameter_queries: false,
432+
errors: []
433+
});
300434
});
301435
});
302436

0 commit comments

Comments
 (0)