7 changes: 7 additions & 0 deletions .changeset/young-rings-fold.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-postgres': minor
+'@powersync/service-jpgwire': patch
+'@powersync/service-image': patch
+---
+
+Improve timeouts and table snapshots for Postgres initial replication.
14 changes: 14 additions & 0 deletions modules/module-postgres/src/replication/PgManager.ts
@@ -1,6 +1,11 @@
import * as pgwire from '@powersync/service-jpgwire';
import { NormalizedPostgresConnectionConfig } from '../types/types.js';

+/**
+ * Shorter timeout for snapshot connections than for replication connections.
+ */
+const SNAPSHOT_SOCKET_TIMEOUT = 30_000;
+
export class PgManager {
  /**
   * Do not use this for any transactions.
@@ -39,9 +44,18 @@ export class PgManager {
    const p = pgwire.connectPgWire(this.options, { type: 'standard' });
    this.connectionPromises.push(p);
    const connection = await p;
+
+    // Use a shorter timeout for snapshot connections.
+    // This is to detect broken connections early, instead of waiting
+    // for the full 6 minutes.
+    // Since we are constantly using the connection, we don't need any
+    // custom keepalives.
+    (connection as any)._socket.setTimeout(SNAPSHOT_SOCKET_TIMEOUT);
+
    // Disable statement timeout for snapshot queries.
    // On Supabase, the default is 2 minutes.
    await connection.query(`set session statement_timeout = 0`);
+
    return connection;
  }
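Taken together, the snapshot-connection changes above amount to a small pattern: detect dead sockets quickly, and stop the server from killing long snapshot queries. Below is a minimal TypeScript sketch of that pattern, assuming pgwire's connectPgWire as used in this diff and a connection object that exposes its underlying Node socket as _socket; the helper name and the loose option typing are illustrative, not part of the PR.

import * as pgwire from '@powersync/service-jpgwire';

// 30s for snapshot connections, versus the 6-minute default used for replication.
const SNAPSHOT_SOCKET_TIMEOUT = 30_000;

// Hypothetical standalone equivalent of PgManager.snapshotConnection() above.
async function openSnapshotConnection(options: any) {
  const connection = await pgwire.connectPgWire(options, { type: 'standard' });
  // Snapshots constantly send or receive data, so 30s of silence means the
  // connection is broken; fail fast instead of waiting out the idle timeout.
  (connection as any)._socket.setTimeout(SNAPSHOT_SOCKET_TIMEOUT);
  // Snapshot queries legitimately run for a long time; disable the server-side
  // statement timeout (2 minutes by default on Supabase) for this session.
  await connection.query(`set session statement_timeout = 0`);
  return connection;
}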
121 changes: 75 additions & 46 deletions modules/module-postgres/src/replication/WalStream.ts
@@ -342,25 +342,35 @@ WHERE oid = $1::regclass`,
    await this.storage.startBatch(
      { zeroLSN: ZERO_LSN, defaultSchema: POSTGRES_DEFAULT_SCHEMA, storeCurrentData: true, skipExistingRows: true },
      async (batch) => {
-       const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-       const startLsn = rs.rows[0][0];
-
        for (let tablePattern of sourceTables) {
          const tables = await this.getQualifiedTableNames(batch, db, tablePattern);
          for (let table of tables) {
            if (table.snapshotComplete) {
              logger.info(`${this.slot_name} Skipping ${table.qualifiedName} - snapshot already done`);
              continue;
            }
-           await this.snapshotTable(batch, db, table);
-
-           const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
-           const tableLsnNotBefore = rs.rows[0][0];
+           let tableLsnNotBefore: string;
+           await db.query('BEGIN');
+           try {
+             await this.snapshotTable(batch, db, table);
+
+             const rs = await db.query(`select pg_current_wal_lsn() as lsn`);
+             tableLsnNotBefore = rs.rows[0][0];
+           } finally {
+             // Read-only transaction, commit does not actually do anything.
+             await db.query('COMMIT');
+           }
+
            await batch.markSnapshotDone([table], tableLsnNotBefore);
            await touch();
          }
        }
-       await batch.commit(startLsn);
+
+       // Always commit the initial snapshot at zero.
+       // This makes sure we don't skip any changes applied before starting this snapshot,
+       // in the case of snapshot retries.
+       // We could alternatively commit at the replication slot LSN.
+       await batch.commit(ZERO_LSN);
      }
    );
@@ -376,51 +386,70 @@ WHERE oid = $1::regclass`,
    const estimatedCount = await this.estimatedCount(db, table);
    let at = 0;
    let lastLogIndex = 0;
-   const cursor = db.stream({ statement: `SELECT * FROM ${table.escapedIdentifier}` });
-   let columns: { i: number; name: string }[] = [];
-   // pgwire streams rows in chunks.
-   // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
-
-   for await (let chunk of cursor) {
-     if (chunk.tag == 'RowDescription') {
-       let i = 0;
-       columns = chunk.payload.map((c) => {
-         return { i: i++, name: c.name };
-       });
-       continue;
-     }
-
-     const rows = chunk.rows.map((row) => {
-       let q: DatabaseInputRow = {};
-       for (let c of columns) {
-         q[c.name] = row[c.i];
-       }
-       return q;
-     });
-     if (rows.length > 0 && at - lastLogIndex >= 5000) {
-       logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
-       lastLogIndex = at;
-     }
-     if (this.abort_signal.aborted) {
-       throw new Error(`Aborted initial replication of ${this.slot_name}`);
-     }
-
-     for (const record of WalStream.getQueryData(rows)) {
-       // This auto-flushes when the batch reaches its size limit
-       await batch.save({
-         tag: storage.SaveOperationTag.INSERT,
-         sourceTable: table,
-         before: undefined,
-         beforeReplicaId: undefined,
-         after: record,
-         afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
-       });
-     }
-
-     at += rows.length;
-     Metrics.getInstance().rows_replicated_total.add(rows.length);
-
-     await touch();
-   }
+   // We do streaming on two levels:
+   // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time.
+   // 2. Fine level: Stream chunks from each fetch call.
+   await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${table.escapedIdentifier}`);
+
+   let columns: { i: number; name: string }[] = [];
+   let hasRemainingData = true;
+   while (hasRemainingData) {
+     // Fetch 10k at a time.
+     // The balance here is between latency overhead per FETCH call,
+     // and not spending too much time on each FETCH call.
+     // We aim for a couple of seconds on each FETCH call.
+     const cursor = db.stream({
+       statement: `FETCH 10000 FROM powersync_cursor`
+     });
+     hasRemainingData = false;
+     // pgwire streams rows in chunks.
+     // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically.
+     // There are typically 100-200 rows per chunk.
+     for await (let chunk of cursor) {
+       if (chunk.tag == 'RowDescription') {
+         // We get a RowDescription for each FETCH call, but they should
+         // all be the same.
+         let i = 0;
+         columns = chunk.payload.map((c) => {
+           return { i: i++, name: c.name };
+         });
+         continue;
+       }
+
+       const rows = chunk.rows.map((row) => {
+         let q: DatabaseInputRow = {};
+         for (let c of columns) {
+           q[c.name] = row[c.i];
+         }
+         return q;
+       });
+       if (rows.length > 0) {
+         hasRemainingData = true;
+         if (at - lastLogIndex >= 5000) {
+           logger.info(`${this.slot_name} Replicating ${table.qualifiedName} ${at}/${estimatedCount}`);
+           lastLogIndex = at;
+         }
+       }
+       if (this.abort_signal.aborted) {
+         throw new Error(`Aborted initial replication of ${this.slot_name}`);
+       }
+
+       for (const record of WalStream.getQueryData(rows)) {
+         // This auto-flushes when the batch reaches its size limit
+         await batch.save({
+           tag: storage.SaveOperationTag.INSERT,
+           sourceTable: table,
+           before: undefined,
+           beforeReplicaId: undefined,
+           after: record,
+           afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+         });
+       }
+
+       at += rows.length;
+       Metrics.getInstance().rows_replicated_total.add(rows.length);
+
+       await touch();
+     }
+   }

    await batch.flush();
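To see the two-level streaming in isolation, here is a simplified TypeScript sketch of the DECLARE/FETCH loop above, written against a hypothetical minimal client interface; the real code uses pgwire's db.query and db.stream, and runs inside the read-only BEGIN/COMMIT from the first hunk, which is also what scopes the cursor.

interface Chunk {
  tag: string;       // e.g. 'RowDescription' or a data tag
  rows: unknown[][]; // empty for non-data chunks
}

interface Client {
  query(sql: string): Promise<unknown>;
  stream(opts: { statement: string }): AsyncIterable<Chunk>;
}

// Hypothetical helper demonstrating the pattern; error handling omitted.
async function streamTable(
  db: Client,
  escapedTable: string,
  onRows: (rows: unknown[][]) => Promise<void>
) {
  // Coarse level: a server-side cursor bounds the work done per round trip.
  await db.query(`DECLARE powersync_cursor CURSOR FOR SELECT * FROM ${escapedTable}`);

  let hasRemainingData = true;
  while (hasRemainingData) {
    hasRemainingData = false;
    // Fine level: the driver streams each FETCH in small chunks, so memory
    // stays bounded even for a 10k-row batch.
    for await (const chunk of db.stream({ statement: `FETCH 10000 FROM powersync_cursor` })) {
      if (chunk.tag == 'RowDescription' || chunk.rows.length == 0) {
        continue;
      }
      // Any data row means the cursor may not be exhausted yet; an empty
      // FETCH leaves hasRemainingData false and ends the loop.
      hasRemainingData = true;
      await onRows(chunk.rows);
    }
  }
}

The FETCH size is the tuning knob here: larger batches amortize per-call latency, smaller ones keep each call short; the diff aims for a couple of seconds per FETCH.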
27 changes: 26 additions & 1 deletion packages/jpgwire/src/pgwire_node.js
@@ -14,9 +14,14 @@ import { recordBytesRead } from './metrics.js';
// pgwire doesn't natively support configuring timeouts, but we just hardcode a default.
// Timeout idle connections after 6 minutes (we ping at least every 5 minutes).
const POWERSYNC_SOCKET_DEFAULT_TIMEOUT = 360_000;
+
// Timeout for the initial connection (pre-TLS).
// Must be less than the timeout for an HTTP request.
const POWERSYNC_SOCKET_CONNECT_TIMEOUT = 20_000;
+
+// TCP keepalive delay in milliseconds.
+// This can help detect dead connections earlier.
+const POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY = 40_000;
// END POWERSYNC

const pbkdf2 = promisify(_pbkdf2);
@@ -66,7 +71,20 @@ class SocketAdapter {
  static async connect(host, port) {
    // START POWERSYNC
    // Custom timeout handling
-   const socket = net.connect({ host, port, timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT });
+   const socket = net.connect({
+     host,
+     port,
+
+     // This closes the connection if no data was sent or received for the given time,
+     // even if the connection is still actually alive.
+     timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT,
+
+     // This configures TCP keepalive.
+     keepAlive: true,
+     keepAliveInitialDelay: POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY
+     // Unfortunately it is not possible to set tcp_keepalive_intvl or
+     // tcp_keepalive_probes here.
+   });
    try {
      const timeout = setTimeout(() => {
        socket.destroy(new Error(`Timeout while connecting to ${host}:${port}`));
@@ -102,6 +120,13 @@ class SocketAdapter {
      this._writeResume();
    });
  }
+
+  // START POWERSYNC CUSTOM TIMEOUT
+  setTimeout(timeout) {
+    this._socket.setTimeout(timeout);
+  }
+  // END POWERSYNC CUSTOM TIMEOUT
+
  async startTls(host, ca) {
    // START POWERSYNC CUSTOM OPTIONS HANDLING
    if (!Array.isArray(ca) && typeof ca[0] == 'object') {
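For reference, the socket options above can be reproduced with nothing but Node's net module. A minimal TypeScript sketch follows; the constants mirror the diff, while the helper and the explicit 'timeout' handler are illustrative. One subtlety worth noting: Node only emits a 'timeout' event after the idle period, it never closes the socket by itself, so the destroy must be done by the caller, as sketched here.

import * as net from 'node:net';

// Constants mirroring the diff.
const POWERSYNC_SOCKET_DEFAULT_TIMEOUT = 360_000;        // 6-minute idle timeout
const POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY = 40_000; // first keepalive probe after 40s idle

// Hypothetical standalone helper.
function connectWithTimeouts(host: string, port: number): net.Socket {
  const socket = net.connect({
    host,
    port,
    // Emits 'timeout' when no data is sent or received for this long,
    // even if the peer is still alive; we destroy the socket ourselves below.
    timeout: POWERSYNC_SOCKET_DEFAULT_TIMEOUT,
    // TCP keepalive detects dead peers even on an otherwise idle connection.
    // Node exposes only the initial delay; tcp_keepalive_intvl and
    // tcp_keepalive_probes remain at their OS defaults.
    keepAlive: true,
    keepAliveInitialDelay: POWERSYNC_SOCKET_KEEPALIVE_INITIAL_DELAY
  });
  socket.on('timeout', () => {
    socket.destroy(new Error(`Idle timeout on connection to ${host}:${port}`));
  });
  return socket;
}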