207 changes: 105 additions & 102 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -194,7 +194,7 @@ export class ChangeStream {
     const session = await this.client.startSession();
     try {
       await this.storage.startBatch(
-        { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName },
+        { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
         async (batch) => {
           // Start by resolving all tables.
           // This checks postImage configuration, and that should fail as
@@ -510,126 +510,129 @@ export class ChangeStream {
     // Auto-activate as soon as initial replication is done
     await this.storage.autoActivate();
 
-    await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName }, async (batch) => {
-      const lastLsn = batch.lastCheckpointLsn;
-      const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
-      logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
+    await this.storage.startBatch(
+      { zeroLSN: ZERO_LSN, defaultSchema: this.defaultDb.databaseName, storeCurrentData: false },
+      async (batch) => {
+        const lastLsn = batch.lastCheckpointLsn;
+        const startAfter = mongoLsnToTimestamp(lastLsn) ?? undefined;
+        logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn}`);
 
-      // TODO: Use changeStreamSplitLargeEvent
+        // TODO: Use changeStreamSplitLargeEvent
 
-      const filters = this.getSourceNamespaceFilters();
+        const filters = this.getSourceNamespaceFilters();
 
-      const pipeline: mongo.Document[] = [
-        {
-          $match: filters.$match
-        }
-      ];
+        const pipeline: mongo.Document[] = [
+          {
+            $match: filters.$match
+          }
+        ];
 
-      let fullDocument: 'required' | 'updateLookup';
+        let fullDocument: 'required' | 'updateLookup';
 
-      if (this.usePostImages) {
-        // 'read_only' or 'auto_configure'
-        // Configuration happens during snapshot, or when we see new
-        // collections.
-        fullDocument = 'required';
-      } else {
-        fullDocument = 'updateLookup';
-      }
+        if (this.usePostImages) {
+          // 'read_only' or 'auto_configure'
+          // Configuration happens during snapshot, or when we see new
+          // collections.
+          fullDocument = 'required';
+        } else {
+          fullDocument = 'updateLookup';
+        }
 
-      const streamOptions: mongo.ChangeStreamOptions = {
-        startAtOperationTime: startAfter,
-        showExpandedEvents: true,
-        useBigInt64: true,
-        maxAwaitTimeMS: 200,
-        fullDocument: fullDocument
-      };
-      let stream: mongo.ChangeStream<mongo.Document>;
-      if (filters.multipleDatabases) {
-        // Requires readAnyDatabase@admin on Atlas
-        stream = this.client.watch(pipeline, streamOptions);
-      } else {
-        // Same general result, but requires fewer permissions than the above
-        stream = this.defaultDb.watch(pipeline, streamOptions);
-      }
+        const streamOptions: mongo.ChangeStreamOptions = {
+          startAtOperationTime: startAfter,
+          showExpandedEvents: true,
+          useBigInt64: true,
+          maxAwaitTimeMS: 200,
+          fullDocument: fullDocument
+        };
+        let stream: mongo.ChangeStream<mongo.Document>;
+        if (filters.multipleDatabases) {
+          // Requires readAnyDatabase@admin on Atlas
+          stream = this.client.watch(pipeline, streamOptions);
+        } else {
+          // Same general result, but requires fewer permissions than the above
+          stream = this.defaultDb.watch(pipeline, streamOptions);
+        }
 
-      if (this.abort_signal.aborted) {
-        stream.close();
-        return;
-      }
+        if (this.abort_signal.aborted) {
+          stream.close();
+          return;
+        }
 
-      this.abort_signal.addEventListener('abort', () => {
-        stream.close();
-      });
+        this.abort_signal.addEventListener('abort', () => {
+          stream.close();
+        });
 
-      // Always start with a checkpoint.
-      // This helps us to clear errors when restarting, even if there is
-      // no data to replicate.
-      let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
+        // Always start with a checkpoint.
+        // This helps us to clear errors when restarting, even if there is
+        // no data to replicate.
+        let waitForCheckpointLsn: string | null = await createCheckpoint(this.client, this.defaultDb);
 
-      while (true) {
-        if (this.abort_signal.aborted) {
-          break;
-        }
+        while (true) {
+          if (this.abort_signal.aborted) {
+            break;
+          }
 
-        const changeDocument = await stream.tryNext();
+          const changeDocument = await stream.tryNext();
 
-        if (changeDocument == null || this.abort_signal.aborted) {
-          continue;
-        }
-        await touch();
+          if (changeDocument == null || this.abort_signal.aborted) {
+            continue;
+          }
+          await touch();
 
-        if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) {
-          continue;
-        }
+          if (startAfter != null && changeDocument.clusterTime?.lte(startAfter)) {
+            continue;
+          }
 
-        // console.log('event', changeDocument);
+          // console.log('event', changeDocument);
 
-        if (
-          (changeDocument.operationType == 'insert' ||
-            changeDocument.operationType == 'update' ||
-            changeDocument.operationType == 'replace') &&
-          changeDocument.ns.coll == CHECKPOINTS_COLLECTION
-        ) {
-          const lsn = getMongoLsn(changeDocument.clusterTime!);
-          if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
-            waitForCheckpointLsn = null;
-          }
-          await batch.commit(lsn);
-        } else if (
-          changeDocument.operationType == 'insert' ||
-          changeDocument.operationType == 'update' ||
-          changeDocument.operationType == 'replace' ||
-          changeDocument.operationType == 'delete'
-        ) {
-          if (waitForCheckpointLsn == null) {
-            waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
-          }
-          const rel = getMongoRelation(changeDocument.ns);
-          const table = await this.getRelation(batch, rel);
-          if (table.syncAny) {
-            await this.writeChange(batch, table, changeDocument);
-          }
-        } else if (changeDocument.operationType == 'drop') {
-          const rel = getMongoRelation(changeDocument.ns);
-          const table = await this.getRelation(batch, rel);
-          if (table.syncAny) {
-            await batch.drop([table]);
-            this.relation_cache.delete(table.objectId);
-          }
-        } else if (changeDocument.operationType == 'rename') {
-          const relFrom = getMongoRelation(changeDocument.ns);
-          const relTo = getMongoRelation(changeDocument.to);
-          const tableFrom = await this.getRelation(batch, relFrom);
-          if (tableFrom.syncAny) {
-            await batch.drop([tableFrom]);
-            this.relation_cache.delete(tableFrom.objectId);
-          }
-          // Here we do need to snapshot the new table
-          const collection = await this.getCollectionInfo(relTo.schema, relTo.name);
-          await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection });
-        }
-      }
-    });
+          if (
+            (changeDocument.operationType == 'insert' ||
+              changeDocument.operationType == 'update' ||
+              changeDocument.operationType == 'replace') &&
+            changeDocument.ns.coll == CHECKPOINTS_COLLECTION
+          ) {
+            const lsn = getMongoLsn(changeDocument.clusterTime!);
+            if (waitForCheckpointLsn != null && lsn >= waitForCheckpointLsn) {
+              waitForCheckpointLsn = null;
+            }
+            await batch.commit(lsn);
+          } else if (
+            changeDocument.operationType == 'insert' ||
+            changeDocument.operationType == 'update' ||
+            changeDocument.operationType == 'replace' ||
+            changeDocument.operationType == 'delete'
+          ) {
+            if (waitForCheckpointLsn == null) {
+              waitForCheckpointLsn = await createCheckpoint(this.client, this.defaultDb);
+            }
+            const rel = getMongoRelation(changeDocument.ns);
+            const table = await this.getRelation(batch, rel);
+            if (table.syncAny) {
+              await this.writeChange(batch, table, changeDocument);
+            }
+          } else if (changeDocument.operationType == 'drop') {
+            const rel = getMongoRelation(changeDocument.ns);
+            const table = await this.getRelation(batch, rel);
+            if (table.syncAny) {
+              await batch.drop([table]);
+              this.relation_cache.delete(table.objectId);
+            }
+          } else if (changeDocument.operationType == 'rename') {
+            const relFrom = getMongoRelation(changeDocument.ns);
+            const relTo = getMongoRelation(changeDocument.to);
+            const tableFrom = await this.getRelation(batch, relFrom);
+            if (tableFrom.syncAny) {
+              await batch.drop([tableFrom]);
+              this.relation_cache.delete(tableFrom.objectId);
+            }
+            // Here we do need to snapshot the new table
+            const collection = await this.getCollectionInfo(relTo.schema, relTo.name);
+            await this.handleRelation(batch, relTo, { snapshot: true, collectionInfo: collection });
+          }
+        }
+      }
+    );
   }
 }
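Note: the fullDocument switch in the hunk above maps directly onto the MongoDB driver's change-stream options. A minimal sketch of that choice in isolation, using only the stock mongodb package; the wrapper function is hypothetical and not part of this PR:

import { ChangeStreamOptions } from 'mongodb';

// With post-images configured on the source collections, the server can be
// required to include the full post-image in each event ('required').
// Otherwise the driver looks the document up at read time ('updateLookup'),
// which may observe a newer version of the document than the event itself.
function changeStreamOptions(usePostImages: boolean): ChangeStreamOptions {
  return {
    showExpandedEvents: true,
    maxAwaitTimeMS: 200,
    fullDocument: usePostImages ? 'required' : 'updateLookup'
  };
}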

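Note: aside from re-indenting the callback, the substantive change at each call site in this PR is the new storeCurrentData flag passed to storage.startBatch. The diffs only show three fields; a sketch of the options shape they imply follows. The interface name and comments are assumptions; the real definition lives in the shared storage package, which is not shown in this view.

// Assumed shape, inferred from the three call sites in this PR.
interface StartBatchOptions {
  // LSN that sorts before any real checkpoint, e.g. ZERO_LSN.
  zeroLSN: string;
  // Schema/database name used when a table reference does not specify one.
  defaultSchema: string;
  // Whether the storage layer keeps its own copy of each row's current data.
  // The MongoDB stream passes false here; MySQL and Postgres pass true
  // (see the diffs below).
  storeCurrentData: boolean;
}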
4 changes: 2 additions & 2 deletions modules/module-mysql/src/replication/BinLogStream.ts
@@ -260,7 +260,7 @@ AND table_type = 'BASE TABLE';`,
     await promiseConnection.query<mysqlPromise.RowDataPacket[]>('START TRANSACTION');
     const sourceTables = this.syncRules.getSourceTables();
     await this.storage.startBatch(
-      { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema },
+      { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
       async (batch) => {
         for (let tablePattern of sourceTables) {
           const tables = await this.getQualifiedTableNames(batch, tablePattern);
@@ -383,7 +383,7 @@ AND table_type = 'BASE TABLE';`,
 
     if (!this.stopped) {
       await this.storage.startBatch(
-        { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema },
+        { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
         async (batch) => {
           const zongji = this.connections.createBinlogListener();
 
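Note: the MySQL and Postgres modules keep storeCurrentData: true while MongoDB turns it off. The PR itself does not spell out the reason; a plausible reading is that Mongo change streams deliver the full post-image document, whereas row-based replication events may carry only part of the row, so a stored copy is needed to reconstruct a complete record. A self-contained toy illustration of that reconstruction step (not PowerSync code):

type Row = Record<string, unknown>;

// Merge the columns present in an event over the previously stored row.
// If the source guaranteed full images (as MongoDB post-images do), the
// stored copy (and hence storeCurrentData) would be unnecessary.
function applyPartialUpdate(stored: Row, changedColumns: Row): Row {
  return { ...stored, ...changedColumns };
}

const stored = { id: 1, name: 'a', status: 'new' };
console.log(applyPartialUpdate(stored, { status: 'active' }));
// -> { id: 1, name: 'a', status: 'active' }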
104 changes: 55 additions & 49 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -341,17 +341,20 @@ WHERE oid = $1::regclass`,
 
   async initialReplication(db: pgwire.PgConnection, lsn: string) {
     const sourceTables = this.sync_rules.getSourceTables();
-    await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => {
-      for (let tablePattern of sourceTables) {
-        const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
-        for (let table of tables) {
-          await this.snapshotTable(batch, db, table);
-          await batch.markSnapshotDone([table], lsn);
-          await touch();
-        }
-      }
-      await batch.commit(lsn);
-    });
+    await this.storage.startBatch(
+      { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true },
+      async (batch) => {
+        for (let tablePattern of sourceTables) {
+          const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
+          for (let table of tables) {
+            await this.snapshotTable(batch, db, table);
+            await batch.markSnapshotDone([table], lsn);
+            await touch();
+          }
+        }
+        await batch.commit(lsn);
+      }
+    );
   }
 
   static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> {
@@ -577,55 +580,58 @@ WHERE oid = $1::regclass`,
     // Auto-activate as soon as initial replication is done
     await this.storage.autoActivate();
 
-    await this.storage.startBatch({ zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA }, async (batch) => {
-      // Replication never starts in the middle of a transaction
-      let inTx = false;
-      let count = 0;
+    await this.storage.startBatch(
+      { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true },
+      async (batch) => {
+        // Replication never starts in the middle of a transaction
+        let inTx = false;
+        let count = 0;
 
-      for await (const chunk of replicationStream.pgoutputDecode()) {
-        await touch();
+        for await (const chunk of replicationStream.pgoutputDecode()) {
+          await touch();
 
-        if (this.abort_signal.aborted) {
-          break;
-        }
+          if (this.abort_signal.aborted) {
+            break;
+          }
 
-        // chunkLastLsn may come from normal messages in the chunk,
-        // or from a PrimaryKeepalive message.
-        const { messages, lastLsn: chunkLastLsn } = chunk;
-        for (const msg of messages) {
-          if (msg.tag == 'relation') {
-            await this.handleRelation(batch, getPgOutputRelation(msg), true);
-          } else if (msg.tag == 'begin') {
-            inTx = true;
-          } else if (msg.tag == 'commit') {
-            Metrics.getInstance().transactions_replicated_total.add(1);
-            inTx = false;
-            await batch.commit(msg.lsn!);
-            await this.ack(msg.lsn!, replicationStream);
-          } else {
-            if (count % 100 == 0) {
-              logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`);
-            }
+          // chunkLastLsn may come from normal messages in the chunk,
+          // or from a PrimaryKeepalive message.
+          const { messages, lastLsn: chunkLastLsn } = chunk;
+          for (const msg of messages) {
+            if (msg.tag == 'relation') {
+              await this.handleRelation(batch, getPgOutputRelation(msg), true);
+            } else if (msg.tag == 'begin') {
+              inTx = true;
+            } else if (msg.tag == 'commit') {
+              Metrics.getInstance().transactions_replicated_total.add(1);
+              inTx = false;
+              await batch.commit(msg.lsn!);
+              await this.ack(msg.lsn!, replicationStream);
+            } else {
+              if (count % 100 == 0) {
+                logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`);
+              }
 
-            count += 1;
-            await this.writeChange(batch, msg);
-          }
-        }
+              count += 1;
+              await this.writeChange(batch, msg);
+            }
+          }
 
-        if (!inTx) {
-          // In a transaction, we ack and commit according to the transaction progress.
-          // Outside transactions, we use the PrimaryKeepalive messages to advance progress.
-          // Big caveat: This _must not_ be used to skip individual messages, since this LSN
-          // may be in the middle of the next transaction.
-          // It must only be used to associate checkpoints with LSNs.
-          if (await batch.keepalive(chunkLastLsn)) {
-            await this.ack(chunkLastLsn, replicationStream);
-          }
-        }
+          if (!inTx) {
+            // In a transaction, we ack and commit according to the transaction progress.
+            // Outside transactions, we use the PrimaryKeepalive messages to advance progress.
+            // Big caveat: This _must not_ be used to skip individual messages, since this LSN
+            // may be in the middle of the next transaction.
+            // It must only be used to associate checkpoints with LSNs.
+            if (await batch.keepalive(chunkLastLsn)) {
+              await this.ack(chunkLastLsn, replicationStream);
+            }
+          }
 
-        Metrics.getInstance().chunks_replicated_total.add(1);
-      }
-    });
+          Metrics.getInstance().chunks_replicated_total.add(1);
+        }
+      }
+    );
   }
 
   async ack(lsn: string, replicationStream: pgwire.ReplicationStream) {
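Note: the "Big caveat" comment in the hunk above encodes an invariant: keepalive LSNs may only advance the checkpoint between transactions, never inside one, because such an LSN can point into the middle of the next transaction. A runnable toy model of just that rule, with made-up message types (none of this is pgwire's API):

type Msg =
  | { tag: 'begin' }
  | { tag: 'commit'; lsn: string }
  | { tag: 'keepalive'; lsn: string };

// Returns the LSN a checkpoint may safely be associated with after
// processing the messages in order. Keepalive LSNs are ignored while a
// transaction is open.
function lastSafeCheckpoint(messages: Msg[]): string | null {
  let inTx = false;
  let checkpoint: string | null = null;
  for (const msg of messages) {
    if (msg.tag === 'begin') {
      inTx = true;
    } else if (msg.tag === 'commit') {
      inTx = false;
      checkpoint = msg.lsn;
    } else if (!inTx) {
      checkpoint = msg.lsn;
    }
  }
  return checkpoint;
}

console.log(lastSafeCheckpoint([{ tag: 'begin' }, { tag: 'keepalive', lsn: '0/20' }, { tag: 'commit', lsn: '0/30' }]));
// -> '0/30': the keepalive inside the open transaction was ignored.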