Skip to content

Commit cabcb93

Browse files
committed
Made it possible to specify timezone on zongji listener configuration to stop unwanted timezone skew
Added serverId configuration Added check for binlog existence before starting replication.
1 parent 2f9e24f commit cabcb93

File tree

11 files changed

+89
-462
lines changed

11 files changed

+89
-462
lines changed

modules/module-mysql/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@
3232
"@powersync/service-core": "workspace:*",
3333
"@powersync/service-sync-rules": "workspace:*",
3434
"@powersync/service-types": "workspace:*",
35-
"@powersync/mysql-zongji": "0.0.0-dev-20241021144804",
35+
"@powersync/mysql-zongji": "0.0.0-dev-20241022103518",
36+
"date-fns": "^4.1.0",
3637
"semver": "^7.5.4",
3738
"async": "^3.2.4",
3839
"mysql2": "^3.11.0",

modules/module-mysql/src/common/mysql-to-sqlite.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,21 @@
1+
import { logger } from '@powersync/lib-services-framework';
12
import * as sync_rules from '@powersync/service-sync-rules';
23
import { ExpressionType } from '@powersync/service-sync-rules';
4+
import { ColumnDescriptor } from '@powersync/service-core';
5+
import { formatISO } from 'date-fns';
6+
import mysql from 'mysql2';
37

4-
export function toSQLiteRow(row: Record<string, any>): sync_rules.SqliteRow {
8+
export function toSQLiteRow(row: Record<string, any>, columns?: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
59
for (let key in row) {
610
if (row[key] instanceof Date) {
7-
row[key] = row[key].toISOString();
11+
logger.info(`Date before conversion: ${key}=${row[key].toISOString()}`);
12+
const column = columns?.get(key);
13+
if (column?.typeId == mysql.Types.DATE) {
14+
row[key] = formatISO(row[key], { representation: 'date' });
15+
} else {
16+
row[key] = row[key].toISOString();
17+
}
18+
logger.info(`Converted date to string: ${key}=${row[key]}`);
819
}
920
}
1021
return sync_rules.toSyncRulesRow(row);

modules/module-mysql/src/common/read-executed-gtid.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,15 @@ export async function readExecutedGtid(connection: mysqlPromise.Connection): Pro
4141
raw_gtid: binlogStatus.Executed_Gtid_Set
4242
});
4343
}
44+
45+
export async function isBinlogStillAvailable(
46+
connection: mysqlPromise.Connection,
47+
binlogFile: string
48+
): Promise<boolean> {
49+
const [logFiles] = await mysql_utils.retriedQuery({
50+
connection,
51+
query: `SHOW BINARY LOGS;`
52+
});
53+
54+
return logFiles.some((f) => f['Log_name'] == binlogFile);
55+
}

modules/module-mysql/src/replication/BinLogStream.ts

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,16 @@ import { logger } from '@powersync/lib-services-framework';
22
import * as sync_rules from '@powersync/service-sync-rules';
33
import async from 'async';
44

5-
import { framework, getUuidReplicaIdentityBson, Metrics, storage } from '@powersync/service-core';
6-
import mysql from 'mysql2';
5+
import { ColumnDescriptor, framework, getUuidReplicaIdentityBson, Metrics, storage } from '@powersync/service-core';
6+
import mysql, { FieldPacket } from 'mysql2';
77

88
import { BinLogEvent } from '@powersync/mysql-zongji';
99
import * as common from '../common/common-index.js';
1010
import * as zongji_utils from './zongji/zongji-utils.js';
1111
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
12-
import { ReplicatedGTID } from '../common/common-index.js';
12+
import { isBinlogStillAvailable, ReplicatedGTID } from '../common/common-index.js';
1313
import mysqlPromise from 'mysql2/promise';
14+
import { MySQLTypesMap } from '../utils/mysql_utils.js';
1415

1516
export interface BinLogStreamOptions {
1617
connections: MySQLConnectionManager;
@@ -211,10 +212,23 @@ AND table_type = 'BASE TABLE';`,
211212
*/
212213
protected async checkInitialReplicated(): Promise<boolean> {
213214
const status = await this.storage.getStatus();
215+
const lastKnowGTID = status.checkpoint_lsn ? common.ReplicatedGTID.fromSerialized(status.checkpoint_lsn) : null;
214216
if (status.snapshot_done && status.checkpoint_lsn) {
215-
logger.info(`Initial replication already done. MySQL appears healthy`);
217+
logger.info(`Initial replication already done.`);
218+
219+
if (lastKnowGTID) {
220+
// Check if the binlog is still available. If it isn't we need to snapshot again.
221+
const connection = await this.connections.getConnection();
222+
try {
223+
return await isBinlogStillAvailable(connection, lastKnowGTID.position.filename);
224+
} finally {
225+
connection.release();
226+
}
227+
}
228+
216229
return true;
217230
}
231+
218232
return false;
219233
}
220234

@@ -270,17 +284,24 @@ AND table_type = 'BASE TABLE';`,
270284
logger.info(`Replicating ${table.qualifiedName}`);
271285
// TODO count rows and log progress at certain batch sizes
272286

287+
const columns = new Map<string, ColumnDescriptor>();
273288
return new Promise<void>((resolve, reject) => {
274289
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
275290
connection
276291
.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`)
277-
.stream()
278292
.on('error', (err) => {
279293
reject(err);
280294
})
281-
.on('data', async (row) => {
295+
.on('fields', (fields: FieldPacket[]) => {
296+
// Map the columns and their types
297+
fields.forEach((field) => {
298+
const columnType = MySQLTypesMap[field.type as number];
299+
columns.set(field.name, { name: field.name, type: columnType, typeId: field.type });
300+
});
301+
})
302+
.on('result', async (row) => {
282303
connection.pause();
283-
const record = common.toSQLiteRow(row);
304+
const record = common.toSQLiteRow(row, columns);
284305

285306
await batch.save({
286307
tag: storage.SaveOperationTag.INSERT,

modules/module-mysql/src/replication/zongji/zongji.d.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ declare module '@powersync/mysql-zongji' {
44
user: string;
55
password: string;
66
dateStrings?: boolean;
7+
timeZone?: string;
78
};
89

910
interface DatabaseFilter {
@@ -31,6 +32,11 @@ declare module '@powersync/mysql-zongji' {
3132
* BinLog position offset to start reading events from in file specified
3233
*/
3334
position?: number;
35+
36+
/**
37+
* Unique server ID for this replication client.
38+
*/
39+
serverId?: number;
3440
};
3541

3642
export type ColumnSchema = {

modules/module-mysql/src/types/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ export interface NormalizedMySQLConnectionConfig {
1414

1515
username: string;
1616
password: string;
17+
server_id: number;
1718

1819
cacert?: string;
1920
client_certificate?: string;
@@ -29,6 +30,7 @@ export const MySQLConnectionConfig = service_types.configFile.DataSourceConfig.a
2930
username: t.string.optional(),
3031
password: t.string.optional(),
3132
database: t.string.optional(),
33+
server_id: t.number.optional(),
3234

3335
cacert: t.string.optional(),
3436
client_certificate: t.string.optional(),
@@ -97,6 +99,8 @@ export function normalizeConnectionConfig(options: MySQLConnectionConfig): Norma
9799
database,
98100

99101
username,
100-
password
102+
password,
103+
104+
server_id: options.server_id ?? 1
101105
};
102106
}

modules/module-mysql/src/utils/mysql_utils.ts

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,13 @@
11
import { logger } from '@powersync/lib-services-framework';
2-
import mysql from 'mysql2';
2+
import mysql, { Types } from 'mysql2';
33
import mysqlPromise from 'mysql2/promise';
44
import * as types from '../types/types.js';
55

6+
export const MySQLTypesMap: { [key: number]: string } = {};
7+
for (const [name, code] of Object.entries(Types)) {
8+
MySQLTypesMap[code as number] = name;
9+
}
10+
611
export type RetriedQueryOptions = {
712
connection: mysqlPromise.Connection;
813
query: string;
@@ -42,6 +47,9 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option
4247
password: config.password,
4348
database: config.database,
4449
ssl: hasSSLOptions ? sslOptions : undefined,
50+
supportBigNumbers: true,
51+
// dateStrings: true,
52+
timezone: 'Z', // Ensure no auto timezone manipulation of the dates occur
4553
...(options || {})
4654
});
4755
}

0 commit comments

Comments
 (0)