Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/grumpy-cameras-breathe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@powersync/service-module-mysql': patch
---

Added a heartbeat mechanism to the MySQL binlog listener replication connection to detect connection timeouts.
29 changes: 27 additions & 2 deletions modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -476,11 +476,36 @@ AND table_type = 'BASE TABLE';`,
return;
}

logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`);
// Set a heartbeat interval for the Zongji replication connection
// Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown
// The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket.
await new Promise((resolve, reject) => {
zongji.connection.query(
// In nanoseconds, 10^9 = 1s
'set @master_heartbeat_period=28*1000000000',
function (error: any, results: any, fields: any) {
if (error) {
reject(error);
} else {
resolve(results);
}
}
);
});
logger.info('Successfully set up replication connection heartbeat...');

// The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat.
// The timeout here must be greater than the master_heartbeat_period.
const socket = zongji.connection._socket!;
socket.setTimeout(60_000, () => {
socket.destroy(new Error('Replication connection timeout.'));
});

logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`);
// Only listen for changes to tables in the sync rules
const includedTables = [...this.tableCache.values()].map((table) => table.table);
zongji.start({
// We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive
includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'],
excludeEvents: [],
includeSchema: { [this.defaultSchema]: includedTables },
Expand All @@ -492,7 +517,7 @@ AND table_type = 'BASE TABLE';`,
// Forever young
await new Promise<void>((resolve, reject) => {
zongji.on('error', (error) => {
logger.error('Error on Binlog listener:', error);
logger.error('Binlog listener error:', error);
zongji.stop();
queue.kill();
reject(error);
Expand Down
10 changes: 10 additions & 0 deletions modules/module-mysql/src/replication/zongji/zongji.d.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
declare module '@powersync/mysql-zongji' {
import { Socket } from 'net';

export type ZongjiOptions = {
host: string;
user: string;
Expand Down Expand Up @@ -108,7 +110,15 @@ declare module '@powersync/mysql-zongji' {

export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent;

// @vlasky/mysql Connection
export interface MySQLConnection {
_socket?: Socket;
/** There are other forms of this method as well - this is the most basic one. */
query(sql: string, callback: (error: any, results: any, fields: any) => void): void;
}

export default class ZongJi {
connection: MySQLConnection;
constructor(options: ZongjiOptions);

start(options: StartOptions): void;
Expand Down
Loading