diff --git a/.changeset/grumpy-cameras-breathe.md b/.changeset/grumpy-cameras-breathe.md new file mode 100644 index 000000000..264e7be63 --- /dev/null +++ b/.changeset/grumpy-cameras-breathe.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-mysql': patch +--- + +Added a heartbeat mechanism to the MySQL binlog listener replication connection to detect connection timeouts. diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 65f0ced80..483f292f9 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -476,11 +476,36 @@ AND table_type = 'BASE TABLE';`, return; } - logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); + // Set a heartbeat interval for the Zongji replication connection + // Zongji does not explicitly handle the heartbeat events - they are categorized as event:unknown + // The heartbeat events are enough to keep the connection alive for setTimeout to work on the socket. + await new Promise((resolve, reject) => { + zongji.connection.query( + // In nanoseconds, 10^9 = 1s + 'set @master_heartbeat_period=28*1000000000', + function (error: any, results: any, fields: any) { + if (error) { + reject(error); + } else { + resolve(results); + } + } + ); + }); + logger.info('Successfully set up replication connection heartbeat...'); + // The _socket member is only set after a query is run on the connection, so we set the timeout after setting the heartbeat. + // The timeout here must be greater than the master_heartbeat_period. + const socket = zongji.connection._socket!; + socket.setTimeout(60_000, () => { + socket.destroy(new Error('Replication connection timeout.')); + }); + + logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); zongji.start({ + // We ignore the unknown/heartbeat event since it currently serves no purpose other than to keep the connection alive includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], excludeEvents: [], includeSchema: { [this.defaultSchema]: includedTables }, @@ -492,7 +517,7 @@ AND table_type = 'BASE TABLE';`, // Forever young await new Promise((resolve, reject) => { zongji.on('error', (error) => { - logger.error('Error on Binlog listener:', error); + logger.error('Binlog listener error:', error); zongji.stop(); queue.kill(); reject(error); diff --git a/modules/module-mysql/src/replication/zongji/zongji.d.ts b/modules/module-mysql/src/replication/zongji/zongji.d.ts index 9a17f15e9..f5640497e 100644 --- a/modules/module-mysql/src/replication/zongji/zongji.d.ts +++ b/modules/module-mysql/src/replication/zongji/zongji.d.ts @@ -1,4 +1,6 @@ declare module '@powersync/mysql-zongji' { + import { Socket } from 'net'; + export type ZongjiOptions = { host: string; user: string; @@ -108,7 +110,15 @@ declare module '@powersync/mysql-zongji' { export type BinLogEvent = BinLogRotationEvent | BinLogGTIDLogEvent | BinLogXidEvent | BinLogMutationEvent; + // @vlasky/mysql Connection + export interface MySQLConnection { + _socket?: Socket; + /** There are other forms of this method as well - this is the most basic one. */ + query(sql: string, callback: (error: any, results: any, fields: any) => void): void; + } + export default class ZongJi { + connection: MySQLConnection; constructor(options: ZongjiOptions); start(options: StartOptions): void;