diff --git a/.changeset/chatty-boxes-repeat.md b/.changeset/chatty-boxes-repeat.md new file mode 100644 index 000000000..891191b15 --- /dev/null +++ b/.changeset/chatty-boxes-repeat.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-postgres': patch +--- + +Fix replication slot recovery diff --git a/.changeset/serious-rivers-sin.md b/.changeset/serious-rivers-sin.md new file mode 100644 index 000000000..902d86eba --- /dev/null +++ b/.changeset/serious-rivers-sin.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': patch +--- + +Reduce noise in log output diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index b1f690c84..34c168f90 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -192,6 +192,15 @@ export class WalStream { if (slotExists) { // This checks that the slot is still valid const r = await this.checkReplicationSlot(); + if (snapshotDone && r.needsNewSlot) { + // We keep the current snapshot, and create a new replication slot + throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`); + } + // We can have: + // needsInitialSync: true, needsNewSlot: true -> initial sync from scratch + // needsInitialSync: true, needsNewSlot: false -> resume initial sync + // needsInitialSync: false, needsNewSlot: true -> handled above + // needsInitialSync: false, needsNewSlot: false -> resume streaming replication return { needsInitialSync: !snapshotDone, needsNewSlot: r.needsNewSlot @@ -204,7 +213,7 @@ export class WalStream { /** * If a replication slot exists, check that it is healthy. */ - private async checkReplicationSlot(): Promise { + private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> { let last_error = null; const slotName = this.slot_name; @@ -244,7 +253,7 @@ export class WalStream { // Success logger.info(`Slot ${slotName} appears healthy`); - return { needsInitialSync: false, needsNewSlot: false }; + return { needsNewSlot: false }; } catch (e) { last_error = e; logger.warn(`${slotName} Replication slot error`, e); @@ -274,9 +283,9 @@ export class WalStream { // Sample: publication "powersync" does not exist // Happens when publication deleted or never created. // Slot must be re-created in this case. - logger.info(`${slotName} does not exist anymore, will create new slot`); + logger.info(`${slotName} is not valid anymore`); - return { needsInitialSync: true, needsNewSlot: true }; + return { needsNewSlot: true }; } // Try again after a pause await new Promise((resolve) => setTimeout(resolve, 1000)); diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 7b8674ce6..0398f1da2 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -5,6 +5,7 @@ import { pgwireRows } from '@powersync/service-jpgwire'; import * as crypto from 'crypto'; import { describe, expect, test } from 'vitest'; import { WalStreamTestContext } from './wal_stream_utils.js'; +import { MissingReplicationSlotError } from '@module/replication/WalStream.js'; type StorageFactory = () => Promise; @@ -291,4 +292,52 @@ bucket_definitions: expect(endRowCount - startRowCount).toEqual(0); expect(endTxCount - startTxCount).toEqual(1); }); + + test('reporting slot issues', async () => { + { + await using context = await WalStreamTestContext.open(factory); + const { pool } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query( + `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)` + ); + await pool.query( + `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id` + ); + await context.replicateSnapshot(); + await context.startStreaming(); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { + id: '8133cd37-903b-4937-a022-7c8294015a3a', + description: 'test1' + }) + ]); + + expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true }); + } + + { + await using context = await WalStreamTestContext.open(factory, { doNotClear: true }); + const { pool } = context; + await pool.query('DROP PUBLICATION powersync'); + await pool.query(`UPDATE test_data SET description = 'updated'`); + await pool.query('CREATE PUBLICATION powersync FOR ALL TABLES'); + + await context.loadActiveSyncRules(); + await expect(async () => { + await context.replicateSnapshot(); + }).rejects.toThrowError(MissingReplicationSlotError); + + // The error is handled on a higher level, which triggers + // creating a new replication slot. + } + }); } diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index c217d1b88..3cde14cf5 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -77,6 +77,16 @@ export class WalStreamTestContext implements AsyncDisposable { return this.storage!; } + async loadActiveSyncRules() { + const syncRules = await this.factory.getActiveSyncRulesContent(); + if (syncRules == null) { + throw new Error(`Active sync rules not available`); + } + + this.storage = this.factory.getInstance(syncRules); + return this.storage!; + } + get walStream() { if (this.storage == null) { throw new Error('updateSyncRules() first'); diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 693d8bc91..c882b5100 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -609,6 +609,8 @@ export class MongoBucketBatch extends DisposableObserver { await this.flush(); @@ -619,9 +621,12 @@ export class MongoBucketBatch extends DisposableObserver 5_000) { + logger.info( + `Waiting until ${this.no_checkpoint_before_lsn} before creating checkpoint, currently at ${lsn}. Persisted op: ${this.persisted_op}` + ); + this.lastWaitingLogThottled = Date.now(); + } // Edge case: During initial replication, we have a no_checkpoint_before_lsn set, // and don't actually commit the snapshot. diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 2305bacc3..77cf9c100 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -522,11 +522,13 @@ export class MongoSyncBucketStorage while (true) { try { await this.clearIteration(); + + logger.info(`${this.slot_name} Done clearing data`); return; } catch (e: unknown) { if (e instanceof mongo.MongoServerError && e.codeName == 'MaxTimeMSExpired') { logger.info( - `Clearing took longer than ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, waiting and triggering another iteration.` + `${this.slot_name} Cleared batch of data in ${db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...` ); await timers.setTimeout(db.mongo.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5); continue;