diff --git a/.changeset/long-dolphins-judge.md b/.changeset/long-dolphins-judge.md
new file mode 100644
index 000000000..44f57e246
--- /dev/null
+++ b/.changeset/long-dolphins-judge.md
@@ -0,0 +1,10 @@
+---
+'@powersync/service-module-postgres-storage': patch
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+---
+
+Cleanly interrupt clearing of storage when the process is stopped/restarted.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
index 24fb9aca2..01e7b6497 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
@@ -129,7 +129,7 @@ export class MongoBucketBatch
     return this.last_checkpoint_lsn;
   }
 
-  async flush(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
+  async flush(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
     let result: storage.FlushedResult | null = null;
     // One flush may be split over multiple transactions.
     // Each flushInner() is one transaction.
@@ -142,7 +142,7 @@ export class MongoBucketBatch
     return result;
   }
 
-  private async flushInner(options?: storage.BucketBatchCommitOptions): Promise<storage.FlushedResult | null> {
+  private async flushInner(options?: storage.BatchBucketFlushOptions): Promise<storage.FlushedResult | null> {
     const batch = this.batch;
     if (batch == null) {
       return null;
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
index a22bb371b..404994980 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts
@@ -4,6 +4,7 @@ import {
   BaseObserver,
   ErrorCode,
   logger,
+  ReplicationAbortedError,
   ServiceAssertionError,
   ServiceError
 } from '@powersync/lib-services-framework';
@@ -504,7 +505,7 @@ export class MongoSyncBucketStorage
   async terminate(options?: storage.TerminateOptions) {
     // Default is to clear the storage except when explicitly requested not to.
     if (!options || options?.clearStorage) {
-      await this.clear();
+      await this.clear(options);
     }
     await this.db.sync_rules.updateOne(
       {
@@ -547,8 +548,11 @@ export class MongoSyncBucketStorage
     };
   }
 
-  async clear(): Promise<void> {
+  async clear(options?: storage.ClearStorageOptions): Promise<void> {
     while (true) {
+      if (options?.signal?.aborted) {
+        throw new ReplicationAbortedError('Aborted clearing data');
+      }
       try {
         await this.clearIteration();
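The `clear()` change above is the core of this patch for MongoDB storage: the delete loop re-checks the abort signal between iterations. For reference, a minimal standalone sketch of that cooperative-abort pattern; `clearAll` and the `clearIteration` callback are illustrative names, not the service's actual API:

```ts
// Cooperative abort between batch boundaries, as in MongoSyncBucketStorage.clear().
// Assumption: clearIteration() performs one bounded batch of deletes and
// returns true while more data remains.
class ReplicationAbortedError extends Error {}

async function clearAll(clearIteration: () => Promise<boolean>, signal?: AbortSignal): Promise<void> {
  while (true) {
    // Checking the signal here means a stop/restart interrupts the clear at the
    // next batch boundary, instead of blocking shutdown until all data is gone.
    if (signal?.aborted) {
      throw new ReplicationAbortedError('Aborted clearing data');
    }
    const hasMore = await clearIteration();
    if (!hasMore) {
      return;
    }
  }
}
```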
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index a1cbc1ebf..cc2f230dd 100644
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -670,7 +670,7 @@ export class ChangeStream {
     if (result.needsInitialSync) {
       if (result.snapshotLsn == null) {
         // Snapshot LSN is not present, so we need to start replication from scratch.
-        await this.storage.clear();
+        await this.storage.clear({ signal: this.abort_signal });
       }
       await this.initialReplication(result.snapshotLsn);
     }
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index edb516b0c..6907e842c 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -288,7 +288,7 @@ AND table_type = 'BASE TABLE';`,
    * and starts again from scratch.
    */
   async startInitialReplication() {
-    await this.storage.clear();
+    await this.storage.clear({ signal: this.abortSignal });
     // Replication will be performed in a single transaction on this connection
     const connection = await this.connections.getStreamingConnection();
     const promiseConnection = (connection as mysql.Connection).promise();
diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
index 7ab896cad..c3eeb641b 100644
--- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
+++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts
@@ -567,7 +567,7 @@ export class PostgresSyncRulesStorage
   async terminate(options?: storage.TerminateOptions) {
     if (!options || options?.clearStorage) {
-      await this.clear();
+      await this.clear(options);
     }
     await this.db.sql`
       UPDATE sync_rules
@@ -606,7 +606,8 @@ export class PostgresSyncRulesStorage
     };
   }
 
-  async clear(): Promise<void> {
+  async clear(options?: storage.ClearStorageOptions): Promise<void> {
+    // TODO: Cleanly abort the cleanup when the provided signal is aborted.
     await this.db.sql`
       UPDATE sync_rules
       SET
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index 792e0649d..2b6783a39 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -431,7 +431,7 @@ WHERE oid = $1::regclass`,
       // In those cases, we have to start replication from scratch.
       // If there is an existing healthy slot, we can skip this and continue
       // initial replication where we left off.
-      await this.storage.clear();
+      await this.storage.clear({ signal: this.abort_signal });
       await db.query({
         statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1',
@@ -948,7 +948,7 @@ WHERE oid = $1::regclass`,
       skipKeepalive = false;
       // flush() must be before the resnapshot check - that is
      // typically what reports the resnapshot records.
-      await batch.flush();
+      await batch.flush({ oldestUncommittedChange: this.oldestUncommittedChange });
       // This _must_ be checked after the flush(), and before
       // commit() or ack(). We never persist the resnapshot list,
       // so we have to process it before marking our progress.
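Each replication stream now threads its own abort signal into storage. A hedged caller-side sketch follows; the `startFromScratch` wrapper is illustrative, while `storage.SyncRulesBucketStorage` is the real interface and the namespace import mirrors the one used elsewhere in this diff:

```ts
import { storage } from '@powersync/service-core';

// Illustrative wrapper: how a replication stream hands its AbortSignal to
// clear(), mirroring the ChangeStream/BinLogStream/WalStream changes above.
async function startFromScratch(
  bucketStorage: storage.SyncRulesBucketStorage,
  abortSignal: AbortSignal
): Promise<void> {
  // If the process stops while old data is still being cleared, clear() now
  // rejects with ReplicationAbortedError instead of running to completion.
  await bucketStorage.clear({ signal: abortSignal });
  // ...initial replication would continue from here...
}
```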
diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
index 97d163fb7..fe97aaa8a 100644
--- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
+++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts
@@ -1,4 +1,4 @@
-import { container, logger } from '@powersync/lib-services-framework';
+import { container, logger, ReplicationAbortedError } from '@powersync/lib-services-framework';
 import { PgManager } from './PgManager.js';
 import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js';
@@ -104,6 +104,10 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob
         this.lastStream = stream;
         await stream.replicate();
       } catch (e) {
+        if (this.isStopped && e instanceof ReplicationAbortedError) {
+          // Ignore aborted errors
+          return;
+        }
         this.logger.error(`Replication error`, e);
         if (e.cause != null) {
           // Example:
diff --git a/packages/service-core/src/replication/AbstractReplicator.ts b/packages/service-core/src/replication/AbstractReplicator.ts
index a475124d0..6cb7a3d30 100644
--- a/packages/service-core/src/replication/AbstractReplicator.ts
+++ b/packages/service-core/src/replication/AbstractReplicator.ts
@@ -48,11 +48,11 @@ export abstract class AbstractReplicator {
+    this.abortController = new AbortController();
     this.runLoop().catch((e) => {
       this.logger.error('Data source fatal replication error', e);
       container.reporter.captureException(e);
@@ -107,7 +112,7 @@ export abstract class AbstractReplicator {
-    this.stopped = true;
+    this.abortController?.abort();
     let promises: Promise<void>[] = [];
     for (const job of this.replicationJobs.values()) {
       promises.push(job.stop());
     }
@@ -241,6 +246,7 @@ export abstract class AbstractReplicator
diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts
--- a/packages/service-core/src/storage/BucketStorageBatch.ts
+++ b/packages/service-core/src/storage/BucketStorageBatch.ts
-  flush(options?: BucketBatchCommitOptions): Promise<FlushedResult | null>;
+  flush(options?: BatchBucketFlushOptions): Promise<FlushedResult | null>;
 
   /**
    * Flush and commit any saved ops. This creates a new checkpoint by default.
@@ -161,13 +161,7 @@ export interface FlushedResult {
   flushed_op: InternalOpId;
 }
 
-export interface BucketBatchCommitOptions {
-  /**
-   * Creates a new checkpoint even if there were no persisted operations.
-   * Defaults to true.
-   */
-  createEmptyCheckpoints?: boolean;
-
+export interface BatchBucketFlushOptions {
   /**
    * The timestamp of the first change in this batch, according to the source database.
    *
@@ -176,4 +170,12 @@ export interface BucketBatchCommitOptions {
   oldestUncommittedChange?: Date | null;
 }
 
+export interface BucketBatchCommitOptions extends BatchBucketFlushOptions {
+  /**
+   * Creates a new checkpoint even if there were no persisted operations.
+   * Defaults to true.
+   */
+  createEmptyCheckpoints?: boolean;
+}
+
 export type ResolvedBucketBatchCommitOptions = Required<BucketBatchCommitOptions>;
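The net effect of the BucketStorageBatch.ts hunks: commit options now extend a narrower set of flush options. A quick sketch restating the relationship, with illustrative values (the interface shapes are copied from the hunks above):

```ts
// Shapes as introduced above; the values below are illustrative.
interface BatchBucketFlushOptions {
  // Timestamp of the first change in this batch, according to the source database.
  oldestUncommittedChange?: Date | null;
}

interface BucketBatchCommitOptions extends BatchBucketFlushOptions {
  // Creates a new checkpoint even if there were no persisted operations. Defaults to true.
  createEmptyCheckpoints?: boolean;
}

// flush() takes only the flush-relevant fields, so WalStream can report
// oldestUncommittedChange on flush without implying any checkpoint behavior:
const flushOptions: BatchBucketFlushOptions = { oldestUncommittedChange: new Date() };
// commit() keeps the full superset:
const commitOptions: BucketBatchCommitOptions = { ...flushOptions, createEmptyCheckpoints: false };
```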
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 62bd608f4..2317b830f 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -48,7 +48,7 @@ export interface SyncRulesBucketStorage
   /**
    * Clear the storage, without changing state.
    */
-  clear(): Promise<void>;
+  clear(options?: ClearStorageOptions): Promise<void>;
 
   autoActivate(): Promise<void>;
 
@@ -210,7 +210,11 @@ export interface CompactOptions {
   moveBatchQueryLimit?: number;
 }
 
-export interface TerminateOptions {
+export interface ClearStorageOptions {
+  signal?: AbortSignal;
+}
+
+export interface TerminateOptions extends ClearStorageOptions {
   /**
    * If true, also clear the storage before terminating.
    */
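Since `TerminateOptions` now extends `ClearStorageOptions`, the `terminate(options)` to `clear(options)` forwarding seen in both storage implementations carries the abort signal through unchanged. A hedged end-to-end sketch; the `ExampleStorage` class is illustrative, not one of the real implementations:

```ts
// Interface shapes are from the hunks above; the class body is illustrative.
interface ClearStorageOptions {
  signal?: AbortSignal;
}

interface TerminateOptions extends ClearStorageOptions {
  // If true, also clear the storage before terminating.
  clearStorage?: boolean;
}

class ExampleStorage {
  async terminate(options?: TerminateOptions): Promise<void> {
    // Because TerminateOptions extends ClearStorageOptions, the same options
    // object (and its AbortSignal) can be forwarded straight to clear().
    if (!options || options.clearStorage) {
      await this.clear(options);
    }
    // ...mark the sync rules as terminated...
  }

  async clear(options?: ClearStorageOptions): Promise<void> {
    if (options?.signal?.aborted) {
      throw new Error('Aborted clearing data'); // ReplicationAbortedError in the service
    }
    // ...delete data in batches, re-checking the signal between batches...
  }
}
```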