9 changes: 9 additions & 0 deletions .changeset/rare-birds-peel.md
@@ -0,0 +1,9 @@
---
'@powersync/service-module-postgres-storage': patch
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-module-mongodb': patch
'@powersync/service-core': patch
'@powersync/service-module-mysql': patch
---

Improve handling of some edge cases which could trigger truncation of synced tables.
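
The core storage fix below (in MongoSyncBucketStorage.ts) builds the table lookup filter conditionally, so that sources without stable relation ids are matched by name only. A minimal TypeScript sketch of the pattern, with simplified types — the names mirror the diff, but this is not the full implementation:

```ts
// Sketch: build the source-table lookup filter.
// When the source provides no stable relation id (objectId == null),
// match on names only; unconditionally including relation_id could fail
// to match the existing row and later trigger a spurious truncate.
function buildTableFilter(
  group_id: number,
  connection_id: number,
  schema: string,
  table: string,
  columns: unknown[],
  objectId: string | number | null
): Record<string, unknown> {
  const filter: Record<string, unknown> = {
    group_id,
    connection_id,
    schema_name: schema,
    table_name: table,
    replica_id_columns2: columns
  };
  if (objectId != null) {
    filter.relation_id = objectId;
  }
  return filter;
}
```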
modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -13,6 +13,7 @@ import {
CheckpointChanges,
GetCheckpointChangesOptions,
ReplicationCheckpoint,
SourceTable,
storage,
utils,
WatchWriteCheckpointOptions
@@ -23,7 +24,14 @@ import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import * as timers from 'timers/promises';
import { MongoBucketStorage } from '../MongoBucketStorage.js';
import { PowerSyncMongo } from './db.js';
import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleCheckpointState, SyncRuleDocument } from './models.js';
import {
BucketDataDocument,
BucketDataKey,
SourceKey,
SourceTableDocument,
SyncRuleCheckpointState,
SyncRuleDocument
} from './models.js';
import { MongoBucketBatch } from './MongoBucketBatch.js';
import { MongoCompactor } from './MongoCompactor.js';
import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js';
@@ -163,17 +171,17 @@ export class MongoSyncBucketStorage
let result: storage.ResolveTableResult | null = null;
await this.db.client.withSession(async (session) => {
const col = this.db.source_tables;
let doc = await col.findOne(
{
group_id: group_id,
connection_id: connection_id,
relation_id: objectId,
schema_name: schema,
table_name: table,
replica_id_columns2: columns
},
{ session }
);
let filter: Partial<SourceTableDocument> = {
group_id: group_id,
connection_id: connection_id,
schema_name: schema,
table_name: table,
replica_id_columns2: columns
};
if (objectId != null) {
filter.relation_id = objectId;
}
let doc = await col.findOne(filter, { session });
if (doc == null) {
doc = {
_id: new bson.ObjectId(),
@@ -202,31 +210,40 @@ export class MongoSyncBucketStorage
sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);

let dropTables: storage.SourceTable[] = [];
// Detect tables that have been renamed, or that have different replica_id_columns
let truncateFilter = [{ schema_name: schema, table_name: table }] as any[];
if (objectId != null) {
// Only detect renames if the source uses relation ids.
truncateFilter.push({ relation_id: objectId });
}
const truncate = await col
.find(
{
group_id: group_id,
connection_id: connection_id,
_id: { $ne: doc._id },
$or: [{ relation_id: objectId }, { schema_name: schema, table_name: table }]
$or: truncateFilter
},
{ session }
)
.toArray();
dropTables = truncate.map(
(doc) =>
new storage.SourceTable(
doc._id,
connection_tag,
doc.relation_id,
doc.schema_name,
doc.table_name,
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
doc.snapshot_done ?? true
)
);

result = {
table: sourceTable,
dropTables: truncate.map(
(doc) =>
new storage.SourceTable(
doc._id,
connection_tag,
doc.relation_id ?? 0,
doc.schema_name,
doc.table_name,
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
doc.snapshot_done ?? true
)
)
dropTables: dropTables
};
});
return result!;
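
The conflict detection above applies the same rule when deciding which tables to drop: renames are only detected when the source uses relation ids. A simplified sketch of the predicate, with hypothetical values:

```ts
// Sketch of the conflict rule used for dropTables (simplified shape).
type TableRow = { schema_name: string; table_name: string; relation_id?: number };

function conflicts(existing: TableRow, schema: string, table: string, objectId: number | null): boolean {
  const sameName = existing.schema_name === schema && existing.table_name === table;
  // Only detect renames if the source uses relation ids.
  const renamed = objectId != null && existing.relation_id === objectId;
  return sameName || renamed;
}

// A renamed Postgres table keeps its relation id, so it is detected:
conflicts({ schema_name: 'public', table_name: 'users', relation_id: 17 }, 'public', 'customers', 17); // true
// MongoDB collections have no stable relation id, so a rename is not
// (mis)detected and unrelated collections are no longer dropped:
conflicts({ schema_name: 'ps', table_name: 'users' }, 'ps', 'customers', null); // false
```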
109 changes: 77 additions & 32 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -14,7 +14,7 @@ import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
import { escapeRegExp } from '../utils.js';
import { MongoManager } from './MongoManager.js';
import { constructAfterRecord, createCheckpoint, getMongoRelation } from './MongoRelation.js';
import { constructAfterRecord, createCheckpoint, getCacheIdentifier, getMongoRelation } from './MongoRelation.js';
import { CHECKPOINTS_COLLECTION } from './replication-utils.js';

export interface ChangeStreamOptions {
@@ -89,6 +89,10 @@ export class ChangeStream {
return this.connections.options.postImages == PostImagesOption.AUTO_CONFIGURE;
}

private get logPrefix() {
return `[powersync_${this.group_id}]`;
}

/**
* This resolves a pattern, persists the related metadata, and returns
* the resulting SourceTables.
Expand Down Expand Up @@ -124,18 +128,13 @@ export class ChangeStream {
.toArray();

if (!tablePattern.isWildcard && collections.length == 0) {
logger.warn(`Collection ${schema}.${tablePattern.name} not found`);
logger.warn(`${this.logPrefix} Collection ${schema}.${tablePattern.name} not found`);
}

for (let collection of collections) {
const table = await this.handleRelation(
batch,
{
name: collection.name,
schema,
objectId: collection.name,
replicationColumns: [{ name: '_id' }]
} as SourceEntityDescriptor,
getMongoRelation({ db: schema, coll: collection.name }),
// This is done as part of the initial setup - snapshot is handled elsewhere
{ snapshot: false, collectionInfo: collection }
);
@@ -149,7 +148,7 @@
async initSlot(): Promise<InitResult> {
const status = await this.storage.getStatus();
if (status.snapshot_done && status.checkpoint_lsn) {
logger.info(`Initial replication already done`);
logger.info(`${this.logPrefix} Initial replication already done`);
return { needsInitialSync: false };
}

@@ -220,7 +219,7 @@
}

const { comparable: lsn } = new MongoLSN({ timestamp: snapshotTime });
logger.info(`Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
logger.info(`${this.logPrefix} Snapshot commit at ${snapshotTime.inspect()} / ${lsn}`);
await batch.commit(lsn);
}
);
@@ -289,7 +288,7 @@
table: storage.SourceTable,
session?: mongo.ClientSession
) {
logger.info(`Replicating ${table.qualifiedName}`);
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName}`);
const estimatedCount = await this.estimatedCount(table);
let at = 0;
let lastLogIndex = 0;
@@ -319,7 +318,7 @@

at += 1;
if (at - lastLogIndex >= 5000) {
logger.info(`[${this.group_id}] Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
logger.info(`${this.logPrefix} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
lastLogIndex = at;
}
Metrics.getInstance().rows_replicated_total.add(1);
@@ -328,14 +327,16 @@
}

await batch.flush();
logger.info(`Replicated ${at} documents for ${table.qualifiedName}`);
logger.info(`${this.logPrefix} Replicated ${at} documents for ${table.qualifiedName}`);
}

private async getRelation(
batch: storage.BucketStorageBatch,
descriptor: SourceEntityDescriptor
descriptor: SourceEntityDescriptor,
options: { snapshot: boolean }
): Promise<SourceTable> {
const existing = this.relation_cache.get(descriptor.objectId);
const cacheId = getCacheIdentifier(descriptor);
const existing = this.relation_cache.get(cacheId);
if (existing != null) {
return existing;
}
@@ -344,7 +345,7 @@
// missing values.
const collection = await this.getCollectionInfo(descriptor.schema, descriptor.name);

return this.handleRelation(batch, descriptor, { snapshot: false, collectionInfo: collection });
return this.handleRelation(batch, descriptor, { snapshot: options.snapshot, collectionInfo: collection });
}
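
getCacheIdentifier is imported from MongoRelation.ts and its body is not shown in this diff. A hypothetical sketch of what such a key function needs to provide, given that the descriptor's objectId may now be absent:

```ts
// Hypothetical sketch - the real getCacheIdentifier lives in MongoRelation.ts.
type Descriptor = { schema: string; name: string; objectId?: string | number };

function getCacheIdentifier(descriptor: Descriptor): string {
  // The key must be stable whether or not the source provides a relation id,
  // since objectId may be undefined for some sources.
  if (descriptor.objectId != null) {
    return String(descriptor.objectId);
  }
  return `${descriptor.schema}.${descriptor.name}`;
}
```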

private async getCollectionInfo(db: string, name: string): Promise<mongo.CollectionInfo | undefined> {
@@ -375,7 +376,7 @@
collMod: collectionInfo.name,
changeStreamPreAndPostImages: { enabled: true }
});
logger.info(`Enabled postImages on ${db}.${collectionInfo.name}`);
logger.info(`${this.logPrefix} Enabled postImages on ${db}.${collectionInfo.name}`);
} else if (!enabled) {
throw new ServiceError(ErrorCode.PSYNC_S1343, `postImages not enabled on ${db}.${collectionInfo.name}`);
}
@@ -394,27 +395,31 @@
}

const snapshot = options.snapshot;
if (!descriptor.objectId && typeof descriptor.objectId != 'string') {
throw new ReplicationAssertionError('MongoDB replication - objectId expected');
}
const result = await this.storage.resolveTable({
group_id: this.group_id,
connection_id: this.connection_id,
connection_tag: this.connections.connectionTag,
entity_descriptor: descriptor,
sync_rules: this.sync_rules
});
this.relation_cache.set(descriptor.objectId, result.table);
this.relation_cache.set(getCacheIdentifier(descriptor), result.table);

// Drop conflicting tables. This includes for example renamed tables.
await batch.drop(result.dropTables);
// Drop conflicting collections.
// This is generally not expected for MongoDB source dbs, so we log an error.
if (result.dropTables.length > 0) {
logger.error(
`Conflicting collections found for ${JSON.stringify(descriptor)}. Dropping: ${result.dropTables.map((t) => t.id).join(', ')}`
);
await batch.drop(result.dropTables);
}

// Snapshot if:
// 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
// 2. Snapshot is not already done, AND:
// 3. The table is used in sync rules.
const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny;
if (shouldSnapshot) {
logger.info(`${this.logPrefix} New collection: ${descriptor.schema}.${descriptor.name}`);
// Truncate this table, in case a previous snapshot was interrupted.
await batch.truncate([result.table]);

@@ -434,7 +439,7 @@
change: mongo.ChangeStreamDocument
): Promise<storage.FlushedResult | null> {
if (!table.syncAny) {
logger.debug(`Collection ${table.qualifiedName} not used in sync rules - skipping`);
logger.debug(`${this.logPrefix} Collection ${table.qualifiedName} not used in sync rules - skipping`);
return null;
}

@@ -528,7 +533,7 @@
const startAfter = lastLsn?.timestamp;
const resumeAfter = lastLsn?.resumeToken;

logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
logger.info(`${this.logPrefix} Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);

const filters = this.getSourceNamespaceFilters();

@@ -590,13 +595,14 @@

let splitDocument: mongo.ChangeStreamDocument | null = null;

let flexDbNameWorkaroundLogged = false;

while (true) {
if (this.abort_signal.aborted) {
break;
}

const originalChangeDocument = await stream.tryNext();

// The stream was closed, we will only ever receive `null` from it
if (!originalChangeDocument && stream.closed) {
break;
@@ -636,6 +642,29 @@
throw new ReplicationAssertionError(`Incomplete splitEvent: ${JSON.stringify(splitDocument.splitEvent)}`);
}

if (
!filters.multipleDatabases &&
'ns' in changeDocument &&
changeDocument.ns.db != this.defaultDb.databaseName &&
changeDocument.ns.db.endsWith(`_${this.defaultDb.databaseName}`)
) {
// When all of the following conditions are met:
// 1. We're replicating from an Atlas Flex instance.
// 2. Change stream events were recorded while the PowerSync service was paused.
// 3. We're only replicating from a single database.
// Then we've observed an ns of, for example, {db: '67b83e86cd20730f1e766dde_ps'}
// instead of the expected {db: 'ps'}. We correct this here.
if (!flexDbNameWorkaroundLogged) {
flexDbNameWorkaroundLogged = true;
logger.warn(
`${this.logPrefix} Incorrect DB name in change stream: ${changeDocument.ns.db}. Changed to ${this.defaultDb.databaseName}.`
);
}

changeDocument.ns.db = this.defaultDb.databaseName;
}
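
The workaround itself reduces to a small string check; a self-contained sketch using the example values from the comment above:

```ts
// Sketch: normalize the database name reported by an Atlas Flex change stream.
function normalizeFlexDbName(observed: string, expected: string): string {
  // e.g. observed = '67b83e86cd20730f1e766dde_ps', expected = 'ps'
  if (observed !== expected && observed.endsWith(`_${expected}`)) {
    return expected;
  }
  return observed;
}
```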

if (
(changeDocument.operationType == 'insert' ||
changeDocument.operationType == 'update' ||
@@ -682,28 +711,44 @@
waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
}
const rel = getMongoRelation(changeDocument.ns);
const table = await this.getRelation(batch, rel);
const table = await this.getRelation(batch, rel, {
// In most cases, we should not need to snapshot this. But if this is the first time we see the collection
// for whatever reason, then we do need to snapshot it.
// This may result in some duplicate operations when a collection is created for the first time after
// the sync rules were deployed.
snapshot: true
});
if (table.syncAny) {
await this.writeChange(batch, table, changeDocument);
}
} else if (changeDocument.operationType == 'drop') {
const rel = getMongoRelation(changeDocument.ns);
const table = await this.getRelation(batch, rel);
const table = await this.getRelation(batch, rel, {
// We're "dropping" this collection, so never snapshot it.
snapshot: false
});
if (table.syncAny) {
await batch.drop([table]);
this.relation_cache.delete(table.objectId);
this.relation_cache.delete(getCacheIdentifier(rel));
}
} else if (changeDocument.operationType == 'rename') {
const relFrom = getMongoRelation(changeDocument.ns);
const relTo = getMongoRelation(changeDocument.to);
const tableFrom = await this.getRelation(batch, relFrom);
const tableFrom = await this.getRelation(batch, relFrom, {
// We're "dropping" this collection, so never snapshot it.
snapshot: false
});
if (tableFrom.syncAny) {
await batch.drop([tableFrom]);
this.relation_cache.delete(tableFrom.objectId);
this.relation_cache.delete(getCacheIdentifier(relFrom));
}
// Here we do need to snapshot the new table
const collection = await this.getCollectionInfo(relTo.schema, relTo.name);
await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection });
await this.handleRelation(batch, relTo, {
// This is a new (renamed) collection, so always snapshot it.
snapshot: true,
collectionInfo: collection
});
}
}
}