diff --git a/.changeset/chilled-bears-kneel.md b/.changeset/chilled-bears-kneel.md index e06cfbf17..b50c7a23c 100644 --- a/.changeset/chilled-bears-kneel.md +++ b/.changeset/chilled-bears-kneel.md @@ -1,6 +1,7 @@ --- '@powersync/service-core': minor '@powersync/service-types': minor +'@powersync/service-image': minor --- -Add EdDSA support for signing JWTs +Add EdDSA support for JWTs. diff --git a/.changeset/young-rings-fold.md b/.changeset/young-rings-fold.md new file mode 100644 index 000000000..d6c402a38 --- /dev/null +++ b/.changeset/young-rings-fold.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres': minor +'@powersync/service-jpgwire': patch +'@powersync/service-image': minor +--- + +Improve timeouts and table snapshots for Postgres initial replication. diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts index c08509660..ad1ab899d 100644 --- a/modules/module-postgres/src/replication/PgManager.ts +++ b/modules/module-postgres/src/replication/PgManager.ts @@ -1,6 +1,11 @@ import * as pgwire from '@powersync/service-jpgwire'; import { NormalizedPostgresConnectionConfig } from '../types/types.js'; +/** + * Shorter timeout for snapshot connections than for replication connections. + */ +const SNAPSHOT_SOCKET_TIMEOUT = 30_000; + export class PgManager { /** * Do not use this for any transactions. @@ -39,9 +44,18 @@ export class PgManager { const p = pgwire.connectPgWire(this.options, { type: 'standard' }); this.connectionPromises.push(p); const connection = await p; + + // Use a shorter timeout for snapshot connections. + // This is to detect broken connections early, instead of waiting + // for the full 6 minutes. + // Since we are constantly using the connection, we don't need any + // custom keepalives. + (connection as any)._socket.setTimeout(SNAPSHOT_SOCKET_TIMEOUT); + // Disable statement timeout for snapshot queries. // On Supabase, the default is 2 minutes. 
await connection.query(`set session statement_timeout = 0`); + return connection; } diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index f7e0d1e0c..8aaf1b737 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -342,9 +342,6 @@ WHERE oid = $1::regclass`, await this.storage.startBatch( { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: true }, async (batch) => { - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - const startLsn = rs.rows[0][0]; - for (let tablePattern of sourceTables) { const tables = await this.getQualifiedTableNames(batch, db, tablePattern); for (let table of tables) { @@ -352,15 +349,28 @@ WHERE oid = $1::regclass`, logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`); continue; } - await this.snapshotTable(batch, db, table); + let tableLsnNotBefore: string; + await db.query('BEGIN'); + try { + await this.snapshotTable(batch, db, table); + + const rs = await db.query(`select pg_current_wal_lsn() as lsn`); + tableLsnNotBefore = rs.rows[0][0]; + } finally { + // Read-only transaction, commit does not actually do anything. + await db.query('COMMIT'); + } - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - const tableLsnNotBefore = rs.rows[0][0]; await batch.markSnapshotDone([table], tableLsnNotBefore); await touch(); } } - await batch.commit(startLsn); + + // Always commit the initial snapshot at zero. + // This makes sure we don't skip any changes applied before starting this snapshot, + // in the case of snapshot retries. + // We could alternatively commit at the replication slot LSN. 
+ await batch.commit(ZERO_LSN); } ); } @@ -376,51 +386,70 @@ WHERE oid = $1::regclass`, const estimatedCount = await this.estimatedCount(db, table); let at = 0; let lastLogIndex = 0; - const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` }); - let columns: { i: number; name: string }[] = []; - // pgwire streams rows in chunks. - // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. - - for await (let chunk of cursor) { - if (chunk.tag == 'RowDescription') { - let i = 0; - columns = chunk.payload.map((c) => { - return { i: i++, name: c.name }; - }); - continue; - } - const rows = chunk.rows.map((row) => { - let q: DatabaseInputRow = {}; - for (let c of columns) { - q[c.name] = row[c.i]; - } - return q; + // We do streaming on two levels: + // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time. + // 2. Fine level: Stream chunks from each fetch call. + await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`); + + let columns: { i: number; name: string }[] = []; + let hasRemainingData = true; + while (hasRemainingData) { + // Fetch 10k at a time. + // The balance here is between latency overhead per FETCH call, + // and not spending too much time on each FETCH call. + // We aim for a couple of seconds on each FETCH call. + const cursor = db.stream({ + statement: `FETCH 10000 FROM powersync_cursor` }); - if (rows.length > 0 && at - lastLogIndex >= 5000) { - logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); - lastLogIndex = at; - } - if (this.abort_signal.aborted) { - throw new Error(`Aborted initial replication of ${this.slot_name}`); - } + hasRemainingData = false; + // pgwire streams rows in chunks. + // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. + // There are typically 100-200 rows per chunk. 
+ for await (let chunk of cursor) { + if (chunk.tag == 'RowDescription') { + // We get a RowDescription for each FETCH call, but they should + // all be the same. + let i = 0; + columns = chunk.payload.map((c) => { + return { i: i++, name: c.name }; + }); + continue; + } - for (const record of WalStream.getQueryData(rows)) { - // This auto-flushes when the batch reaches its size limit - await batch.save({ - tag: storage.SaveOperationTag.INSERT, - sourceTable: table, - before: undefined, - beforeReplicaId: undefined, - after: record, - afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + const rows = chunk.rows.map((row) => { + let q: DatabaseInputRow = {}; + for (let c of columns) { + q[c.name] = row[c.i]; + } + return q; }); - } + if (rows.length > 0 && at - lastLogIndex >= 5000) { + logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`); + lastLogIndex = at; + hasRemainingData = true; + } + if (this.abort_signal.aborted) { + throw new Error(`Aborted initial replication of ${this.slot_name}`); + } + + for (const record of WalStream.getQueryData(rows)) { + // This auto-flushes when the batch reaches its size limit + await batch.save({ + tag: storage.SaveOperationTag.INSERT, + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: record, + afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + }); + } - at += rows.length; - Metrics.getInstance().rows_replicated_total.add(rows.length); + at += rows.length; + Metrics.getInstance().rows_replicated_total.add(rows.length); - await touch(); + await touch(); + } } await batch.flush(); diff --git a/packages/jpgwire/src/pgwire_node.js b/packages/jpgwire/src/pgwire_node.js index 77573d711..330d26f7f 100644 --- a/packages/jpgwire/src/pgwire_node.js +++ b/packages/jpgwire/src/pgwire_node.js @@ -14,9 +14,14 @@ import { recordBytesRead } from './metrics.js'; // pgwire doesn't natively support configuring timeouts, but we 
just hardcode a default. // Timeout idle connections after 6 minutes (we ping at least every 5 minutes). const POWERSYNC_SOCKET_DEFAULT_TIMEOUT = 360_000; + // Timeout for the initial connection (pre-TLS) // Must be less than the timeout for a HTTP request const POWERSYNC_SOCKET_CONNECT_TIMEOUT = 20_000; + +// TCP keepalive delay in milliseconds. +// This can help detect dead connections earlier. +const POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY = 40_000; // END POWERSYNC const pbkdf2 = promisify(_pbkdf2); @@ -66,7 +71,20 @@ class SocketAdapter { static async connect(host, port) { // START POWERSYNC // Custom timeout handling - const socket = net.connect({ host, port, timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT }); + const socket = net.connect({ + host, + port, + + // This closes the connection if no data was sent or received for the given time, + // even if the connection is still actually alive. + timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT, + + // This configures TCP keepalive. + keepAlive: true, + keepAliveInitialDelay: POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY + // Unfortunately it is not possible to set tcp_keepalive_intvl or + // tcp_keepalive_probes here. + }); try { const timeout = setTimeout(() => { socket.destroy(new Error(`Timeout while connecting to ${host}:${port}`)); @@ -102,6 +120,13 @@ class SocketAdapter { this._writeResume(); }); } + + // START POWERSYNC CUSTOM TIMEOUT + setTimeout(timeout) { + this._socket.setTimeout(timeout); + } + // END POWERSYNC CUSTOM TIMEOUT + async startTls(host, ca) { // START POWERSYNC CUSTOM OPTIONS HANDLING if (!Array.isArray(ca) && typeof ca[0] == 'object') {