diff --git a/.changeset/green-peas-roll.md b/.changeset/green-peas-roll.md new file mode 100644 index 000000000..61762c082 --- /dev/null +++ b/.changeset/green-peas-roll.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-module-mongodb': minor +'@powersync/service-image': minor +--- + +Add MongoDB support (Alpha) diff --git a/modules/module-mongodb/CHANGELOG.md b/modules/module-mongodb/CHANGELOG.md new file mode 100644 index 000000000..05f7d8b81 --- /dev/null +++ b/modules/module-mongodb/CHANGELOG.md @@ -0,0 +1 @@ +# @powersync/service-module-mongodb diff --git a/modules/module-mongodb/LICENSE b/modules/module-mongodb/LICENSE new file mode 100644 index 000000000..c8efd46cc --- /dev/null +++ b/modules/module-mongodb/LICENSE @@ -0,0 +1,67 @@ +# Functional Source License, Version 1.1, Apache 2.0 Future License + +## Abbreviation + +FSL-1.1-Apache-2.0 + +## Notice + +Copyright 2023-2024 Journey Mobile, Inc. + +## Terms and Conditions + +### Licensor ("We") + +The party offering the Software under these Terms and Conditions. + +### The Software + +The "Software" is each version of the software that we make available under these Terms and Conditions, as indicated by our inclusion of these Terms and Conditions with the Software. + +### License Grant + +Subject to your compliance with this License Grant and the Patents, Redistribution and Trademark clauses below, we hereby grant you the right to use, copy, modify, create derivative works, publicly perform, publicly display and redistribute the Software for any Permitted Purpose identified below. + +### Permitted Purpose + +A Permitted Purpose is any purpose other than a Competing Use. A Competing Use means making the Software available to others in a commercial product or service that: + +1. substitutes for the Software; +2. substitutes for any other product or service we offer using the Software that exists as of the date we make the Software available; or +3. offers the same or substantially similar functionality as the Software. + +Permitted Purposes specifically include using the Software: + +1. for your internal use and access; +2. for non-commercial education; +3. for non-commercial research; and +4. in connection with professional services that you provide to a licensee using the Software in accordance with these Terms and Conditions. + +### Patents + +To the extent your use for a Permitted Purpose would necessarily infringe our patents, the license grant above includes a license under our patents. If you make a claim against any party that the Software infringes or contributes to the infringement of any patent, then your patent license to the Software ends immediately. + +### Redistribution + +The Terms and Conditions apply to all copies, modifications and derivatives of the Software. +If you redistribute any copies, modifications or derivatives of the Software, you must include a copy of or a link to these Terms and Conditions and not remove any copyright notices provided in or with the Software. + +### Disclaimer + +THE SOFTWARE IS PROVIDED "AS IS" AND WITHOUT WARRANTIES OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABILITY, TITLE OR NON-INFRINGEMENT. +IN NO EVENT WILL WE HAVE ANY LIABILITY TO YOU ARISING OUT OF OR RELATED TO THE SOFTWARE, INCLUDING INDIRECT, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES, EVEN IF WE HAVE BEEN INFORMED OF THEIR POSSIBILITY IN ADVANCE. 
+ +### Trademarks + +Except for displaying the License Details and identifying us as the origin of the Software, you have no right under these Terms and Conditions to use our trademarks, trade names, service marks or product names. + +## Grant of Future License + +We hereby irrevocably grant you an additional license to use the Software under the Apache License, Version 2.0 that is effective on the second anniversary of the date we make the Software available. On or after that date, you may use the Software under the Apache License, Version 2.0, in which case the following will apply: + +Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. +You may obtain a copy of the License at + +http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. diff --git a/modules/module-mongodb/README.md b/modules/module-mongodb/README.md new file mode 100644 index 000000000..f9e9e4c64 --- /dev/null +++ b/modules/module-mongodb/README.md @@ -0,0 +1,3 @@ +# PowerSync Service Module MongoDB + +MongoDB replication module for PowerSync diff --git a/modules/module-mongodb/package.json b/modules/module-mongodb/package.json new file mode 100644 index 000000000..5809bed99 --- /dev/null +++ b/modules/module-mongodb/package.json @@ -0,0 +1,47 @@ +{ + "name": "@powersync/service-module-mongodb", + "repository": "https://github.com/powersync-ja/powersync-service", + "types": "dist/index.d.ts", + "version": "0.0.1", + "main": "dist/index.js", + "license": "FSL-1.1-Apache-2.0", + "type": "module", + "publishConfig": { + "access": "public" + }, + "scripts": { + "build": "tsc -b", + "build:tests": "tsc -b test/tsconfig.json", + "clean": "rm -rf ./lib && tsc -b --clean", + "test": "vitest --no-threads" + }, + "exports": { + ".": { + "import": "./dist/index.js", + "require": "./dist/index.js", + "default": "./dist/index.js" + }, + "./types": { + "import": "./dist/types/types.js", + "require": "./dist/types/types.js", + "default": "./dist/types/types.js" + } + }, + "dependencies": { + "@powersync/lib-services-framework": "workspace:*", + "@powersync/service-core": "workspace:*", + "@powersync/service-jsonbig": "workspace:*", + "@powersync/service-sync-rules": "workspace:*", + "@powersync/service-types": "workspace:*", + "mongodb": "^6.7.0", + "ts-codec": "^1.2.2", + "uuid": "^9.0.1", + "uri-js": "^4.4.1" + }, + "devDependencies": { + "@types/uuid": "^9.0.4", + "typescript": "^5.2.2", + "vitest": "^0.34.6", + "vite-tsconfig-paths": "^4.3.2" + } +} diff --git a/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts new file mode 100644 index 000000000..e96cc8cb8 --- /dev/null +++ b/modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts @@ -0,0 +1,307 @@ +import { api, ParseSyncRulesOptions, SourceTable } from '@powersync/service-core'; +import * as mongo from 'mongodb'; + +import * as sync_rules from '@powersync/service-sync-rules'; +import * as service_types from '@powersync/service-types'; +import { MongoManager } from '../replication/MongoManager.js'; +import { constructAfterRecord, createCheckpoint } from '../replication/MongoRelation.js'; +import * as types from '../types/types.js'; +import { escapeRegExp 
} from '../utils.js'; + +export class MongoRouteAPIAdapter implements api.RouteAPI { + protected client: mongo.MongoClient; + public db: mongo.Db; + + connectionTag: string; + defaultSchema: string; + + constructor(protected config: types.ResolvedConnectionConfig) { + const manager = new MongoManager(config); + this.client = manager.client; + this.db = manager.db; + this.defaultSchema = manager.db.databaseName; + this.connectionTag = config.tag ?? sync_rules.DEFAULT_TAG; + } + + getParseSyncRulesOptions(): ParseSyncRulesOptions { + return { + defaultSchema: this.defaultSchema + }; + } + + async shutdown(): Promise { + await this.client.close(); + } + + async getSourceConfig(): Promise { + return this.config; + } + + async getConnectionStatus(): Promise { + const base = { + id: this.config.id, + uri: types.baseUri(this.config) + }; + + try { + await this.client.connect(); + await this.db.command({ hello: 1 }); + } catch (e) { + return { + ...base, + connected: false, + errors: [{ level: 'fatal', message: e.message }] + }; + } + return { + ...base, + connected: true, + errors: [] + }; + } + + async executeQuery(query: string, params: any[]): Promise { + return service_types.internal_routes.ExecuteSqlResponse.encode({ + results: { + columns: [], + rows: [] + }, + success: false, + error: 'SQL querying is not supported for MongoDB' + }); + } + + async getDebugTablesInfo( + tablePatterns: sync_rules.TablePattern[], + sqlSyncRules: sync_rules.SqlSyncRules + ): Promise { + let result: api.PatternResult[] = []; + for (let tablePattern of tablePatterns) { + const schema = tablePattern.schema; + + let patternResult: api.PatternResult = { + schema: schema, + pattern: tablePattern.tablePattern, + wildcard: tablePattern.isWildcard + }; + result.push(patternResult); + + let nameFilter: RegExp | string; + if (tablePattern.isWildcard) { + nameFilter = new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)); + } else { + nameFilter = tablePattern.name; + } + + // Check if the collection exists + const collections = await this.client + .db(schema) + .listCollections( + { + name: nameFilter + }, + { nameOnly: true } + ) + .toArray(); + + if (tablePattern.isWildcard) { + patternResult.tables = []; + for (let collection of collections) { + const sourceTable = new SourceTable( + 0, + this.connectionTag, + collection.name, + schema, + collection.name, + [], + true + ); + const syncData = sqlSyncRules.tableSyncsData(sourceTable); + const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable); + patternResult.tables.push({ + schema, + name: collection.name, + replication_id: ['_id'], + data_queries: syncData, + parameter_queries: syncParameters, + errors: [] + }); + } + } else { + const sourceTable = new SourceTable( + 0, + this.connectionTag, + tablePattern.name, + schema, + tablePattern.name, + [], + true + ); + + const syncData = sqlSyncRules.tableSyncsData(sourceTable); + const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable); + + if (collections.length == 1) { + patternResult.table = { + schema, + name: tablePattern.name, + replication_id: ['_id'], + data_queries: syncData, + parameter_queries: syncParameters, + errors: [] + }; + } else { + patternResult.table = { + schema, + name: tablePattern.name, + replication_id: ['_id'], + data_queries: syncData, + parameter_queries: syncParameters, + errors: [{ level: 'warning', message: `Collection ${schema}.${tablePattern.name} not found` }] + }; + } + } + } + return result; + } + + async getReplicationLag(syncRulesId: string): Promise { + // 
There is no fast way to get replication lag in bytes in MongoDB.
+    // We can get replication lag in seconds, but need a different API for that.
+    return undefined;
+  }
+
+  async getReplicationHead(): Promise<string> {
+    return createCheckpoint(this.client, this.db);
+  }
+
+  async getConnectionSchema(): Promise<service_types.DatabaseSchema[]> {
+    const sampleSize = 50;
+
+    const databases = await this.db.admin().listDatabases({ nameOnly: true });
+    const filteredDatabases = databases.databases.filter((db) => {
+      return !['local', 'admin', 'config'].includes(db.name);
+    });
+    const databaseSchemas = await Promise.all(
+      filteredDatabases.map(async (db) => {
+        /**
+         * Filtering the list of databases with `authorizedDatabases: true`
+         * does not produce the full list of databases under some circumstances.
+         * This catches any potential auth errors.
+         */
+        let collections: mongo.CollectionInfo[];
+        try {
+          collections = await this.client.db(db.name).listCollections().toArray();
+        } catch (ex) {
+          return null;
+        }
+
+        const filtered = collections.filter((c) => {
+          return !['_powersync_checkpoints'].includes(c.name);
+        });
+
+        const tables = await Promise.all(
+          filtered.map(async (collection) => {
+            const sampleDocuments = await this.client
+              .db(db.name)
+              .collection(collection.name)
+              .aggregate([{ $sample: { size: sampleSize } }])
+              .toArray();
+
+            if (sampleDocuments.length > 0) {
+              const columns = this.getColumnsFromDocuments(sampleDocuments);
+
+              return {
+                name: collection.name,
+                // Since documents are sampled in a random order, we need to sort
+                // to get a consistent order
+                columns: columns.sort((a, b) => a.name.localeCompare(b.name))
+              } satisfies service_types.TableSchema;
+            } else {
+              return {
+                name: collection.name,
+                columns: []
+              } satisfies service_types.TableSchema;
+            }
+          })
+        );
+        return {
+          name: db.name,
+          tables: tables
+        } satisfies service_types.DatabaseSchema;
+      })
+    );
+    return databaseSchemas.filter((schema): schema is service_types.DatabaseSchema => schema != null);
+  }
+
+  private getColumnsFromDocuments(documents: mongo.BSON.Document[]) {
+    let columns = new Map<string, { sqliteType: sync_rules.ExpressionType; bsonTypes: Set<string> }>();
+    for (const document of documents) {
+      const parsed = constructAfterRecord(document);
+      for (const key in parsed) {
+        const value = parsed[key];
+        const type = sync_rules.sqliteTypeOf(value);
+        const sqliteType = sync_rules.ExpressionType.fromTypeText(type);
+        let entry = columns.get(key);
+        if (entry == null) {
+          entry = { sqliteType, bsonTypes: new Set() };
+          columns.set(key, entry);
+        } else {
+          entry.sqliteType = entry.sqliteType.or(sqliteType);
+        }
+        const bsonType = this.getBsonType(document[key]);
+        if (bsonType != null) {
+          entry.bsonTypes.add(bsonType);
+        }
+      }
+    }
+    return [...columns.entries()].map(([key, value]) => {
+      const internal_type = value.bsonTypes.size == 0 ?
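+        // No BSON types were sampled for this key, so the internal type is unknown.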
'' : [...value.bsonTypes].join(' | '); + return { + name: key, + type: internal_type, + sqlite_type: value.sqliteType.typeFlags, + internal_type, + pg_type: internal_type + }; + }); + } + + private getBsonType(data: any): string | null { + if (data == null) { + // null or undefined + return 'Null'; + } else if (typeof data == 'string') { + return 'String'; + } else if (typeof data == 'number') { + if (Number.isInteger(data)) { + return 'Integer'; + } else { + return 'Double'; + } + } else if (typeof data == 'bigint') { + return 'Long'; + } else if (typeof data == 'boolean') { + return 'Boolean'; + } else if (data instanceof mongo.ObjectId) { + return 'ObjectId'; + } else if (data instanceof mongo.UUID) { + return 'UUID'; + } else if (data instanceof Date) { + return 'Date'; + } else if (data instanceof mongo.Timestamp) { + return 'Timestamp'; + } else if (data instanceof mongo.Binary) { + return 'Binary'; + } else if (data instanceof mongo.Long) { + return 'Long'; + } else if (Array.isArray(data)) { + return 'Array'; + } else if (data instanceof Uint8Array) { + return 'Binary'; + } else if (typeof data == 'object') { + return 'Object'; + } else { + return null; + } + } +} diff --git a/modules/module-mongodb/src/index.ts b/modules/module-mongodb/src/index.ts new file mode 100644 index 000000000..4cfc25695 --- /dev/null +++ b/modules/module-mongodb/src/index.ts @@ -0,0 +1,5 @@ +import { MongoModule } from './module/MongoModule.js'; + +export const module = new MongoModule(); + +export default module; diff --git a/modules/module-mongodb/src/module/MongoModule.ts b/modules/module-mongodb/src/module/MongoModule.ts new file mode 100644 index 000000000..3f6e27636 --- /dev/null +++ b/modules/module-mongodb/src/module/MongoModule.ts @@ -0,0 +1,52 @@ +import { api, ConfigurationFileSyncRulesProvider, replication, system, TearDownOptions } from '@powersync/service-core'; +import { MongoRouteAPIAdapter } from '../api/MongoRouteAPIAdapter.js'; +import { ConnectionManagerFactory } from '../replication/ConnectionManagerFactory.js'; +import { MongoErrorRateLimiter } from '../replication/MongoErrorRateLimiter.js'; +import { ChangeStreamReplicator } from '../replication/ChangeStreamReplicator.js'; +import * as types from '../types/types.js'; + +export class MongoModule extends replication.ReplicationModule { + constructor() { + super({ + name: 'MongoDB', + type: types.MONGO_CONNECTION_TYPE, + configSchema: types.MongoConnectionConfig + }); + } + + async initialize(context: system.ServiceContextContainer): Promise { + await super.initialize(context); + } + + protected createRouteAPIAdapter(): api.RouteAPI { + return new MongoRouteAPIAdapter(this.resolveConfig(this.decodedConfig!)); + } + + protected createReplicator(context: system.ServiceContext): replication.AbstractReplicator { + const normalisedConfig = this.resolveConfig(this.decodedConfig!); + const syncRuleProvider = new ConfigurationFileSyncRulesProvider(context.configuration.sync_rules); + const connectionFactory = new ConnectionManagerFactory(normalisedConfig); + + return new ChangeStreamReplicator({ + id: this.getDefaultId(normalisedConfig.database ?? 
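+      // Fall back to an empty string when no source database name is configured.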
''), + syncRuleProvider: syncRuleProvider, + storageEngine: context.storageEngine, + connectionFactory: connectionFactory, + rateLimiter: new MongoErrorRateLimiter() + }); + } + + /** + * Combines base config with normalized connection settings + */ + private resolveConfig(config: types.MongoConnectionConfig): types.ResolvedConnectionConfig { + return { + ...config, + ...types.normalizeConnectionConfig(config) + }; + } + + async teardown(options: TearDownOptions): Promise { + // TODO: Implement? + } +} diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts new file mode 100644 index 000000000..3d36e1814 --- /dev/null +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -0,0 +1,499 @@ +import { container, logger } from '@powersync/lib-services-framework'; +import { Metrics, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core'; +import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; +import * as mongo from 'mongodb'; +import { MongoManager } from './MongoManager.js'; +import { + constructAfterRecord, + createCheckpoint, + getMongoLsn, + getMongoRelation, + mongoLsnToTimestamp +} from './MongoRelation.js'; +import { escapeRegExp } from '../utils.js'; + +export const ZERO_LSN = '0000000000000000'; + +export interface ChangeStreamOptions { + connections: MongoManager; + storage: storage.SyncRulesBucketStorage; + abort_signal: AbortSignal; +} + +interface InitResult { + needsInitialSync: boolean; +} + +export class MissingReplicationSlotError extends Error { + constructor(message: string) { + super(message); + } +} + +export class ChangeStream { + sync_rules: SqlSyncRules; + group_id: number; + + connection_id = 1; + + private readonly storage: storage.SyncRulesBucketStorage; + + private connections: MongoManager; + private readonly client: mongo.MongoClient; + private readonly defaultDb: mongo.Db; + + private abort_signal: AbortSignal; + + private relation_cache = new Map(); + + constructor(options: ChangeStreamOptions) { + this.storage = options.storage; + this.group_id = options.storage.group_id; + this.connections = options.connections; + this.client = this.connections.client; + this.defaultDb = this.connections.db; + this.sync_rules = options.storage.getParsedSyncRules({ + defaultSchema: this.defaultDb.databaseName + }); + + this.abort_signal = options.abort_signal; + this.abort_signal.addEventListener( + 'abort', + () => { + // TODO: Fast abort? 
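+        // For now, abort is handled by streamChanges(), which registers its own
+        // 'abort' listener to close the change stream, and by per-document signal
+        // checks during the initial snapshot.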
+      },
+      { once: true }
+    );
+  }
+
+  get stopped() {
+    return this.abort_signal.aborted;
+  }
+
+  async getQualifiedTableNames(
+    batch: storage.BucketStorageBatch,
+    tablePattern: TablePattern
+  ): Promise<storage.SourceTable[]> {
+    const schema = tablePattern.schema;
+    if (tablePattern.connectionTag != this.connections.connectionTag) {
+      return [];
+    }
+
+    let nameFilter: RegExp | string;
+    if (tablePattern.isWildcard) {
+      nameFilter = new RegExp('^' + escapeRegExp(tablePattern.tablePrefix));
+    } else {
+      nameFilter = tablePattern.name;
+    }
+    let result: storage.SourceTable[] = [];
+
+    // Check if the collection exists
+    const collections = await this.client
+      .db(schema)
+      .listCollections(
+        {
+          name: nameFilter
+        },
+        { nameOnly: true }
+      )
+      .toArray();
+
+    for (let collection of collections) {
+      const table = await this.handleRelation(
+        batch,
+        {
+          name: collection.name,
+          schema,
+          objectId: collection.name,
+          replicationColumns: [{ name: '_id' }]
+        } as SourceEntityDescriptor,
+        // This is done as part of the initial setup - snapshot is handled elsewhere
+        { snapshot: false }
+      );
+
+      result.push(table);
+    }
+
+    return result;
+  }
+
+  async initSlot(): Promise<InitResult> {
+    const status = await this.storage.getStatus();
+    if (status.snapshot_done && status.checkpoint_lsn) {
+      logger.info(`Initial replication already done`);
+      return { needsInitialSync: false };
+    }
+
+    return { needsInitialSync: true };
+  }
+
+  async estimatedCount(table: storage.SourceTable): Promise<string> {
+    const db = this.client.db(table.schema);
+    const count = await db.collection(table.table).estimatedDocumentCount();
+    return `~${count}`;
+  }
+
+  /**
+   * Start initial replication.
+   *
+   * If (partial) replication was done before on this slot, this clears the state
+   * and starts again from scratch.
+   */
+  async startInitialReplication() {
+    await this.storage.clear();
+    await this.initialReplication();
+  }
+
+  async initialReplication() {
+    const sourceTables = this.sync_rules.getSourceTables();
+    await this.client.connect();
+
+    const hello = await this.defaultDb.command({ hello: 1 });
+    const startTime = hello.lastWrite?.majorityOpTime?.ts as mongo.Timestamp;
+    if (hello.msg == 'isdbgrid') {
+      throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).');
+    } else if (hello.setName == null) {
+      throw new Error('Standalone MongoDB instances are not supported - use a replicaset.');
+    } else if (startTime == null) {
+      // Not known where this would happen apart from the above cases
+      throw new Error('MongoDB lastWrite timestamp not found.');
+    }
+    const session = await this.client.startSession({
+      snapshot: true
+    });
+    try {
+      await this.storage.startBatch(
+        { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName },
+        async (batch) => {
+          for (let tablePattern of sourceTables) {
+            const tables = await this.getQualifiedTableNames(batch, tablePattern);
+            for (let table of tables) {
+              await this.snapshotTable(batch, table, session);
+              await batch.markSnapshotDone([table], ZERO_LSN);
+
+              await touch();
+            }
+          }
+
+          const snapshotTime = session.clusterTime?.clusterTime ??
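+          // Fall back to the lastWrite majorityOpTime read from the hello command
+          // above when the snapshot session has no cluster time.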
startTime;
+
+          if (snapshotTime != null) {
+            const lsn = getMongoLsn(snapshotTime);
+            logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
+            // keepalive() does an auto-commit if there is data
+            await batch.flush();
+            await batch.keepalive(lsn);
+          } else {
+            throw new Error(`No snapshot clusterTime available.`);
+          }
+        }
+      );
+    } finally {
+      session.endSession();
+    }
+  }
+
+  private getSourceNamespaceFilters() {
+    const sourceTables = this.sync_rules.getSourceTables();
+
+    let $inFilters: any[] = [{ db: this.defaultDb.databaseName, coll: '_powersync_checkpoints' }];
+    let $refilters: any[] = [];
+    for (let tablePattern of sourceTables) {
+      if (tablePattern.connectionTag != this.connections.connectionTag) {
+        continue;
+      }
+
+      if (tablePattern.isWildcard) {
+        $refilters.push({ db: tablePattern.schema, coll: new RegExp('^' + escapeRegExp(tablePattern.tablePrefix)) });
+      } else {
+        $inFilters.push({
+          db: tablePattern.schema,
+          coll: tablePattern.name
+        });
+      }
+    }
+    if ($refilters.length > 0) {
+      return { $or: [{ ns: { $in: $inFilters } }, ...$refilters] };
+    }
+    return { ns: { $in: $inFilters } };
+  }
+
+  static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> {
+    for (let row of results) {
+      yield constructAfterRecord(row);
+    }
+  }
+
+  private async snapshotTable(
+    batch: storage.BucketStorageBatch,
+    table: storage.SourceTable,
+    session?: mongo.ClientSession
+  ) {
+    logger.info(`Replicating ${table.qualifiedName}`);
+    const estimatedCount = await this.estimatedCount(table);
+    let at = 0;
+
+    const db = this.client.db(table.schema);
+    const collection = db.collection(table.table);
+    const query = collection.find({}, { session });
+
+    const cursor = query.stream();
+
+    for await (let document of cursor) {
+      if (this.abort_signal.aborted) {
+        throw new Error(`Aborted initial replication`);
+      }
+
+      const record = constructAfterRecord(document);
+
+      // This auto-flushes when the batch reaches its size limit
+      await batch.save({
+        tag: 'insert',
+        sourceTable: table,
+        before: undefined,
+        beforeReplicaId: undefined,
+        after: record,
+        afterReplicaId: document._id
+      });
+
+      at += 1;
+      Metrics.getInstance().rows_replicated_total.add(1);
+
+      await touch();
+    }
+
+    await batch.flush();
+  }
+
+  private async getRelation(
+    batch: storage.BucketStorageBatch,
+    descriptor: SourceEntityDescriptor
+  ): Promise<storage.SourceTable> {
+    const existing = this.relation_cache.get(descriptor.objectId);
+    if (existing != null) {
+      return existing;
+    }
+    return this.handleRelation(batch, descriptor, { snapshot: false });
+  }
+
+  async handleRelation(
+    batch: storage.BucketStorageBatch,
+    descriptor: SourceEntityDescriptor,
+    options: { snapshot: boolean }
+  ) {
+    const snapshot = options.snapshot;
+    if (typeof descriptor.objectId != 'string') {
+      throw new Error('objectId expected');
+    }
+    const result = await this.storage.resolveTable({
+      group_id: this.group_id,
+      connection_id: this.connection_id,
+      connection_tag: this.connections.connectionTag,
+      entity_descriptor: descriptor,
+      sync_rules: this.sync_rules
+    });
+    this.relation_cache.set(descriptor.objectId, result.table);
+
+    // Drop conflicting tables. This includes for example renamed tables.
+    await batch.drop(result.dropTables);
+
+    // Snapshot if:
+    // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
+    // 2. Snapshot is not already done, AND:
+    // 3. The table is used in sync rules.
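+    // (Currently only the renameCollection path requests a snapshot here.)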
+ const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny; + if (shouldSnapshot) { + // Truncate this table, in case a previous snapshot was interrupted. + await batch.truncate([result.table]); + + await this.snapshotTable(batch, result.table); + const no_checkpoint_before_lsn = await createCheckpoint(this.client, this.defaultDb); + + const [table] = await batch.markSnapshotDone([result.table], no_checkpoint_before_lsn); + return table; + } + + return result.table; + } + + async writeChange( + batch: storage.BucketStorageBatch, + table: storage.SourceTable, + change: mongo.ChangeStreamDocument + ): Promise { + if (!table.syncAny) { + logger.debug(`Collection ${table.qualifiedName} not used in sync rules - skipping`); + return null; + } + + Metrics.getInstance().rows_replicated_total.add(1); + if (change.operationType == 'insert') { + const baseRecord = constructAfterRecord(change.fullDocument); + return await batch.save({ + tag: 'insert', + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: baseRecord, + afterReplicaId: change.documentKey._id + }); + } else if (change.operationType == 'update' || change.operationType == 'replace') { + if (change.fullDocument == null) { + // Treat as delete + return await batch.save({ + tag: 'delete', + sourceTable: table, + before: undefined, + beforeReplicaId: change.documentKey._id + }); + } + const after = constructAfterRecord(change.fullDocument!); + return await batch.save({ + tag: 'update', + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: after, + afterReplicaId: change.documentKey._id + }); + } else if (change.operationType == 'delete') { + return await batch.save({ + tag: 'delete', + sourceTable: table, + before: undefined, + beforeReplicaId: change.documentKey._id + }); + } else { + throw new Error(`Unsupported operation: ${change.operationType}`); + } + } + + async replicate() { + try { + // If anything errors here, the entire replication process is halted, and + // all connections automatically closed, including this one. + + await this.initReplication(); + await this.streamChanges(); + } catch (e) { + await this.storage.reportError(e); + throw e; + } + } + + async initReplication() { + const result = await this.initSlot(); + if (result.needsInitialSync) { + await this.startInitialReplication(); + } + } + + async streamChanges() { + // Auto-activate as soon as initial replication is done + await this.storage.autoActivate(); + + await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName }, async (batch) => { + const lastLsn = batch.lastCheckpointLsn; + const startAfter = mongoLsnToTimestamp(lastLsn) ?? 
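+      // With no stored checkpoint LSN, startAtOperationTime stays undefined below,
+      // so the change stream starts from the current point in the oplog.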
undefined; + logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`); + + // TODO: Use changeStreamSplitLargeEvent + + const pipeline: mongo.Document[] = [ + { + $match: this.getSourceNamespaceFilters() + } + ]; + + const stream = this.client.watch(pipeline, { + startAtOperationTime: startAfter, + showExpandedEvents: true, + useBigInt64: true, + maxAwaitTimeMS: 200, + fullDocument: 'updateLookup' + }); + + if (this.abort_signal.aborted) { + stream.close(); + return; + } + + this.abort_signal.addEventListener('abort', () => { + stream.close(); + }); + + let waitForCheckpointLsn: string | null = null; + + while (true) { + if (this.abort_signal.aborted) { + break; + } + + const changeDocument = await stream.tryNext(); + + if (changeDocument == null || this.abort_signal.aborted) { + continue; + } + await touch(); + + if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) { + continue; + } + + // console.log('event', changeDocument); + + if ( + (changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace') && + changeDocument.ns.coll == '_powersync_checkpoints' + ) { + const lsn = getMongoLsn(changeDocument.clusterTime!); + if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) { + waitForCheckpointLsn = null; + } + await batch.flush(); + await batch.keepalive(lsn); + } else if ( + changeDocument.operationType == 'insert' || + changeDocument.operationType == 'update' || + changeDocument.operationType == 'replace' || + changeDocument.operationType == 'delete' + ) { + if (waitForCheckpointLsn == null) { + waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb); + } + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel); + if (table.syncAny) { + await this.writeChange(batch, table, changeDocument); + } + } else if (changeDocument.operationType == 'drop') { + const rel = getMongoRelation(changeDocument.ns); + const table = await this.getRelation(batch, rel); + if (table.syncAny) { + await batch.drop([table]); + this.relation_cache.delete(table.objectId); + } + } else if (changeDocument.operationType == 'rename') { + const relFrom = getMongoRelation(changeDocument.ns); + const relTo = getMongoRelation(changeDocument.to); + const tableFrom = await this.getRelation(batch, relFrom); + if (tableFrom.syncAny) { + await batch.drop([tableFrom]); + this.relation_cache.delete(tableFrom.objectId); + } + // Here we do need to snapshot the new table + await this.handleRelation(batch, relTo, { snapshot: true }); + } + } + }); + } +} + +async function touch() { + // FIXME: The hosted Kubernetes probe does not actually check the timestamp on this. + // FIXME: We need a timeout of around 5+ minutes in Kubernetes if we do start checking the timestamp, + // or reduce PING_INTERVAL here. 
+ return container.probes.touch(); +} diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts new file mode 100644 index 000000000..fb60e7aa7 --- /dev/null +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicationJob.ts @@ -0,0 +1,104 @@ +import { container } from '@powersync/lib-services-framework'; +import { MongoManager } from './MongoManager.js'; +import { MissingReplicationSlotError, ChangeStream } from './ChangeStream.js'; + +import { replication } from '@powersync/service-core'; +import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; + +import * as mongo from 'mongodb'; + +export interface ChangeStreamReplicationJobOptions extends replication.AbstractReplicationJobOptions { + connectionFactory: ConnectionManagerFactory; +} + +export class ChangeStreamReplicationJob extends replication.AbstractReplicationJob { + private connectionFactory: ConnectionManagerFactory; + private readonly connectionManager: MongoManager; + + constructor(options: ChangeStreamReplicationJobOptions) { + super(options); + this.connectionFactory = options.connectionFactory; + this.connectionManager = this.connectionFactory.create(); + } + + async cleanUp(): Promise { + // TODO: Implement? + } + + async keepAlive() { + // TODO: Implement? + } + + private get slotName() { + return this.options.storage.slot_name; + } + + async replicate() { + try { + await this.replicateLoop(); + } catch (e) { + // Fatal exception + container.reporter.captureException(e, { + metadata: {} + }); + this.logger.error(`Replication failed`, e); + + if (e instanceof MissingReplicationSlotError) { + // This stops replication on this slot, and creates a new slot + await this.options.storage.factory.slotRemoved(this.slotName); + } + } finally { + this.abortController.abort(); + } + } + + async replicateLoop() { + while (!this.isStopped) { + await this.replicateOnce(); + + if (!this.isStopped) { + await new Promise((resolve) => setTimeout(resolve, 5000)); + } + } + } + + async replicateOnce() { + // New connections on every iteration (every error with retry), + // otherwise we risk repeating errors related to the connection, + // such as caused by cached PG schemas. + const connectionManager = this.connectionFactory.create(); + try { + await this.rateLimiter?.waitUntilAllowed({ signal: this.abortController.signal }); + if (this.isStopped) { + return; + } + const stream = new ChangeStream({ + abort_signal: this.abortController.signal, + storage: this.options.storage, + connections: connectionManager + }); + await stream.replicate(); + } catch (e) { + if (this.abortController.signal.aborted) { + return; + } + this.logger.error(`Replication error`, e); + if (e.cause != null) { + // Without this additional log, the cause may not be visible in the logs. 
+ this.logger.error(`cause`, e.cause); + } + if (e instanceof mongo.MongoError && e.hasErrorLabel('NonResumableChangeStreamError')) { + throw new MissingReplicationSlotError(e.message); + } else { + // Report the error if relevant, before retrying + container.reporter.captureException(e, { + metadata: {} + }); + // This sets the retry delay + this.rateLimiter?.reportError(e); + } + } finally { + await connectionManager.end(); + } + } +} diff --git a/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts new file mode 100644 index 000000000..84b2c7f68 --- /dev/null +++ b/modules/module-mongodb/src/replication/ChangeStreamReplicator.ts @@ -0,0 +1,36 @@ +import { storage, replication } from '@powersync/service-core'; +import { ChangeStreamReplicationJob } from './ChangeStreamReplicationJob.js'; +import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; +import { MongoErrorRateLimiter } from './MongoErrorRateLimiter.js'; + +export interface WalStreamReplicatorOptions extends replication.AbstractReplicatorOptions { + connectionFactory: ConnectionManagerFactory; +} + +export class ChangeStreamReplicator extends replication.AbstractReplicator { + private readonly connectionFactory: ConnectionManagerFactory; + + constructor(options: WalStreamReplicatorOptions) { + super(options); + this.connectionFactory = options.connectionFactory; + } + + createJob(options: replication.CreateJobOptions): ChangeStreamReplicationJob { + return new ChangeStreamReplicationJob({ + id: this.createJobId(options.storage.group_id), + storage: options.storage, + connectionFactory: this.connectionFactory, + lock: options.lock, + rateLimiter: new MongoErrorRateLimiter() + }); + } + + async cleanUp(syncRulesStorage: storage.SyncRulesBucketStorage): Promise { + // TODO: Implement anything? 
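+    // This could, for example, remove the _powersync_checkpoints documents created
+    // during replication.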
+ } + + async stop(): Promise { + await super.stop(); + await this.connectionFactory.shutdown(); + } +} diff --git a/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts b/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts new file mode 100644 index 000000000..c84c28e05 --- /dev/null +++ b/modules/module-mongodb/src/replication/ConnectionManagerFactory.ts @@ -0,0 +1,27 @@ +import { logger } from '@powersync/lib-services-framework'; +import { NormalizedMongoConnectionConfig } from '../types/types.js'; +import { MongoManager } from './MongoManager.js'; + +export class ConnectionManagerFactory { + private readonly connectionManagers: MongoManager[]; + private readonly dbConnectionConfig: NormalizedMongoConnectionConfig; + + constructor(dbConnectionConfig: NormalizedMongoConnectionConfig) { + this.dbConnectionConfig = dbConnectionConfig; + this.connectionManagers = []; + } + + create() { + const manager = new MongoManager(this.dbConnectionConfig); + this.connectionManagers.push(manager); + return manager; + } + + async shutdown() { + logger.info('Shutting down MongoDB connection Managers...'); + for (const manager of this.connectionManagers) { + await manager.end(); + } + logger.info('MongoDB connection Managers shutdown completed.'); + } +} diff --git a/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts new file mode 100644 index 000000000..bcf58db63 --- /dev/null +++ b/modules/module-mongodb/src/replication/MongoErrorRateLimiter.ts @@ -0,0 +1,45 @@ +import { setTimeout } from 'timers/promises'; +import { ErrorRateLimiter } from '@powersync/service-core'; + +export class MongoErrorRateLimiter implements ErrorRateLimiter { + nextAllowed: number = Date.now(); + + async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise { + const delay = Math.max(0, this.nextAllowed - Date.now()); + // Minimum delay between connections, even without errors + this.setDelay(500); + await setTimeout(delay, undefined, { signal: options?.signal }); + } + + mayPing(): boolean { + return Date.now() >= this.nextAllowed; + } + + reportError(e: any): void { + // FIXME: Check mongodb-specific requirements + const message = (e.message as string) ?? ''; + if (message.includes('password authentication failed')) { + // Wait 15 minutes, to avoid triggering Supabase's fail2ban + this.setDelay(900_000); + } else if (message.includes('ENOTFOUND')) { + // DNS lookup issue - incorrect URI or deleted instance + this.setDelay(120_000); + } else if (message.includes('ECONNREFUSED')) { + // Could be fail2ban or similar + this.setDelay(120_000); + } else if ( + message.includes('Unable to do postgres query on ended pool') || + message.includes('Postgres unexpectedly closed connection') + ) { + // Connection timed out - ignore / immediately retry + // We don't explicitly set the delay to 0, since there could have been another error that + // we need to respect. 
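+      // Note: the message checks above are inherited from the Postgres module and
+      // are unlikely to match MongoDB driver errors - see the FIXME at the top.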
+ } else { + this.setDelay(30_000); + } + } + + private setDelay(delay: number) { + this.nextAllowed = Math.max(this.nextAllowed, Date.now() + delay); + } +} diff --git a/modules/module-mongodb/src/replication/MongoManager.ts b/modules/module-mongodb/src/replication/MongoManager.ts new file mode 100644 index 000000000..cb2f9d54f --- /dev/null +++ b/modules/module-mongodb/src/replication/MongoManager.ts @@ -0,0 +1,47 @@ +import * as mongo from 'mongodb'; +import { NormalizedMongoConnectionConfig } from '../types/types.js'; + +export class MongoManager { + /** + * Do not use this for any transactions. + */ + public readonly client: mongo.MongoClient; + public readonly db: mongo.Db; + + constructor(public options: NormalizedMongoConnectionConfig) { + // The pool is lazy - no connections are opened until a query is performed. + this.client = new mongo.MongoClient(options.uri, { + auth: { + username: options.username, + password: options.password + }, + // Time for connection to timeout + connectTimeoutMS: 5_000, + // Time for individual requests to timeout + socketTimeoutMS: 60_000, + // How long to wait for new primary selection + serverSelectionTimeoutMS: 30_000, + + // Avoid too many connections: + // 1. It can overwhelm the source database. + // 2. Processing too many queries in parallel can cause the process to run out of memory. + maxPoolSize: 8, + + maxConnecting: 3, + maxIdleTimeMS: 60_000 + }); + this.db = this.client.db(options.database, {}); + } + + public get connectionTag() { + return this.options.tag; + } + + async end(): Promise { + await this.client.close(); + } + + async destroy() { + // TODO: Implement? + } +} diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts new file mode 100644 index 000000000..267674895 --- /dev/null +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -0,0 +1,156 @@ +import { storage } from '@powersync/service-core'; +import { SqliteRow, SqliteValue, toSyncRulesRow } from '@powersync/service-sync-rules'; +import * as mongo from 'mongodb'; +import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; + +export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.SourceEntityDescriptor { + return { + name: source.coll, + schema: source.db, + objectId: source.coll, + replicationColumns: [{ name: '_id' }] + } satisfies storage.SourceEntityDescriptor; +} + +export function getMongoLsn(timestamp: mongo.Timestamp) { + const a = timestamp.high.toString(16).padStart(8, '0'); + const b = timestamp.low.toString(16).padStart(8, '0'); + return a + b; +} + +export function mongoLsnToTimestamp(lsn: string | null) { + if (lsn == null) { + return null; + } + const a = parseInt(lsn.substring(0, 8), 16); + const b = parseInt(lsn.substring(8, 16), 16); + return mongo.Timestamp.fromBits(b, a); +} + +export function constructAfterRecord(document: mongo.Document): SqliteRow { + let record: SqliteRow = {}; + for (let key of Object.keys(document)) { + record[key] = toMongoSyncRulesValue(document[key]); + } + return record; +} + +export function toMongoSyncRulesValue(data: any): SqliteValue { + const autoBigNum = true; + if (data == null) { + // null or undefined + return data; + } else if (typeof data == 'string') { + return data; + } else if (typeof data == 'number') { + if (Number.isInteger(data) && autoBigNum) { + return BigInt(data); + } else { + return data; + } + } else if (typeof data == 'bigint') { + return data; + } else if (typeof data == 'boolean') { + 
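+    // SQLite has no boolean type - booleans are stored as the integers 1 / 0.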
return data ? 1n : 0n; + } else if (data instanceof mongo.ObjectId) { + return data.toHexString(); + } else if (data instanceof mongo.UUID) { + return data.toHexString(); + } else if (data instanceof Date) { + return data.toISOString().replace('T', ' '); + } else if (data instanceof mongo.Binary) { + return new Uint8Array(data.buffer); + } else if (data instanceof mongo.Long) { + return data.toBigInt(); + } else if (Array.isArray(data)) { + // We may be able to avoid some parse + stringify cycles here for JsonSqliteContainer. + return JSONBig.stringify(data.map((element) => filterJsonData(element))); + } else if (data instanceof Uint8Array) { + return data; + } else if (data instanceof JsonContainer) { + return data.toString(); + } else if (typeof data == 'object') { + let record: Record = {}; + for (let key of Object.keys(data)) { + record[key] = filterJsonData(data[key]); + } + return JSONBig.stringify(record); + } else { + return null; + } +} + +const DEPTH_LIMIT = 20; + +function filterJsonData(data: any, depth = 0): any { + const autoBigNum = true; + if (depth > DEPTH_LIMIT) { + // This is primarily to prevent infinite recursion + throw new Error(`json nested object depth exceeds the limit of ${DEPTH_LIMIT}`); + } + if (data == null) { + return data; // null or undefined + } else if (typeof data == 'string') { + return data; + } else if (typeof data == 'number') { + if (autoBigNum && Number.isInteger(data)) { + return BigInt(data); + } else { + return data; + } + } else if (typeof data == 'boolean') { + return data ? 1n : 0n; + } else if (typeof data == 'bigint') { + return data; + } else if (data instanceof Date) { + return data.toISOString().replace('T', ' '); + } else if (data instanceof mongo.ObjectId) { + return data.toHexString(); + } else if (data instanceof mongo.UUID) { + return data.toHexString(); + } else if (data instanceof mongo.Binary) { + return undefined; + } else if (data instanceof mongo.Long) { + return data.toBigInt(); + } else if (Array.isArray(data)) { + return data.map((element) => filterJsonData(element, depth + 1)); + } else if (ArrayBuffer.isView(data)) { + return undefined; + } else if (data instanceof JsonContainer) { + // Can be stringified directly when using our JSONBig implementation + return data; + } else if (typeof data == 'object') { + let record: Record = {}; + for (let key of Object.keys(data)) { + record[key] = filterJsonData(data[key], depth + 1); + } + return record; + } else { + return undefined; + } +} + +export async function createCheckpoint(client: mongo.MongoClient, db: mongo.Db): Promise { + const session = client.startSession(); + try { + const result = await db.collection('_powersync_checkpoints').findOneAndUpdate( + { + _id: 'checkpoint' as any + }, + { + $inc: { i: 1 } + }, + { + upsert: true, + returnDocument: 'after', + session + } + ); + const time = session.operationTime!; + // console.log('marked checkpoint at', time, getMongoLsn(time)); + // TODO: Use the above when we support custom write checkpoints + return getMongoLsn(time); + } finally { + await session.endSession(); + } +} diff --git a/modules/module-mongodb/src/replication/replication-index.ts b/modules/module-mongodb/src/replication/replication-index.ts new file mode 100644 index 000000000..4ff43b56a --- /dev/null +++ b/modules/module-mongodb/src/replication/replication-index.ts @@ -0,0 +1,4 @@ +export * from './MongoRelation.js'; +export * from './ChangeStream.js'; +export * from './ChangeStreamReplicator.js'; +export * from './ChangeStreamReplicationJob.js'; diff 
--git a/modules/module-mongodb/src/types/types.ts b/modules/module-mongodb/src/types/types.ts
new file mode 100644
index 000000000..572a8b4dd
--- /dev/null
+++ b/modules/module-mongodb/src/types/types.ts
@@ -0,0 +1,65 @@
+import { normalizeMongoConfig } from '@powersync/service-core';
+import * as service_types from '@powersync/service-types';
+import * as t from 'ts-codec';
+
+export const MONGO_CONNECTION_TYPE = 'mongodb' as const;
+
+export interface NormalizedMongoConnectionConfig {
+  id: string;
+  tag: string;
+
+  uri: string;
+  database: string;
+
+  username?: string;
+  password?: string;
+}
+
+export const MongoConnectionConfig = service_types.configFile.dataSourceConfig.and(
+  t.object({
+    type: t.literal(MONGO_CONNECTION_TYPE),
+    /** Unique identifier for the connection - optional when a single connection is present. */
+    id: t.string.optional(),
+    /** Tag used as reference in sync rules. Defaults to "default". Does not have to be unique. */
+    tag: t.string.optional(),
+    uri: t.string,
+    username: t.string.optional(),
+    password: t.string.optional(),
+    database: t.string.optional()
+  })
+);
+
+/**
+ * Config input specified when starting services
+ */
+export type MongoConnectionConfig = t.Decoded<typeof MongoConnectionConfig>;
+
+/**
+ * Resolved version of {@link MongoConnectionConfig}
+ */
+export type ResolvedConnectionConfig = MongoConnectionConfig & NormalizedMongoConnectionConfig;
+
+/**
+ * Validate and normalize connection options.
+ *
+ * Returns destructured options.
+ */
+export function normalizeConnectionConfig(options: MongoConnectionConfig): NormalizedMongoConnectionConfig {
+  const base = normalizeMongoConfig(options);
+
+  return {
+    id: options.id ?? 'default',
+    tag: options.tag ?? 'default',
+
+    ...base
+  };
+}
+
+/**
+ * Construct a mongodb URI, without username, password or ssl options.
+ *
+ * Only contains hostname, port, database.
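+ * The normalized URI is assumed to already exclude credentials, since username
+ * and password are carried separately on the config; currently the URI is
+ * returned unchanged.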
+ */ +export function baseUri(options: NormalizedMongoConnectionConfig) { + return options.uri; +} diff --git a/modules/module-mongodb/src/utils.ts b/modules/module-mongodb/src/utils.ts new file mode 100644 index 000000000..badee3083 --- /dev/null +++ b/modules/module-mongodb/src/utils.ts @@ -0,0 +1,4 @@ +export function escapeRegExp(string: string) { + // https://stackoverflow.com/a/3561711/214837 + return string.replace(/[/\-\\^$*+?.()|[\]{}]/g, '\\$&'); +} diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts new file mode 100644 index 000000000..f950e4f35 --- /dev/null +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -0,0 +1,306 @@ +import { putOp, removeOp } from '@core-tests/stream_utils.js'; +import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js'; +import { BucketStorageFactory } from '@powersync/service-core'; +import * as crypto from 'crypto'; +import { describe, expect, test } from 'vitest'; +import { walStreamTest } from './change_stream_utils.js'; +import * as mongo from 'mongodb'; +import { setTimeout } from 'node:timers/promises'; + +type StorageFactory = () => Promise; + +const BASIC_SYNC_RULES = ` +bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "test_data" +`; + +describe( + 'change stream - mongodb', + function () { + defineWalStreamTests(MONGO_STORAGE_FACTORY); + }, + { timeout: 20_000 } +); + +function defineWalStreamTests(factory: StorageFactory) { + test( + 'replicating basic values', + walStreamTest(factory, async (context) => { + const { db } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_data"`); + + db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: true } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); + const test_id = result.insertedId; + await setTimeout(10); + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); + await setTimeout(10); + await collection.replaceOne({ _id: test_id }, { description: 'test3' }); + await setTimeout(10); + await collection.deleteOne({ _id: test_id }); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), description: 'test1', num: 1152921504606846976n }), + putOp('test_data', { id: test_id.toHexString(), description: 'test2', num: 1152921504606846976n }), + putOp('test_data', { id: test_id.toHexString(), description: 'test3' }), + removeOp('test_data', test_id.toHexString()) + ]); + }) + ); + + test( + 'no fullDocument available', + walStreamTest(factory, async (context) => { + const { db, client } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT _id as id, description, num FROM "test_data"`); + + db.createCollection('test_data', { + changeStreamPreAndPostImages: { enabled: false } + }); + const collection = db.collection('test_data'); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const session = client.startSession(); + let test_id: mongo.ObjectId | undefined; + try { + await session.withTransaction(async () => { + const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }, { session }); + test_id = 
result.insertedId; + await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }, { session }); + await collection.replaceOne({ _id: test_id }, { description: 'test3' }, { session }); + await collection.deleteOne({ _id: test_id }, { session }); + }); + } finally { + await session.endSession(); + } + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id!.toHexString(), description: 'test1', num: 1152921504606846976n }), + // fullDocument is not available at the point this is replicated, resulting in it treated as a remove + removeOp('test_data', test_id!.toHexString()), + putOp('test_data', { id: test_id!.toHexString(), description: 'test3' }), + removeOp('test_data', test_id!.toHexString()) + ]); + }) + ); + + test( + 'replicating case sensitive table', + walStreamTest(factory, async (context) => { + const { db } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "test_DATA" + `); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const collection = db.collection('test_DATA'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([putOp('test_DATA', { id: test_id, description: 'test1' })]); + }) + ); + + test( + 'replicating large values', + walStreamTest(factory, async (context) => { + const { db } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT _id as id, name, description FROM "test_data" + `); + + await context.replicateSnapshot(); + context.startStreaming(); + + const largeDescription = crypto.randomBytes(20_000).toString('hex'); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ name: 'test1', description: largeDescription }); + const test_id = result.insertedId; + + await collection.updateOne({ _id: test_id }, { $set: { name: 'test2' } }); + + const data = await context.getBucketData('global[]'); + expect(data.slice(0, 1)).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), name: 'test1', description: largeDescription }) + ]); + expect(data.slice(1)).toMatchObject([ + putOp('test_data', { id: test_id.toHexString(), name: 'test2', description: largeDescription }) + ]); + }) + ); + + test( + 'replicating dropCollection', + walStreamTest(factory, async (context) => { + const { db } = context; + const syncRuleContent = ` +bucket_definitions: + global: + data: + - SELECT _id as id, description FROM "test_data" + by_test_data: + parameters: SELECT _id as id FROM test_data WHERE id = token_parameters.user_id + data: [] +`; + await context.updateSyncRules(syncRuleContent); + await context.replicateSnapshot(); + context.startStreaming(); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); + + await collection.drop(); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { id: test_id, description: 'test1' }), + removeOp('test_data', test_id) + ]); + }) + ); + + test( + 'replicating renameCollection', + walStreamTest(factory, async (context) => { + const { db } = context; + const syncRuleContent = ` +bucket_definitions: + global: + data: + - SELECT _id as id, description FROM 
"test_data1" + - SELECT _id as id, description FROM "test_data2" +`; + await context.updateSyncRules(syncRuleContent); + await context.replicateSnapshot(); + context.startStreaming(); + + console.log('insert1', db.databaseName); + const collection = db.collection('test_data1'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); + + await collection.rename('test_data2'); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data1', { id: test_id, description: 'test1' }), + removeOp('test_data1', test_id), + putOp('test_data2', { id: test_id, description: 'test1' }) + ]); + }) + ); + + test( + 'initial sync', + walStreamTest(factory, async (context) => { + const { db } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: 'test1' }); + const test_id = result.insertedId.toHexString(); + + await context.replicateSnapshot(); + context.startStreaming(); + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([putOp('test_data', { id: test_id, description: 'test1' })]); + }) + ); + + // Not correctly implemented yet + test.skip( + 'large record', + walStreamTest(factory, async (context) => { + await context.updateSyncRules(`bucket_definitions: + global: + data: + - SELECT _id as id, description, other FROM "test_data"`); + const { db } = context; + + await context.replicateSnapshot(); + + // 16MB + const largeDescription = crypto.randomBytes(8_000_000 - 100).toString('hex'); + + const collection = db.collection('test_data'); + const result = await collection.insertOne({ description: largeDescription }); + const test_id = result.insertedId; + + await collection.updateOne({ _id: test_id }, { $set: { name: 't2' } }); + context.startStreaming(); + + const data = await context.getBucketData('global[]'); + expect(data.length).toEqual(2); + const row = JSON.parse(data[0].data as string); + delete row.description; + expect(row).toEqual({ id: test_id.toHexString() }); + delete data[0].data; + expect(data[0]).toMatchObject({ + object_id: test_id.toHexString(), + object_type: 'test_data', + op: 'PUT', + op_id: '1' + }); + }) + ); + + test( + 'table not in sync rules', + walStreamTest(factory, async (context) => { + const { db } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await context.replicateSnapshot(); + + context.startStreaming(); + + const collection = db.collection('test_donotsync'); + const result = await collection.insertOne({ description: 'test' }); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([]); + }) + ); +} diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts new file mode 100644 index 000000000..19148695e --- /dev/null +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -0,0 +1,151 @@ +import { ActiveCheckpoint, BucketStorageFactory, OpId, SyncRulesBucketStorage } from '@powersync/service-core'; + +import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js'; +import { fromAsync } from '@core-tests/stream_utils.js'; +import { MongoManager } from '@module/replication/MongoManager.js'; +import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js'; +import * as mongo from 'mongodb'; +import { createCheckpoint } from 
'@module/replication/MongoRelation.js';
+
+/**
+ * Tests operating on the change stream need to configure the stream and manage asynchronous
+ * replication, which gets a little tricky.
+ *
+ * This wraps a test in a function that configures all the context, and tears it down afterwards.
+ */
+export function walStreamTest(
+  factory: () => Promise<BucketStorageFactory>,
+  test: (context: ChangeStreamTestContext) => Promise<void>
+): () => Promise<void> {
+  return async () => {
+    const f = await factory();
+    const connectionManager = new MongoManager(TEST_CONNECTION_OPTIONS);
+
+    await clearTestDb(connectionManager.db);
+    const context = new ChangeStreamTestContext(f, connectionManager);
+    try {
+      await test(context);
+    } finally {
+      await context.dispose();
+    }
+  };
+}
+
+export class ChangeStreamTestContext {
+  private _walStream?: ChangeStream;
+  private abortController = new AbortController();
+  private streamPromise?: Promise<void>;
+  public storage?: SyncRulesBucketStorage;
+
+  constructor(
+    public factory: BucketStorageFactory,
+    public connectionManager: MongoManager
+  ) {}
+
+  async dispose() {
+    this.abortController.abort();
+    await this.streamPromise?.catch((e) => e);
+    await this.connectionManager.destroy();
+  }
+
+  get client() {
+    return this.connectionManager.client;
+  }
+
+  get db() {
+    return this.connectionManager.db;
+  }
+
+  get connectionTag() {
+    return this.connectionManager.connectionTag;
+  }
+
+  async updateSyncRules(content: string) {
+    const syncRules = await this.factory.updateSyncRules({ content: content });
+    this.storage = this.factory.getInstance(syncRules);
+    return this.storage!;
+  }
+
+  get walStream() {
+    if (this.storage == null) {
+      throw new Error('updateSyncRules() first');
+    }
+    if (this._walStream) {
+      return this._walStream;
+    }
+    const options: ChangeStreamOptions = {
+      storage: this.storage,
+      connections: this.connectionManager,
+      abort_signal: this.abortController.signal
+    };
+    this._walStream = new ChangeStream(options);
+    return this._walStream!;
+  }
+
+  async replicateSnapshot() {
+    await this.walStream.initReplication();
+    await this.storage!.autoActivate();
+  }
+
+  startStreaming() {
+    this.streamPromise = this.walStream.streamChanges();
+  }
+
+  async getCheckpoint(options?: { timeout?: number }) {
+    let checkpoint = await Promise.race([
+      getClientCheckpoint(this.client, this.db, this.factory, { timeout: options?.timeout ?? 15_000 }),
+      this.streamPromise
+    ]);
+    if (typeof checkpoint == 'undefined') {
+      // This indicates an issue with the test setup - streamPromise completed instead
+      // of getClientCheckpoint()
+      throw new Error('Test failure - streamPromise completed');
+    }
+    return checkpoint as string;
+  }
+
+  async getBucketsDataBatch(buckets: Record<string, string>, options?: { timeout?: number }) {
+    let checkpoint = await this.getCheckpoint(options);
+    const map = new Map(Object.entries(buckets));
+    return fromAsync(this.storage!.getBucketDataBatch(checkpoint, map));
+  }
+
+  async getBucketData(bucket: string, start?: string, options?: { timeout?: number }) {
+    start ??= '0';
+    let checkpoint = await this.getCheckpoint(options);
+    const map = new Map([[bucket, start]]);
+    const batch = this.storage!.getBucketDataBatch(checkpoint, map);
+    const batches = await fromAsync(batch);
+    return batches[0]?.batch.data ?? [];
+  }
+}
+
+export async function getClientCheckpoint(
+  client: mongo.MongoClient,
+  db: mongo.Db,
+  bucketStorage: BucketStorageFactory,
+  options?: { timeout?: number }
+): Promise<OpId> {
+  const start = Date.now();
+  const lsn = await createCheckpoint(client, db);
+  // This old API needs a persisted checkpoint id.
+  // Since we don't use LSNs anymore, the only way to get that is to wait.
+
+  const timeout = options?.timeout ?? 50_000;
+  let lastCp: ActiveCheckpoint | null = null;
+
+  while (Date.now() - start < timeout) {
+    const cp = await bucketStorage.getActiveCheckpoint();
+    lastCp = cp;
+    if (!cp.hasSyncRules()) {
+      throw new Error('No sync rules available');
+    }
+    if (cp.lsn && cp.lsn >= lsn) {
+      return cp.checkpoint;
+    }
+
+    await new Promise((resolve) => setTimeout(resolve, 30));
+  }
+
+  throw new Error(`Timeout while waiting for checkpoint ${lsn}. Last checkpoint: ${lastCp?.lsn}`);
+}
diff --git a/modules/module-mongodb/test/src/env.ts b/modules/module-mongodb/test/src/env.ts
new file mode 100644
index 000000000..e460c80b3
--- /dev/null
+++ b/modules/module-mongodb/test/src/env.ts
@@ -0,0 +1,7 @@
+import { utils } from '@powersync/lib-services-framework';
+
+export const env = utils.collectEnvironmentVariables({
+  MONGO_TEST_DATA_URL: utils.type.string.default('mongodb://localhost:27017/powersync_test_data'),
+  CI: utils.type.boolean.default('false'),
+  SLOW_TESTS: utils.type.boolean.default('false')
+});
diff --git a/modules/module-mongodb/test/src/mongo_test.test.ts b/modules/module-mongodb/test/src/mongo_test.test.ts
new file mode 100644
index 000000000..ed03ebcd9
--- /dev/null
+++ b/modules/module-mongodb/test/src/mongo_test.test.ts
@@ -0,0 +1,262 @@
+import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js';
+import { ChangeStream } from '@module/replication/ChangeStream.js';
+import { constructAfterRecord } from '@module/replication/MongoRelation.js';
+import { SqliteRow } from '@powersync/service-sync-rules';
+import * as mongo from 'mongodb';
+import { describe, expect, test } from 'vitest';
+import { clearTestDb, connectMongoData, TEST_CONNECTION_OPTIONS } from './util.js';
+
+describe('mongo data types', () => {
+  async function setupTable(db: mongo.Db) {
+    await clearTestDb(db);
+  }
+
+  async function insert(collection: mongo.Collection) {
+    await collection.insertMany([
+      {
+        _id: 1 as any,
+        null: null,
+        text: 'text',
+        uuid: new mongo.UUID('baeb2514-4c57-436d-b3cc-c1256211656d'),
+        bool: true,
+        bytea: Buffer.from('test'),
+        int2: 1000,
+        int4: 1000000,
+        int8: 9007199254740993n,
+        float: 3.14
+      },
+      { _id: 2 as any, nested: { test: 'thing' } },
+      { _id: 3 as any, date: new Date('2023-03-06 15:47+02') },
+      {
+        _id: 4 as any,
+        timestamp: mongo.Timestamp.fromBits(123, 456),
+        objectId: mongo.ObjectId.createFromHexString('66e834cc91d805df11fa0ecb')
+      }
+    ]);
+  }
+
+  async function insertNested(collection: mongo.Collection) {
+    await collection.insertMany([
+      {
+        _id: 1 as any,
+        null: [null],
+        text: ['text'],
+        uuid: [new mongo.UUID('baeb2514-4c57-436d-b3cc-c1256211656d')],
+        bool: [true],
+        bytea: [Buffer.from('test')],
+        int2: [1000],
+        int4: [1000000],
+        int8: [9007199254740993n],
+        float: [3.14]
+      },
+      { _id: 2 as any, nested: [{ test: 'thing' }] },
+      { _id: 3 as any, date: [new Date('2023-03-06 15:47+02')] },
+      {
+        _id: 10 as any,
+        timestamp: [mongo.Timestamp.fromBits(123, 456)],
+        objectId: [mongo.ObjectId.createFromHexString('66e834cc91d805df11fa0ecb')]
+      }
+    ]);
+  }
+
+  function checkResults(transformed: Record<string, any>[]) {
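+    // The assertions below imply the following BSON -> SQLite value mapping
+    // (inferred from the expected values, not from separate documentation):
+    // integers and booleans surface as bigint, doubles stay doubles, Buffer
+    // becomes a Uint8Array blob, UUID/ObjectId become hex strings, Date becomes
+    // a UTC 'YYYY-MM-DD HH:MM:SS.SSSZ' string, a BSON Timestamp becomes its
+    // combined 64-bit value ((high << 32) | low, so fromBits(123, 456) ->
+    // 1958505087099n), and nested objects/arrays are JSON-encoded strings.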
+    expect(transformed[0]).toMatchObject({
+      _id: 1n,
+      text: 'text',
+      uuid: 'baeb2514-4c57-436d-b3cc-c1256211656d',
+      bool: 1n,
+      bytea: new Uint8Array([116, 101, 115, 116]),
+      int2: 1000n,
+      int4: 1000000n,
+      int8: 9007199254740993n,
+      float: 3.14,
+      null: null
+    });
+    expect(transformed[1]).toMatchObject({
+      _id: 2n,
+      nested: '{"test":"thing"}'
+    });
+
+    expect(transformed[2]).toMatchObject({
+      _id: 3n,
+      date: '2023-03-06 13:47:00.000Z'
+    });
+
+    expect(transformed[3]).toMatchObject({
+      _id: 4n,
+      objectId: '66e834cc91d805df11fa0ecb',
+      timestamp: 1958505087099n
+    });
+  }
+
+  function checkResultsNested(transformed: Record<string, any>[]) {
+    expect(transformed[0]).toMatchObject({
+      _id: 1n,
+      text: `["text"]`,
+      uuid: '["baeb2514-4c57-436d-b3cc-c1256211656d"]',
+      bool: '[1]',
+      bytea: '[null]',
+      int2: '[1000]',
+      int4: '[1000000]',
+      int8: `[9007199254740993]`,
+      float: '[3.14]',
+      null: '[null]'
+    });
+
+    // Note: Depending on the extent to which we use the original value, the whitespace
+    // and key order may change. We do expect that decimals and big numbers are preserved.
+    expect(transformed[1]).toMatchObject({
+      _id: 2n,
+      nested: '[{"test":"thing"}]'
+    });
+
+    expect(transformed[2]).toMatchObject({
+      _id: 3n,
+      date: '["2023-03-06 13:47:00.000Z"]'
+    });
+
+    expect(transformed[3]).toMatchObject({
+      _id: 10n,
+      objectId: '["66e834cc91d805df11fa0ecb"]',
+      timestamp: '[1958505087099]'
+    });
+  }
+
+  test('test direct queries', async () => {
+    const { db, client } = await connectMongoData();
+    const collection = db.collection('test_data');
+    try {
+      await setupTable(db);
+
+      await insert(collection);
+
+      const transformed = [...ChangeStream.getQueryData(await db.collection('test_data').find().toArray())];
+
+      checkResults(transformed);
+    } finally {
+      await client.close();
+    }
+  });
+
+  test('test direct queries - arrays', async () => {
+    const { db, client } = await connectMongoData();
+    const collection = db.collection('test_data_arrays');
+    try {
+      await setupTable(db);
+
+      await insertNested(collection);
+
+      const transformed = [...ChangeStream.getQueryData(await db.collection('test_data_arrays').find().toArray())];
+
+      checkResultsNested(transformed);
+    } finally {
+      await client.close();
+    }
+  });
+
+  test('test replication', async () => {
+    // With MongoDB, replication uses the exact same document format
+    // as normal queries. We test it anyway.
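+    // The change stream below is opened with the same useBigInt64 deserialization
+    // option as the direct queries, plus fullDocument: 'updateLookup' so that each
+    // event carries a full post-image for constructAfterRecord() to convert.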
+ const { db, client } = await connectMongoData(); + const collection = db.collection('test_data'); + try { + await setupTable(db); + + const stream = db.watch([], { + useBigInt64: true, + maxAwaitTimeMS: 50, + fullDocument: 'updateLookup' + }); + + await stream.tryNext(); + + await insert(collection); + + const transformed = await getReplicationTx(stream, 4); + + checkResults(transformed); + } finally { + await client.close(); + } + }); + + test('test replication - arrays', async () => { + const { db, client } = await connectMongoData(); + const collection = db.collection('test_data'); + try { + await setupTable(db); + + const stream = db.watch([], { + useBigInt64: true, + maxAwaitTimeMS: 50, + fullDocument: 'updateLookup' + }); + + await stream.tryNext(); + + await insertNested(collection); + + const transformed = await getReplicationTx(stream, 4); + + checkResultsNested(transformed); + } finally { + await client.close(); + } + }); + + test('connection schema', async () => { + const adapter = new MongoRouteAPIAdapter({ + type: 'mongodb', + ...TEST_CONNECTION_OPTIONS + }); + try { + const db = adapter.db; + await clearTestDb(db); + + const collection = db.collection('test_data'); + await setupTable(db); + await insert(collection); + + const schema = await adapter.getConnectionSchema(); + const dbSchema = schema.filter((s) => s.name == TEST_CONNECTION_OPTIONS.database)[0]; + expect(dbSchema).not.toBeNull(); + expect(dbSchema.tables).toEqual([ + { + name: 'test_data', + columns: [ + { name: '_id', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'bool', sqlite_type: 4, internal_type: 'Boolean' }, + { name: 'bytea', sqlite_type: 1, internal_type: 'Binary' }, + { name: 'date', sqlite_type: 2, internal_type: 'Date' }, + { name: 'float', sqlite_type: 8, internal_type: 'Double' }, + { name: 'int2', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'int4', sqlite_type: 4, internal_type: 'Integer' }, + { name: 'int8', sqlite_type: 4, internal_type: 'Long' }, + { name: 'nested', sqlite_type: 2, internal_type: 'Object' }, + { name: 'null', sqlite_type: 0, internal_type: 'Null' }, + { name: 'objectId', sqlite_type: 2, internal_type: 'ObjectId' }, + { name: 'text', sqlite_type: 2, internal_type: 'String' }, + { name: 'timestamp', sqlite_type: 4, internal_type: 'Timestamp' }, + { name: 'uuid', sqlite_type: 2, internal_type: 'UUID' } + ] + } + ]); + } finally { + await adapter.shutdown(); + } + }); +}); + +/** + * Return all the inserts from the first transaction in the replication stream. 
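+ * Assumes the stream was opened with fullDocument: 'updateLookup', so that each
+ * event carries a post-image, and simply stops once `count` documents have been seen.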
+ */ +async function getReplicationTx(replicationStream: mongo.ChangeStream, count: number) { + let transformed: SqliteRow[] = []; + for await (const doc of replicationStream) { + transformed.push(constructAfterRecord((doc as any).fullDocument)); + if (transformed.length == count) { + break; + } + } + return transformed; +} diff --git a/modules/module-mongodb/test/src/setup.ts b/modules/module-mongodb/test/src/setup.ts new file mode 100644 index 000000000..b924cf736 --- /dev/null +++ b/modules/module-mongodb/test/src/setup.ts @@ -0,0 +1,7 @@ +import { container } from '@powersync/lib-services-framework'; +import { beforeAll } from 'vitest'; + +beforeAll(() => { + // Executes for every test file + container.registerDefaults(); +}); diff --git a/modules/module-mongodb/test/src/util.ts b/modules/module-mongodb/test/src/util.ts new file mode 100644 index 000000000..a101f77a5 --- /dev/null +++ b/modules/module-mongodb/test/src/util.ts @@ -0,0 +1,52 @@ +import * as types from '@module/types/types.js'; +import { BucketStorageFactory, Metrics, MongoBucketStorage, OpId } from '@powersync/service-core'; + +import { env } from './env.js'; +import { logger } from '@powersync/lib-services-framework'; +import { connectMongo } from '@core-tests/util.js'; +import * as mongo from 'mongodb'; + +// The metrics need to be initialized before they can be used +await Metrics.initialise({ + disable_telemetry_sharing: true, + powersync_instance_id: 'test', + internal_metrics_endpoint: 'unused.for.tests.com' +}); +Metrics.getInstance().resetCounters(); + +export const TEST_URI = env.MONGO_TEST_DATA_URL; + +export const TEST_CONNECTION_OPTIONS = types.normalizeConnectionConfig({ + type: 'mongodb', + uri: TEST_URI +}); + +export type StorageFactory = () => Promise; + +export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => { + const db = await connectMongo(); + + // None of the PG tests insert data into this collection, so it was never created + if (!(await db.db.listCollections({ name: db.bucket_parameters.collectionName }).hasNext())) { + await db.db.createCollection('bucket_parameters'); + } + + await db.clear(); + + return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }); +}; + +export async function clearTestDb(db: mongo.Db) { + await db.dropDatabase(); +} + +export async function connectMongoData() { + const client = new mongo.MongoClient(env.MONGO_TEST_DATA_URL, { + connectTimeoutMS: env.CI ? 15_000 : 5_000, + socketTimeoutMS: env.CI ? 15_000 : 5_000, + serverSelectionTimeoutMS: env.CI ? 
15_000 : 2_500, + useBigInt64: true + }); + const dbname = new URL(env.MONGO_TEST_DATA_URL).pathname.substring(1); + return { client, db: client.db(dbname) }; +} diff --git a/modules/module-mongodb/test/tsconfig.json b/modules/module-mongodb/test/tsconfig.json new file mode 100644 index 000000000..18898c4ee --- /dev/null +++ b/modules/module-mongodb/test/tsconfig.json @@ -0,0 +1,28 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "baseUrl": "./", + "noEmit": true, + "esModuleInterop": true, + "skipLibCheck": true, + "sourceMap": true, + "paths": { + "@/*": ["../../../packages/service-core/src/*"], + "@module/*": ["../src/*"], + "@core-tests/*": ["../../../packages/service-core/test/src/*"] + } + }, + "include": ["src"], + "references": [ + { + "path": "../" + }, + { + "path": "../../../packages/service-core/test" + }, + { + "path": "../../../packages/service-core/" + } + ] +} diff --git a/modules/module-mongodb/tsconfig.json b/modules/module-mongodb/tsconfig.json new file mode 100644 index 000000000..6afdde02f --- /dev/null +++ b/modules/module-mongodb/tsconfig.json @@ -0,0 +1,28 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "rootDir": "src", + "outDir": "dist", + "esModuleInterop": true, + "skipLibCheck": true, + "sourceMap": true + }, + "include": ["src"], + "references": [ + { + "path": "../../packages/types" + }, + { + "path": "../../packages/jsonbig" + }, + { + "path": "../../packages/sync-rules" + }, + { + "path": "../../packages/service-core" + }, + { + "path": "../../libs/lib-services" + } + ] +} diff --git a/modules/module-mongodb/vitest.config.ts b/modules/module-mongodb/vitest.config.ts new file mode 100644 index 000000000..b392696b7 --- /dev/null +++ b/modules/module-mongodb/vitest.config.ts @@ -0,0 +1,9 @@ +import { defineConfig } from 'vitest/config'; +import tsconfigPaths from 'vite-tsconfig-paths'; + +export default defineConfig({ + plugins: [tsconfigPaths()], + test: { + setupFiles: './test/src/setup.ts' + } +}); diff --git a/packages/service-core/package.json b/packages/service-core/package.json index ec7b14cd3..9001441b9 100644 --- a/packages/service-core/package.json +++ b/packages/service-core/package.json @@ -40,6 +40,7 @@ "mongodb": "^6.7.0", "node-fetch": "^3.3.2", "ts-codec": "^1.2.2", + "uri-js": "^4.4.1", "uuid": "^9.0.1", "winston": "^3.13.0", "yaml": "^2.3.2" diff --git a/packages/service-core/src/api/RouteAPI.ts b/packages/service-core/src/api/RouteAPI.ts index 393d56342..bbe92180c 100644 --- a/packages/service-core/src/api/RouteAPI.ts +++ b/packages/service-core/src/api/RouteAPI.ts @@ -44,7 +44,7 @@ export interface RouteAPI { * replicated yet, in bytes. * @param {string} syncRulesId An identifier representing which set of sync rules the lag is required for. */ - getReplicationLag(syncRulesId: string): Promise; + getReplicationLag(syncRulesId: string): Promise; /** * Get the current LSN or equivalent replication HEAD position identifier @@ -54,9 +54,6 @@ export interface RouteAPI { /** * @returns The schema for tables inside the connected database. This is typically * used to validate sync rules. - * Side Note: https://github.com/powersync-ja/powersync-service/blob/33bbb8c0ab1c48555956593f427fc674a8f15768/packages/types/src/definitions.ts#L100 - * contains `pg_type` which we might need to deprecate and add another generic - * type field - or just use this field as the connection specific type. 
*/ getConnectionSchema(): Promise; diff --git a/packages/service-core/src/api/schema.ts b/packages/service-core/src/api/schema.ts index aff6d770f..64b48e1a0 100644 --- a/packages/service-core/src/api/schema.ts +++ b/packages/service-core/src/api/schema.ts @@ -5,7 +5,9 @@ import * as api from '../api/api-index.js'; export async function getConnectionsSchema(api: api.RouteAPI): Promise { if (!api) { return { - connections: [] + connections: [], + defaultConnectionTag: 'default', + defaultSchema: '' }; } @@ -18,6 +20,8 @@ export async function getConnectionsSchema(api: api.RouteAPI): Promise; + /** + * Get the last checkpoint LSN, from either commit or keepalive. + */ + lastCheckpointLsn: string | null; + markSnapshotDone(tables: SourceTable[], no_checkpoint_before_lsn: string): Promise; /** diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 30f9121bf..f0f95cd4e 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -90,6 +90,10 @@ export class MongoBucketBatch extends DisposableObserver { let result: FlushedResult | null = null; // One flush may be split over multiple transactions. @@ -555,7 +559,7 @@ export class MongoBucketBatch extends DisposableObserver { await this.flush(); - if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) { + if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { // When re-applying transactions, don't create a new checkpoint until // we are past the last transaction. logger.info(`Re-applied transaction ${lsn} - skipping checkpoint`); diff --git a/packages/service-core/src/storage/mongo/config.ts b/packages/service-core/src/storage/mongo/config.ts new file mode 100644 index 000000000..8ff241e25 --- /dev/null +++ b/packages/service-core/src/storage/mongo/config.ts @@ -0,0 +1,40 @@ +import * as urijs from 'uri-js'; + +export interface MongoConnectionConfig { + uri: string; + username?: string; + password?: string; + database?: string; +} + +/** + * Validate and normalize connection options. + * + * Returns destructured options. + * + * For use by both storage and mongo module. + */ +export function normalizeMongoConfig(options: MongoConnectionConfig) { + let uri = urijs.parse(options.uri); + + const database = options.database ?? uri.path?.substring(1) ?? ''; + + const userInfo = uri.userinfo?.split(':'); + + const username = options.username ?? userInfo?.[0]; + const password = options.password ?? 
userInfo?.[1]; + + if (database == '') { + throw new Error(`database required`); + } + + delete uri.userinfo; + + return { + uri: urijs.serialize(uri), + database, + + username, + password + }; +} diff --git a/packages/service-core/src/storage/storage-index.ts b/packages/service-core/src/storage/storage-index.ts index 076248882..b58576639 100644 --- a/packages/service-core/src/storage/storage-index.ts +++ b/packages/service-core/src/storage/storage-index.ts @@ -17,4 +17,5 @@ export * from './mongo/MongoSyncRulesLock.js'; export * from './mongo/OperationBatch.js'; export * from './mongo/PersistedBatch.js'; export * from './mongo/util.js'; +export * from './mongo/config.js'; export * from './write-checkpoint.js'; diff --git a/packages/sync-rules/src/TableQuerySchema.ts b/packages/sync-rules/src/TableQuerySchema.ts index c223371b3..5c68ae1e9 100644 --- a/packages/sync-rules/src/TableQuerySchema.ts +++ b/packages/sync-rules/src/TableQuerySchema.ts @@ -2,7 +2,10 @@ import { ColumnDefinition } from './ExpressionType.js'; import { QuerySchema, SourceSchemaTable } from './types.js'; export class TableQuerySchema implements QuerySchema { - constructor(private tables: SourceSchemaTable[], private alias: string) {} + constructor( + private tables: SourceSchemaTable[], + private alias: string + ) {} getColumn(table: string, column: string): ColumnDefinition | undefined { if (table != this.alias) { diff --git a/packages/sync-rules/src/events/SqlEventDescriptor.ts b/packages/sync-rules/src/events/SqlEventDescriptor.ts index a02cd42ef..c57dd7d09 100644 --- a/packages/sync-rules/src/events/SqlEventDescriptor.ts +++ b/packages/sync-rules/src/events/SqlEventDescriptor.ts @@ -14,7 +14,10 @@ export class SqlEventDescriptor { name: string; source_queries: SqlEventSourceQuery[] = []; - constructor(name: string, public idSequence: IdSequence) { + constructor( + name: string, + public idSequence: IdSequence + ) { this.name = name; } diff --git a/packages/types/src/definitions.ts b/packages/types/src/definitions.ts index d387fb87e..dc3bdf879 100644 --- a/packages/types/src/definitions.ts +++ b/packages/types/src/definitions.ts @@ -126,13 +126,13 @@ export const TableSchema = t.object({ * Full type name, e.g. "character varying(255)[]" * @deprecated - use internal_type */ - type: t.string.optional(), + type: t.string, /** * Internal postgres type, e.g. "varchar[]". 
* @deprecated - use internal_type instead */ - pg_type: t.string.optional() + pg_type: t.string }) ) }); @@ -151,6 +151,8 @@ export const InstanceSchema = t.object({ tag: t.string, schemas: t.array(DatabaseSchema) }) - ) + ), + defaultConnectionTag: t.string, + defaultSchema: t.string }); export type InstanceSchema = t.Encoded; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index eac450a03..d1c1a27f3 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -94,6 +94,49 @@ importers: specifier: ^2.1.1 version: 2.1.1(@types/node@22.5.5) + modules/module-mongodb: + dependencies: + '@powersync/lib-services-framework': + specifier: workspace:* + version: link:../../libs/lib-services + '@powersync/service-core': + specifier: workspace:* + version: link:../../packages/service-core + '@powersync/service-jsonbig': + specifier: workspace:* + version: link:../../packages/jsonbig + '@powersync/service-sync-rules': + specifier: workspace:* + version: link:../../packages/sync-rules + '@powersync/service-types': + specifier: workspace:* + version: link:../../packages/types + mongodb: + specifier: ^6.7.0 + version: 6.8.0(socks@2.8.3) + ts-codec: + specifier: ^1.2.2 + version: 1.2.2 + uri-js: + specifier: ^4.4.1 + version: 4.4.1 + uuid: + specifier: ^9.0.1 + version: 9.0.1 + devDependencies: + '@types/uuid': + specifier: ^9.0.4 + version: 9.0.8 + typescript: + specifier: ^5.2.2 + version: 5.6.2 + vite-tsconfig-paths: + specifier: ^4.3.2 + version: 4.3.2(typescript@5.6.2)(vite@5.3.3(@types/node@22.5.5)) + vitest: + specifier: ^0.34.6 + version: 0.34.6 + modules/module-postgres: dependencies: '@powersync/lib-services-framework': @@ -278,6 +321,9 @@ importers: ts-codec: specifier: ^1.2.2 version: 1.2.2 + uri-js: + specifier: ^4.4.1 + version: 4.4.1 uuid: specifier: ^9.0.1 version: 9.0.1 @@ -373,6 +419,9 @@ importers: '@powersync/service-jsonbig': specifier: workspace:* version: link:../packages/jsonbig + '@powersync/service-module-mongodb': + specifier: workspace:* + version: link:../modules/module-mongodb '@powersync/service-module-postgres': specifier: workspace:* version: link:../modules/module-postgres @@ -740,6 +789,10 @@ packages: resolution: {integrity: sha512-O8jcjabXaleOG9DQ0+ARXWZBTfnP4WNAqzuiJK7ll44AmxGKv/J2M4TPjxjY3znBCfvBXFzucm1twdyFybFqEA==} engines: {node: '>=12'} + '@jest/schemas@29.6.3': + resolution: {integrity: sha512-mo5j5X+jIZmJQveBKeS/clAueipV7KgiX1vMgCxam1RNYiqE1w62n0/tJJnHtjW8ZHcQco5gY85jA3mi0L+nSA==} + engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} + '@jridgewell/resolve-uri@3.1.2': resolution: {integrity: sha512-bRISgCIjP20/tbWSPWMEi54QVPRZExkuD9lJL+UIxUKtwVJA8wW1Trb1jMs1RFXo1CBTNZ/5hpC9QvmKWdopKw==} engines: {node: '>=6.0.0'} @@ -1196,6 +1249,9 @@ packages: resolution: {integrity: sha512-2bRovzs0nJZFlCN3rXirE4gwxCn97JNjMmwpecqlbgV9WcxX7WRuIrgzx/X7Ib7MYRbyUTpBYE0s2x6AmZXnlg==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} + '@sinclair/typebox@0.27.8': + resolution: {integrity: sha512-+Fj43pSMwJs4KRrH/938Uf+uAELIgVBmQzg/q1YG10djyfA3TnrU8N8XzqCh/okZdszqBQTZf96idMfE5lnwTA==} + '@sindresorhus/is@5.6.0': resolution: {integrity: sha512-TV7t8GKYaJWsn00tFDqBw8+Uqmr8A0fRU1tvTQhyZzGv0sJCGRQL3JGMI3ucuKo3XIZdUP+Lx7/gh2t3lewy7g==} engines: {node: '>=14.16'} @@ -1234,6 +1290,12 @@ packages: '@types/async@3.2.24': resolution: {integrity: sha512-8iHVLHsCCOBKjCF2KwFe0p9Z3rfM9mL+sSP8btyR5vTjJRAqpBYD28/ZLgXPf0pjG1VxOvtCV/BgXkQbpSe8Hw==} + '@types/chai-subset@1.3.5': + resolution: {integrity: sha512-c2mPnw+xHtXDoHmdtcCXGwyLMiauiAyxWMzhGpqHC4nqI/Y5G2XhTampslK2rb59kpcuHon03UH8W6iYUzw88A==} + + 
'@types/chai@4.3.20': + resolution: {integrity: sha512-/pC9HAB5I/xMlc5FP77qjCnI16ChlJfW0tGa0IUcFn38VJrTV6DeZ60NU5KZBtaOZqjdpwTWohz5HU1RrhiYxQ==} + '@types/connect@3.4.36': resolution: {integrity: sha512-P63Zd/JUGq+PdrM1lv0Wv5SBYeA2+CORvbrXbngriYY0jzLUWfQMQQxOhjONEz/wlHOAxOdY7CY65rgQdTjq2w==} @@ -1297,6 +1359,9 @@ packages: '@types/ws@8.2.3': resolution: {integrity: sha512-ahRJZquUYCdOZf/rCsWg88S0/+cb9wazUBHv6HZEe3XdYaBe2zr/slM8J28X07Hn88Pnm4ezo7N8/ofnOgrPVQ==} + '@vitest/expect@0.34.6': + resolution: {integrity: sha512-QUzKpUQRc1qC7qdGo7rMK3AkETI7w18gTCUrsNnyjjJKYiuUB9+TQK3QnR1unhCnWRC0AbKv2omLGQDF/mIjOw==} + '@vitest/expect@2.1.1': resolution: {integrity: sha512-YeueunS0HiHiQxk+KEOnq/QMzlUuOzbU1Go+PgAsHvvv3tUkJPm9xWt+6ITNTlzsMXUjmgm5T+U7KBPK2qQV6w==} @@ -1315,15 +1380,27 @@ packages: '@vitest/pretty-format@2.1.1': resolution: {integrity: sha512-SjxPFOtuINDUW8/UkElJYQSFtnWX7tMksSGW0vfjxMneFqxVr8YJ979QpMbDW7g+BIiq88RAGDjf7en6rvLPPQ==} + '@vitest/runner@0.34.6': + resolution: {integrity: sha512-1CUQgtJSLF47NnhN+F9X2ycxUP0kLHQ/JWvNHbeBfwW8CzEGgeskzNnHDyv1ieKTltuR6sdIHV+nmR6kPxQqzQ==} + '@vitest/runner@2.1.1': resolution: {integrity: sha512-uTPuY6PWOYitIkLPidaY5L3t0JJITdGTSwBtwMjKzo5O6RCOEncz9PUN+0pDidX8kTHYjO0EwUIvhlGpnGpxmA==} + '@vitest/snapshot@0.34.6': + resolution: {integrity: sha512-B3OZqYn6k4VaN011D+ve+AA4whM4QkcwcrwaKwAbyyvS/NB1hCWjFIBQxAQQSQir9/RtyAAGuq+4RJmbn2dH4w==} + '@vitest/snapshot@2.1.1': resolution: {integrity: sha512-BnSku1WFy7r4mm96ha2FzN99AZJgpZOWrAhtQfoxjUU5YMRpq1zmHRq7a5K9/NjqonebO7iVDla+VvZS8BOWMw==} + '@vitest/spy@0.34.6': + resolution: {integrity: sha512-xaCvneSaeBw/cz8ySmF7ZwGvL0lBjfvqc1LpQ/vcdHEvpLn3Ff1vAvjw+CoGn0802l++5L/pxb7whwcWAw+DUQ==} + '@vitest/spy@2.1.1': resolution: {integrity: sha512-ZM39BnZ9t/xZ/nF4UwRH5il0Sw93QnZXd9NAZGRpIgj0yvVwPpLd702s/Cx955rGaMlyBQkZJ2Ir7qyY48VZ+g==} + '@vitest/utils@0.34.6': + resolution: {integrity: sha512-IG5aDD8S6zlvloDsnzHw0Ut5xczlF+kv2BOTo+iXfPr54Yhi5qbVOgGB1hZaVq4iJ4C/MZ2J0y15IlsV/ZcI0A==} + '@vitest/utils@2.1.1': resolution: {integrity: sha512-Y6Q9TsI+qJ2CC0ZKj6VBb+T8UPz593N113nnUykqwANqhgf3QkZeHFlusgKLTqrnVHbj/XDKZcDHol+dxVT+rQ==} @@ -1414,6 +1491,10 @@ packages: resolution: {integrity: sha512-zbB9rCJAT1rbjiVDb2hqKFHNYLxgtk8NURxZ3IZwD3F6NtxbXZQCnnSi1Lkx+IDohdPlFp222wVALIheZJQSEg==} engines: {node: '>=8'} + ansi-styles@5.2.0: + resolution: {integrity: sha512-Cxwpt2SfTzTtXcfOlzGEee8O+c+MmUgGrNiBcXnuWxuFJHe6a5Hz7qwhwe5OgaSYI0IJvkLqWX1ASG+cJOkEiA==} + engines: {node: '>=10'} + ansi-styles@6.2.1: resolution: {integrity: sha512-bN798gFfQX+viw3R7yrGWRqnrN2oRkEkUjjl4JNn4E8GxxbjtG3FbrEIIY3l8/hrwUwIeCZvi4QuOTP4MErVug==} engines: {node: '>=12'} @@ -1443,6 +1524,9 @@ packages: resolution: {integrity: sha512-HGyxoOTYUyCM6stUe6EJgnd4EoewAI7zMdfqO+kGjnlZmBDz/cR5pf8r/cR4Wq60sL/p0IkcjUEEPwS3GFrIyw==} engines: {node: '>=8'} + assertion-error@1.1.0: + resolution: {integrity: sha512-jgsaNduz+ndvGyFt3uSuWqvy4lCnIJiovtouQN5JZHOKCS2QuhEdbcQHFhVksz2N2U9hXJo8odG7ETyWlEeuDw==} + assertion-error@2.0.1: resolution: {integrity: sha512-Izi8RQcffqCeNVgFigKli1ssklIbpHnCYc6AknXGYoB6grJqyeby7jv12JUQgmTAnIDnbck1uxksT4dzN3PWBA==} engines: {node: '>=12'} @@ -1534,6 +1618,10 @@ packages: resolution: {integrity: sha512-xlx1yCK2Oc1APsPXDL2LdlNP6+uu8OCDdhOBSVT279M/S+y75O30C2VuD8T2ogdePBBl7PfPF4504tnLgX3zfw==} engines: {node: '>=14.16'} + chai@4.5.0: + resolution: {integrity: sha512-RITGBfijLkBddZvnn8jdqoTypxvqbOLYQkGGxXzeFjVHvudaPw0HNFD9x928/eUwYWd2dPCugVqspGALTZZQKw==} + engines: {node: '>=4'} + chai@5.1.1: resolution: {integrity: 
sha512-pT1ZgP8rPNqUgieVaEY+ryQr6Q4HXNg8Ei9UnLUrjN4IA7dvQC5JB+/kxVcPNDHyBcc/26CXPkbNzq3qwrOEKA==} engines: {node: '>=12'} @@ -1553,6 +1641,9 @@ packages: chardet@0.7.0: resolution: {integrity: sha512-mT8iDcrh03qDGRRmoA2hmBJnxpllMR+0/0qlzjqZES6NdiWDcZkCNAk4rPFZ9Q85r27unkiNNg8ZOiwZXBHwcA==} + check-error@1.0.3: + resolution: {integrity: sha512-iKEoDYaRmd1mxM90a2OEfWhjsjPpYPuQ+lMYsoxB126+t8fw7ySEO48nmDg5COTjxDI65/Y2OWpeEHk3ZOe8zg==} + check-error@2.1.1: resolution: {integrity: sha512-OAlb+T7V4Op9OwdkjmguYRqncdlx5JiofwOAUkmTF+jNdHwzTaTs4sRAGpzLF3oOz5xAyDGrPgeIDFQmDOTiJw==} engines: {node: '>= 16'} @@ -1652,6 +1743,9 @@ packages: engines: {node: ^14.13.0 || >=16.0.0} hasBin: true + confbox@0.1.7: + resolution: {integrity: sha512-uJcB/FKZtBMCJpK8MQji6bJHgu1tixKPxRLeGkNzBoOZzpnZUJm0jm2/sBDWcuBx1dYgxV4JU+g5hmNxCyAmdA==} + config-chain@1.1.13: resolution: {integrity: sha512-qj+f8APARXHrM0hraqXYb2/bOVSV4PvJQlNZ/DVj0QrmNM2q2euizkeuVckQ57J+W0mRH6Hvi+k50M4Jul2VRQ==} @@ -1724,6 +1818,10 @@ packages: resolution: {integrity: sha512-aW35yZM6Bb/4oJlZncMH2LCoZtJXTRxES17vE3hoRiowU2kWHaJKFkSBDnDR+cm9J+9QhXmREyIfv0pji9ejCQ==} engines: {node: '>=10'} + deep-eql@4.1.4: + resolution: {integrity: sha512-SUwdGfqdKOwxCPeVYjwSyRpJ7Z+fhpwIAtmCUdZIWZ/YP5R9WAsyuSgpLVDi9bjWoN2LXHNss/dk3urXtdQxGg==} + engines: {node: '>=6'} + deep-eql@5.0.2: resolution: {integrity: sha512-h5k/5U50IJJFpzfL6nO9jaaumfjO/f2NjK/oYB2Djzm4p9L+3T9qWpZqZ2hAbLPuuYq9wrU08WQyBTL5GbPk5Q==} engines: {node: '>=6'} @@ -1746,6 +1844,10 @@ packages: resolution: {integrity: sha512-reYkTUJAZb9gUuZ2RvVCNhVHdg62RHnJ7WJl8ftMi4diZ6NWlciOzQN88pUhSELEwflJht4oQDv0F0BMlwaYtA==} engines: {node: '>=8'} + diff-sequences@29.6.3: + resolution: {integrity: sha512-EjePK1srD3P08o2j4f0ExnylqRs5B9tJjcp9t1krH2qRi8CCdsYfwe9JgSLurFBWwq4uOlipzfk5fHNvwFKr8Q==} + engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} + diff@4.0.2: resolution: {integrity: sha512-58lmxKSA4BNyLz+HHMUzlOEpg09FV+ev6ZMe3vJihgdxzgcwZ8VoEEPmALCZG9LmqfVoNMMKpttIYTVG6uDY7A==} engines: {node: '>=0.3.1'} @@ -2308,6 +2410,10 @@ packages: light-my-request@5.13.0: resolution: {integrity: sha512-9IjUN9ZyCS9pTG+KqTDEQo68Sui2lHsYBrfMyVUTTZ3XhH8PMZq7xO94Kr+eP9dhi/kcKsx4N41p2IXEBil1pQ==} + local-pkg@0.4.3: + resolution: {integrity: sha512-SFppqq5p42fe2qcZQqqEOiVRXl+WCP1MdT6k7BDEW1j++sp5fIY+/fdRQitvKgB5BrBcmrs5m/L0v2FrU5MY1g==} + engines: {node: '>=14'} + locate-path@5.0.0: resolution: {integrity: sha512-t7hw9pI+WvuwNJXwk5zVHpyhIqzg2qTlklJOf0mVxGSbe3Fp2VieZcduNYjaLDoy6p9uGpQEGWG87WpMKlNq8g==} engines: {node: '>=8'} @@ -2336,6 +2442,9 @@ packages: lossless-json@2.0.11: resolution: {integrity: sha512-BP0vn+NGYvzDielvBZaFain/wgeJ1hTvURCqtKvhr1SCPePdaaTanmmcplrHfEJSJOUql7hk4FHwToNJjWRY3g==} + loupe@2.3.7: + resolution: {integrity: sha512-zSMINGVYkdpYSOBmLi0D1Uo7JU9nVdQKrHxC8eYlV+9YKK9WePqAlL7lSlorG/U2Fw1w0hTBmaa/jrQ3UbPHtA==} + loupe@3.1.1: resolution: {integrity: sha512-edNu/8D5MKVfGVFRhFf8aAxiTM6Wumfz5XsaatSxlD3w4R1d/WEKUTydCdPGbl9K7QG/Ca3GnDV2sIKIpXRQcw==} @@ -2455,6 +2564,9 @@ packages: engines: {node: '>=10'} hasBin: true + mlly@1.7.1: + resolution: {integrity: sha512-rrVRZRELyQzrIUAVMHxP97kv+G786pHmOKzuFII8zDYahFBS7qnHh2AlYSl1GAHhaMPCz6/oHjVMcfFYgFYHgA==} + mnemonist@0.39.5: resolution: {integrity: sha512-FPUtkhtJ0efmEFGpU14x7jGbTB+s18LrzRL2KgoWz9YvcY3cPomz8tih01GbHwnGk/OmkOKfqd/RAQoc8Lm7DQ==} @@ -2659,6 +2771,10 @@ packages: resolution: {integrity: sha512-TYOanM3wGwNGsZN2cVTYPArw454xnXj5qmWF1bEoAc4+cU/ol7GVh7odevjp1FNHduHc3KZMcFduxU5Xc6uJRQ==} engines: {node: '>=10'} + p-limit@4.0.0: + resolution: {integrity: 
sha512-5b0R4txpzjPWVw/cXXUResoD4hb6U/x9BH08L7nw+GN1sezDzPdxeRvpc9c433fZhBan/wusjbCsqwqm4EIBIQ==} + engines: {node: ^12.20.0 || ^14.13.1 || >=16.0.0} + p-locate@4.1.0: resolution: {integrity: sha512-R79ZZ/0wAxKGu3oYMlz8jy/kbhsNrS7SKZ7PxEHBgJ5+F2mtFW2fK2cOtBh1cHYkQsbzFV7I+EoRKe6Yt0oK7A==} engines: {node: '>=8'} @@ -2725,6 +2841,9 @@ packages: pathe@1.1.2: resolution: {integrity: sha512-whLdWMYL2TwI08hn8/ZqAbrVemu0LNaNNJZX73O6qaIdCTfXutsLhMkjdENX0qhsQ9uIimo4/aQOmXkoon2nDQ==} + pathval@1.1.1: + resolution: {integrity: sha512-Dp6zGqpTdETdR63lehJYPeIOqpiNBNtc7BpWSLrOje7UaIsE5aY92r/AunQA7rsXvet3lrJ3JnZX29UPTKXyKQ==} + pathval@2.0.0: resolution: {integrity: sha512-vE7JKRyES09KiunauX7nd2Q9/L7lhok4smP9RZTDeD4MVs72Dp2qNFVz39Nz5a0FVEW0BJR6C0DYrq6unoziZA==} engines: {node: '>= 14.16'} @@ -2775,6 +2894,9 @@ packages: resolution: {integrity: sha512-ip4qdzjkAyDDZklUaZkcRFb2iA118H9SgRh8yzTkSQK8HilsOJF7rSY8HoW5+I0M46AZgX/pxbprf2vvzQCE0Q==} hasBin: true + pkg-types@1.2.0: + resolution: {integrity: sha512-+ifYuSSqOQ8CqP4MbZA5hDpb97n3E8SVWdJe+Wms9kj745lmd3b7EZJiqvmLwAlmRfjrI7Hi5z3kdBJ93lFNPA==} + postcss@8.4.39: resolution: {integrity: sha512-0vzE+lAiG7hZl1/9I8yzKLx3aR9Xbof3fBHKunvMfOCYAtMhrsnccJY2iTURb9EZd5+pLuiNV9/c/GZJOHsgIw==} engines: {node: ^10 || ^12 || >=14} @@ -2805,6 +2927,10 @@ packages: engines: {node: '>=14'} hasBin: true + pretty-format@29.7.0: + resolution: {integrity: sha512-Pdlw/oPxN+aXdmM9R00JVC9WVFoCLTKJvDVLgmJ+qAffBMxsV85l/Lu7sNx4zSzPyoL2euImuEwHhOXdEgNFZQ==} + engines: {node: ^14.15.0 || ^16.10.0 || >=18.0.0} + proc-log@3.0.0: resolution: {integrity: sha512-++Vn7NS4Xf9NacaU9Xq3URUuqZETPsf8L4j5/ckhaRYsfPeRyzGw+iDjFhV/Jr3uNmTvvddEJFWh5R1gRgUH8A==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} @@ -2892,6 +3018,9 @@ packages: resolution: {integrity: sha512-y3bGgqKj3QBdxLbLkomlohkvsA8gdAiUQlSBJnBhfn+BPxg4bc62d8TcBW15wavDfgexCgccckhcZvywyQYPOw==} hasBin: true + react-is@18.3.1: + resolution: {integrity: sha512-/LLMVyas0ljjAtoYiPqYiL8VWXzUUdThrmU5+n20DZv+a+ClRoevUzw5JxU+Ieh5/c87ytoTBV9G1FiKfNJdmg==} + read-package-json-fast@3.0.2: resolution: {integrity: sha512-0J+Msgym3vrLOUB3hzQCuZHII0xkNGCtz/HJH9xZshwv9DbDwkw1KaE3gx/e2J5rpEY5rtOy6cyhKOPrkP7FZw==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} @@ -3241,6 +3370,9 @@ packages: resolution: {integrity: sha512-0fk9zBqO67Nq5M/m45qHCJxylV/DhBlIOVExqgOMiCCrzrhU6tCibRXNqE3jwJLftzE9SNuZtYbpzcO+i9FiKw==} engines: {node: '>=14.16'} + strip-literal@1.3.0: + resolution: {integrity: sha512-PugKzOsyXpArk0yWmUwqOZecSO0GH0bPoctLcqNDH9J04pVW3lflYE0ujElBGTloevcxF5MofAOZ7C5l2b+wLg==} + supports-color@5.5.0: resolution: {integrity: sha512-QjVjwdXIt408MIiAqCX4oUKsgU2EqAGzs2Ppkm4aQYbjm+ZEWEcW4SfFNTr4uMNZma0ey4f5lgLrkB0aX0QMow==} engines: {node: '>=4'} @@ -3283,6 +3415,10 @@ packages: tinyexec@0.3.0: resolution: {integrity: sha512-tVGE0mVJPGb0chKhqmsoosjsS+qUnJVGJpZgsHYQcGoPlG3B51R3PouqTgEGH2Dc9jjFyOqOpix6ZHNMXp1FZg==} + tinypool@0.7.0: + resolution: {integrity: sha512-zSYNUlYSMhJ6Zdou4cJwo/p7w5nmAH17GRfU/ui3ctvjXFErXXkruT4MWW6poDeXgCaIBlGLrfU6TbTXxyGMww==} + engines: {node: '>=14.0.0'} + tinypool@1.0.1: resolution: {integrity: sha512-URZYihUbRPcGv95En+sz6MfghfIc2OJ1sv/RmhWZLouPY0/8Vo80viwPvg3dlaS9fuq7fQMEfgRRK7BBZThBEA==} engines: {node: ^18.0.0 || >=20.0.0} @@ -3291,6 +3427,10 @@ packages: resolution: {integrity: sha512-weEDEq7Z5eTHPDh4xjX789+fHfF+P8boiFB+0vbWzpbnbsEr/GRaohi/uMKxg8RZMXnl1ItAi/IUHWMsjDV7kQ==} engines: {node: '>=14.0.0'} + tinyspy@2.2.1: + resolution: {integrity: 
sha512-KYad6Vy5VDWV4GH3fjpseMQ/XU2BhIYP7Vzd0LG44qRWm/Yt2WCOTicFdvmgo6gWaqooMQCawTtILVQJupKu7A==} + engines: {node: '>=14.0.0'} + tinyspy@3.0.2: resolution: {integrity: sha512-n1cw8k1k0x4pgA2+9XrOkFydTerNcJ1zWCO5Nn9scWHTD+5tp8dghT2x1uduQePZTZgd3Tupf+x9BxJjeJi77Q==} engines: {node: '>=14.0.0'} @@ -3378,6 +3518,10 @@ packages: resolution: {integrity: sha512-i3P9Kgw3ytjELUfpuKVDNBJvk4u5bXL6gskv572mcevPbSKCV3zt3djhmlEQ65yERjIbOSncy7U4cQJaB1CBCg==} engines: {node: ^14.17.0 || ^16.13.0 || >=18.0.0} + type-detect@4.1.0: + resolution: {integrity: sha512-Acylog8/luQ8L7il+geoSxhEkazvkslg7PSNKOX59mbB9cOveP5aq9h74Y7YU8yDpJwetzQQrfIwtf4Wp4LKcw==} + engines: {node: '>=4'} + type-fest@0.21.3: resolution: {integrity: sha512-t0rzBq87m3fVcduHDUFhKmyyX+9eo6WQjZvf51Ea/M0Q7+T374Jp1aUiyUl0GKxp8M/OETVHSDvmkyPgvX+X2w==} engines: {node: '>=10'} @@ -3398,6 +3542,9 @@ packages: engines: {node: '>=14.17'} hasBin: true + ufo@1.5.4: + resolution: {integrity: sha512-UsUk3byDzKd04EyoZ7U4DOlxQaD14JUKQl6/P7wiX4FNvUfm3XL246n9W5AmqwW5RSFJ27NAuM0iLscAOYUiGQ==} + undefsafe@2.0.5: resolution: {integrity: sha512-WxONCrssBM8TSPRqN5EmsjVrsv4A8X12J4ArBiiayv3DyyG3ZlIg6yysuuSYdZsVz3TKcTg2fd//Ujd4CHV1iA==} @@ -3460,6 +3607,11 @@ packages: resolution: {integrity: sha512-BNGbWLfd0eUPabhkXUVm0j8uuvREyTh5ovRa/dyow/BqAbZJyC+5fU+IzQOzmAKzYqYRAISoRhdQr3eIZ/PXqg==} engines: {node: '>= 0.8'} + vite-node@0.34.6: + resolution: {integrity: sha512-nlBMJ9x6n7/Amaz6F3zJ97EBwR2FkzhBRxF5e+jE6LA3yi6Wtc2lyTij1OnDMIr34v5g/tVQtsVAzhT0jc5ygA==} + engines: {node: '>=v14.18.0'} + hasBin: true + vite-node@2.1.1: resolution: {integrity: sha512-N/mGckI1suG/5wQI35XeR9rsMsPqKXzq1CdUndzVstBj/HvyxxGctwnK6WX43NGt5L3Z5tcRf83g4TITKJhPrA==} engines: {node: ^18.0.0 || >=20.0.0} @@ -3501,6 +3653,37 @@ packages: terser: optional: true + vitest@0.34.6: + resolution: {integrity: sha512-+5CALsOvbNKnS+ZHMXtuUC7nL8/7F1F2DnHGjSsszX8zCjWSSviphCb/NuS9Nzf4Q03KyyDRBAXhF/8lffME4Q==} + engines: {node: '>=v14.18.0'} + hasBin: true + peerDependencies: + '@edge-runtime/vm': '*' + '@vitest/browser': '*' + '@vitest/ui': '*' + happy-dom: '*' + jsdom: '*' + playwright: '*' + safaridriver: '*' + webdriverio: '*' + peerDependenciesMeta: + '@edge-runtime/vm': + optional: true + '@vitest/browser': + optional: true + '@vitest/ui': + optional: true + happy-dom: + optional: true + jsdom: + optional: true + playwright: + optional: true + safaridriver: + optional: true + webdriverio: + optional: true + vitest@2.1.1: resolution: {integrity: sha512-97We7/VC0e9X5zBVkvt7SGQMGrRtn3KtySFQG5fpaMlS+l62eeXRQO633AYhSTC3z7IMebnPPNjGXVGNRFlxBA==} engines: {node: ^18.0.0 || >=20.0.0} @@ -3657,6 +3840,10 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + yocto-queue@1.1.1: + resolution: {integrity: sha512-b4JR1PFR10y1mKjhHY9LaGo6tmrgjit7hxVIeAmyMw3jegXR4dhYqLaQF5zMXZxY7tLpMyJeLjr1C4rLmkVe8g==} + engines: {node: '>=12.20'} + yoctocolors-cjs@2.1.2: resolution: {integrity: sha512-cYVsTjKl8b+FrnidjibDWskAv7UKOfcwaVZdp/it9n1s9fU3IkgDbhdIRKCW4JDsAlECJY0ytoVPT3sK6kideA==} engines: {node: '>=18'} @@ -3948,6 +4135,10 @@ snapshots: wrap-ansi: 8.1.0 wrap-ansi-cjs: wrap-ansi@7.0.0 + '@jest/schemas@29.6.3': + dependencies: + '@sinclair/typebox': 0.27.8 + '@jridgewell/resolve-uri@3.1.2': {} '@jridgewell/sourcemap-codec@1.5.0': {} @@ -4505,7 +4696,7 @@ snapshots: '@opentelemetry/semantic-conventions': 1.25.1 '@prisma/instrumentation': 5.16.1 '@sentry/core': 8.17.0 - '@sentry/opentelemetry': 
8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.6.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/semantic-conventions@1.25.1) + '@sentry/opentelemetry': 8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.9.0))(@opentelemetry/semantic-conventions@1.25.1) '@sentry/types': 8.17.0 '@sentry/utils': 8.17.0 optionalDependencies: @@ -4513,7 +4704,7 @@ snapshots: transitivePeerDependencies: - supports-color - '@sentry/opentelemetry@8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.6.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/semantic-conventions@1.25.1)': + '@sentry/opentelemetry@8.17.0(@opentelemetry/api@1.9.0)(@opentelemetry/core@1.25.1(@opentelemetry/api@1.6.0))(@opentelemetry/instrumentation@0.52.1(@opentelemetry/api@1.9.0))(@opentelemetry/sdk-trace-base@1.25.1(@opentelemetry/api@1.9.0))(@opentelemetry/semantic-conventions@1.25.1)': dependencies: '@opentelemetry/api': 1.9.0 '@opentelemetry/core': 1.25.1(@opentelemetry/api@1.9.0) @@ -4551,6 +4742,8 @@ snapshots: transitivePeerDependencies: - supports-color + '@sinclair/typebox@0.27.8': {} + '@sindresorhus/is@5.6.0': {} '@syncpoint/wkx@0.5.2': @@ -4580,6 +4773,12 @@ snapshots: '@types/async@3.2.24': {} + '@types/chai-subset@1.3.5': + dependencies: + '@types/chai': 4.3.20 + + '@types/chai@4.3.20': {} + '@types/connect@3.4.36': dependencies: '@types/node': 22.5.5 @@ -4638,6 +4837,12 @@ snapshots: dependencies: '@types/node': 22.5.5 + '@vitest/expect@0.34.6': + dependencies: + '@vitest/spy': 0.34.6 + '@vitest/utils': 0.34.6 + chai: 4.5.0 + '@vitest/expect@2.1.1': dependencies: '@vitest/spy': 2.1.1 @@ -4657,21 +4862,43 @@ snapshots: dependencies: tinyrainbow: 1.2.0 + '@vitest/runner@0.34.6': + dependencies: + '@vitest/utils': 0.34.6 + p-limit: 4.0.0 + pathe: 1.1.2 + '@vitest/runner@2.1.1': dependencies: '@vitest/utils': 2.1.1 pathe: 1.1.2 + '@vitest/snapshot@0.34.6': + dependencies: + magic-string: 0.30.11 + pathe: 1.1.2 + pretty-format: 29.7.0 + '@vitest/snapshot@2.1.1': dependencies: '@vitest/pretty-format': 2.1.1 magic-string: 0.30.11 pathe: 1.1.2 + '@vitest/spy@0.34.6': + dependencies: + tinyspy: 2.2.1 + '@vitest/spy@2.1.1': dependencies: tinyspy: 3.0.2 + '@vitest/utils@0.34.6': + dependencies: + diff-sequences: 29.6.3 + loupe: 2.3.7 + pretty-format: 29.7.0 + '@vitest/utils@2.1.1': dependencies: '@vitest/pretty-format': 2.1.1 @@ -4753,6 +4980,8 @@ snapshots: dependencies: color-convert: 2.0.1 + ansi-styles@5.2.0: {} + ansi-styles@6.2.1: {} anymatch@3.1.3: @@ -4777,6 +5006,8 @@ snapshots: array-union@2.1.0: {} + assertion-error@1.1.0: {} + assertion-error@2.0.1: {} async-mutex@0.5.0: @@ -4909,6 +5140,16 @@ snapshots: camelcase@7.0.1: {} + chai@4.5.0: + dependencies: + assertion-error: 1.1.0 + check-error: 1.0.3 + deep-eql: 4.1.4 + get-func-name: 2.0.2 + loupe: 2.3.7 + pathval: 1.1.1 + type-detect: 4.1.0 + chai@5.1.1: dependencies: assertion-error: 2.0.1 @@ -4932,6 +5173,10 @@ snapshots: chardet@0.7.0: {} + check-error@1.0.3: + dependencies: + get-func-name: 2.0.2 + check-error@2.1.1: {} chokidar@3.6.0: @@ -5033,6 +5278,8 @@ snapshots: tree-kill: 1.2.2 yargs: 17.7.2 + confbox@0.1.7: {} + config-chain@1.1.13: dependencies: ini: 
1.3.8 @@ -5107,6 +5354,10 @@ snapshots: dependencies: mimic-response: 3.1.0 + deep-eql@4.1.4: + dependencies: + type-detect: 4.1.0 + deep-eql@5.0.2: {} deep-extend@0.6.0: {} @@ -5121,6 +5372,8 @@ snapshots: detect-indent@6.1.0: {} + diff-sequences@29.6.3: {} + diff@4.0.2: {} dir-glob@3.0.1: @@ -5699,6 +5952,8 @@ snapshots: process-warning: 3.0.0 set-cookie-parser: 2.6.0 + local-pkg@0.4.3: {} + locate-path@5.0.0: dependencies: p-locate: 4.1.0 @@ -5729,6 +5984,10 @@ snapshots: lossless-json@2.0.11: {} + loupe@2.3.7: + dependencies: + get-func-name: 2.0.2 + loupe@3.1.1: dependencies: get-func-name: 2.0.2 @@ -5875,6 +6134,13 @@ snapshots: mkdirp@1.0.4: {} + mlly@1.7.1: + dependencies: + acorn: 8.12.1 + pathe: 1.1.2 + pkg-types: 1.2.0 + ufo: 1.5.4 + mnemonist@0.39.5: dependencies: obliterator: 2.0.4 @@ -6120,6 +6386,10 @@ snapshots: dependencies: yocto-queue: 0.1.0 + p-limit@4.0.0: + dependencies: + yocto-queue: 1.1.1 + p-locate@4.1.0: dependencies: p-limit: 2.3.0 @@ -6190,6 +6460,8 @@ snapshots: pathe@1.1.2: {} + pathval@1.1.1: {} + pathval@2.0.0: {} pause-stream@0.0.11: @@ -6244,6 +6516,12 @@ snapshots: sonic-boom: 3.8.1 thread-stream: 2.7.0 + pkg-types@1.2.0: + dependencies: + confbox: 0.1.7 + mlly: 1.7.1 + pathe: 1.1.2 + postcss@8.4.39: dependencies: nanoid: 3.3.7 @@ -6264,6 +6542,12 @@ snapshots: prettier@3.3.3: {} + pretty-format@29.7.0: + dependencies: + '@jest/schemas': 29.6.3 + ansi-styles: 5.2.0 + react-is: 18.3.1 + proc-log@3.0.0: {} process-nextick-args@2.0.1: {} @@ -6338,6 +6622,8 @@ snapshots: minimist: 1.2.8 strip-json-comments: 2.0.1 + react-is@18.3.1: {} + read-package-json-fast@3.0.2: dependencies: json-parse-even-better-errors: 3.0.2 @@ -6690,6 +6976,10 @@ snapshots: strip-json-comments@5.0.1: {} + strip-literal@1.3.0: + dependencies: + acorn: 8.12.1 + supports-color@5.5.0: dependencies: has-flag: 3.0.0 @@ -6732,10 +7022,14 @@ snapshots: tinyexec@0.3.0: {} + tinypool@0.7.0: {} + tinypool@1.0.1: {} tinyrainbow@1.2.0: {} + tinyspy@2.2.1: {} + tinyspy@3.0.2: {} tmp@0.0.33: @@ -6825,6 +7119,8 @@ snapshots: transitivePeerDependencies: - supports-color + type-detect@4.1.0: {} + type-fest@0.21.3: {} type-fest@1.4.0: {} @@ -6837,6 +7133,8 @@ snapshots: typescript@5.6.2: {} + ufo@1.5.4: {} + undefsafe@2.0.5: {} undici-types@6.19.8: {} @@ -6901,6 +7199,24 @@ snapshots: vary@1.1.2: {} + vite-node@0.34.6(@types/node@22.5.5): + dependencies: + cac: 6.7.14 + debug: 4.3.7 + mlly: 1.7.1 + pathe: 1.1.2 + picocolors: 1.1.0 + vite: 5.3.3(@types/node@22.5.5) + transitivePeerDependencies: + - '@types/node' + - less + - lightningcss + - sass + - stylus + - sugarss + - supports-color + - terser + vite-node@2.1.1(@types/node@22.5.5): dependencies: cac: 6.7.14 @@ -6937,6 +7253,41 @@ snapshots: '@types/node': 22.5.5 fsevents: 2.3.3 + vitest@0.34.6: + dependencies: + '@types/chai': 4.3.20 + '@types/chai-subset': 1.3.5 + '@types/node': 22.5.5 + '@vitest/expect': 0.34.6 + '@vitest/runner': 0.34.6 + '@vitest/snapshot': 0.34.6 + '@vitest/spy': 0.34.6 + '@vitest/utils': 0.34.6 + acorn: 8.12.1 + acorn-walk: 8.3.3 + cac: 6.7.14 + chai: 4.5.0 + debug: 4.3.7 + local-pkg: 0.4.3 + magic-string: 0.30.11 + pathe: 1.1.2 + picocolors: 1.1.0 + std-env: 3.7.0 + strip-literal: 1.3.0 + tinybench: 2.9.0 + tinypool: 0.7.0 + vite: 5.3.3(@types/node@22.5.5) + vite-node: 0.34.6(@types/node@22.5.5) + why-is-node-running: 2.3.0 + transitivePeerDependencies: + - less + - lightningcss + - sass + - stylus + - sugarss + - supports-color + - terser + vitest@2.1.1(@types/node@22.5.5): dependencies: '@vitest/expect': 2.1.1 @@ -7099,6 
+7450,8 @@ snapshots: yocto-queue@0.1.0: {} + yocto-queue@1.1.1: {} + yoctocolors-cjs@2.1.2: {} zod@3.23.8: {} diff --git a/service/Dockerfile b/service/Dockerfile index 369abcc59..715931b11 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -18,6 +18,7 @@ COPY packages/types/package.json packages/types/tsconfig.json packages/types/ COPY libs/lib-services/package.json libs/lib-services/tsconfig.json libs/lib-services/ COPY modules/module-postgres/package.json modules/module-postgres/tsconfig.json modules/module-postgres/ +COPY modules/module-mongodb/package.json modules/module-mongodb/tsconfig.json modules/module-mongodb/ RUN pnpm install --frozen-lockfile @@ -34,6 +35,7 @@ COPY packages/types/src packages/types/src/ COPY libs/lib-services/src libs/lib-services/src/ COPY modules/module-postgres/src modules/module-postgres/src/ +COPY modules/module-mongodb/src modules/module-mongodb/src/ RUN pnpm build:production && \ rm -rf node_modules **/node_modules && \ diff --git a/service/package.json b/service/package.json index 4fe401bea..3a3d6652e 100644 --- a/service/package.json +++ b/service/package.json @@ -17,6 +17,7 @@ "@powersync/service-core": "workspace:*", "@powersync/lib-services-framework": "workspace:*", "@powersync/service-module-postgres": "workspace:*", + "@powersync/service-module-mongodb": "workspace:*", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-rsocket-router": "workspace:*", diff --git a/service/src/entry.ts b/service/src/entry.ts index d817f46fe..c92d19397 100644 --- a/service/src/entry.ts +++ b/service/src/entry.ts @@ -1,6 +1,7 @@ import { container, ContainerImplementation } from '@powersync/lib-services-framework'; import * as core from '@powersync/service-core'; import PostgresModule from '@powersync/service-module-postgres'; +import MongoModule from '@powersync/service-module-mongodb'; import { startServer } from './runners/server.js'; import { startStreamRunner } from './runners/stream-worker.js'; @@ -12,7 +13,7 @@ container.registerDefaults(); container.register(ContainerImplementation.REPORTER, createSentryReporter()); const moduleManager = new core.modules.ModuleManager(); -moduleManager.register([PostgresModule]); +moduleManager.register([PostgresModule, MongoModule]); // This is a bit of a hack. Commands such as the teardown command or even migrations might // want access to the ModuleManager in order to use modules container.register(core.ModuleManager, moduleManager); diff --git a/service/tsconfig.json b/service/tsconfig.json index 363c72bdb..40c9a5329 100644 --- a/service/tsconfig.json +++ b/service/tsconfig.json @@ -32,6 +32,9 @@ }, { "path": "../modules/module-postgres" + }, + { + "path": "../modules/module-mongodb" } ] }
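For reference, a minimal sketch of how the new `normalizeMongoConfig` helper in `packages/service-core/src/storage/mongo/config.ts` is expected to behave, based on the implementation above. The import path assumes the `storage-index.ts` re-export surfaces it from `@powersync/service-core`; the sample URI and printed values are illustrative only:

```ts
import { normalizeMongoConfig } from '@powersync/service-core';

// Credentials embedded in the URI are stripped from the serialized URI and
// returned as separate fields; explicitly provided options take precedence
// over components parsed from the URI.
const config = normalizeMongoConfig({
  uri: 'mongodb://user:secret@localhost:27017/powersync_test'
});

console.log(config.uri); // mongodb://localhost:27017/powersync_test
console.log(config.database); // powersync_test
console.log(config.username); // user
console.log(config.password); // secret

// A database must be present in either the URI path or the options:
// normalizeMongoConfig({ uri: 'mongodb://localhost:27017' })
// throws Error('database required')
```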