Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changeset/honest-cats-draw.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-module-mysql': patch
'@powersync/service-image': patch
---

Fix timestamp replication issues for MySQL.
6 changes: 6 additions & 0 deletions .changeset/many-shrimps-watch.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-module-mysql': patch
'@powersync/service-image': patch
---

Fix resuming MySQL replication after a restart.
44 changes: 35 additions & 9 deletions modules/module-mysql/src/common/mysql-to-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu
}

export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
let result: sync_rules.DatabaseInputRow = {};
for (let key in row) {
// We are very much expecting the column to be there
const column = columns.get(key)!;
Expand All @@ -107,17 +108,35 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colum
switch (column.typeId) {
case mysql.Types.DATE:
// Only parse the date part
row[key] = row[key].toISOString().split('T')[0];
{
const date = row[key] as Date;
if (isNaN(date.getTime())) {
// Invalid dates, such as 2024-00-00.
// we can't do anything meaningful with this, so just use null.
result[key] = null;
} else {
result[key] = date.toISOString().split('T')[0];
}
}
break;
case mysql.Types.DATETIME:
case ADDITIONAL_MYSQL_TYPES.DATETIME2:
case mysql.Types.TIMESTAMP:
case ADDITIONAL_MYSQL_TYPES.TIMESTAMP2:
row[key] = row[key].toISOString();
{
const date = row[key] as Date;
if (isNaN(date.getTime())) {
// Invalid dates, such as 2024-00-00.
// we can't do anything meaningful with this, so just use null.
result[key] = null;
} else {
result[key] = date.toISOString();
}
}
break;
case mysql.Types.JSON:
if (typeof row[key] === 'string') {
row[key] = new JsonContainer(row[key]);
result[key] = new JsonContainer(row[key]);
}
break;
case mysql.Types.BIT:
Expand All @@ -127,14 +146,16 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colum
case mysql.Types.LONG_BLOB:
case ADDITIONAL_MYSQL_TYPES.BINARY:
case ADDITIONAL_MYSQL_TYPES.VARBINARY:
row[key] = new Uint8Array(Object.values(row[key]));
result[key] = new Uint8Array(Object.values(row[key]));
break;
case mysql.Types.LONGLONG:
if (typeof row[key] === 'string') {
row[key] = BigInt(row[key]);
result[key] = BigInt(row[key]);
} else if (typeof row[key] === 'number') {
// Zongji returns BIGINT as a number when it can be represented as a number
row[key] = BigInt(row[key]);
result[key] = BigInt(row[key]);
} else {
result[key] = row[key];
}
break;
case mysql.Types.TINY:
Expand All @@ -143,18 +164,23 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colum
case mysql.Types.INT24:
// Handle all integer values as BigInt
if (typeof row[key] === 'number') {
row[key] = BigInt(row[key]);
result[key] = BigInt(row[key]);
} else {
result[key] = row[key];
}
break;
case mysql.Types.SET:
// Convert to JSON array from string
const values = row[key].split(',');
row[key] = JSONBig.stringify(values);
result[key] = JSONBig.stringify(values);
break;
default:
result[key] = row[key];
break;
}
}
}
return sync_rules.toSyncRulesRow(row);
return sync_rules.toSyncRulesRow(result);
}

export function toExpressionTypeFromMySQLType(mysqlType: string | undefined): ExpressionType {
Expand Down
76 changes: 43 additions & 33 deletions modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as zongji_utils from './zongji/zongji-utils.js';
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js';
import mysqlPromise from 'mysql2/promise';
import { createRandomServerId } from '../utils/mysql-utils.js';
import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js';

export interface BinLogStreamOptions {
connections: MySQLConnectionManager;
Expand Down Expand Up @@ -114,8 +114,10 @@ export class BinLogStream {
// Start the snapshot inside a transaction.
// We use a dedicated connection for this.
const connection = await this.connections.getStreamingConnection();

const promiseConnection = (connection as mysql.Connection).promise();
try {
await promiseConnection.query(`SET time_zone = '+00:00'`);
await promiseConnection.query('BEGIN');
try {
gtid = await common.readExecutedGtid(promiseConnection);
Expand Down Expand Up @@ -258,6 +260,8 @@ AND table_type = 'BASE TABLE';`,
'SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY'
);
await promiseConnection.query<mysqlPromise.RowDataPacket[]>('START TRANSACTION');
await promiseConnection.query(`SET time_zone = '+00:00'`);

const sourceTables = this.syncRules.getSourceTables();
await this.storage.startBatch(
{ zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
Expand Down Expand Up @@ -291,38 +295,32 @@ AND table_type = 'BASE TABLE';`,
logger.info(`Replicating ${table.qualifiedName}`);
// TODO count rows and log progress at certain batch sizes

let columns: Map<string, ColumnDescriptor>;
return new Promise<void>((resolve, reject) => {
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
connection
.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`)
.on('error', (err) => {
reject(err);
})
.on('fields', (fields: FieldPacket[]) => {
// Map the columns and their types
columns = toColumnDescriptors(fields);
})
.on('result', async (row) => {
connection.pause();
const record = common.toSQLiteRow(row, columns);

await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
connection.resume();
Metrics.getInstance().rows_replicated_total.add(1);
})
.on('end', async function () {
await batch.flush();
resolve();
});
// MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${escapeMysqlTableName(table)}`);
const stream = query.stream();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a nice simplification to the streaming process! 🥳


let columns: Map<string, ColumnDescriptor> | undefined = undefined;
stream.on('fields', (fields: FieldPacket[]) => {
// Map the columns and their types
columns = toColumnDescriptors(fields);
});

for await (let row of query.stream()) {
if (columns == null) {
throw new Error(`No 'fields' event emitted`);
}
const record = common.toSQLiteRow(row, columns!);

await batch.save({
tag: storage.SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
after: record,
afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
});
}
await batch.flush();
}

async replicate() {
Expand Down Expand Up @@ -350,6 +348,18 @@ AND table_type = 'BASE TABLE';`,
const initialReplicationCompleted = await this.checkInitialReplicated();
if (!initialReplicationCompleted) {
await this.startInitialReplication();
} else {
// We need to find the existing tables, to populate our table cache.
// This is needed for includeSchema to work correctly.
const sourceTables = this.syncRules.getSourceTables();
await this.storage.startBatch(
{ zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
async (batch) => {
for (let tablePattern of sourceTables) {
await this.getQualifiedTableNames(batch, tablePattern);
}
}
);
}
}

Expand Down Expand Up @@ -578,7 +588,7 @@ AND table_type = 'BASE TABLE';`,
beforeReplicaId: beforeUpdated
? getUuidReplicaIdentityBson(beforeUpdated, payload.sourceTable.replicaIdColumns)
: undefined,
after: common.toSQLiteRow(payload.data, payload.columns),
after: after,
afterReplicaId: getUuidReplicaIdentityBson(after, payload.sourceTable.replicaIdColumns)
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,14 @@ export class MySQLConnectionManager {
* @param params
*/
async query(query: string, params?: any[]): Promise<[RowDataPacket[], FieldPacket[]]> {
  let connection: mysqlPromise.PoolConnection | undefined;
  try {
    connection = await this.promisePool.getConnection();
    // Force UTC so DATETIME/TIMESTAMP values replicate consistently
    // regardless of the MySQL server's default time zone.
    await connection.query(`SET time_zone = '+00:00'`);
    // `return await` is required here: returning the bare promise would run the
    // finally block (releasing the connection back to the pool) before the query
    // settles, allowing the pool to hand the still-busy connection to another caller.
    return await connection.query<RowDataPacket[]>(query, params);
  } finally {
    connection?.release();
  }
}

/**
Expand Down
5 changes: 5 additions & 0 deletions modules/module-mysql/src/utils/mysql-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import mysql from 'mysql2';
import mysqlPromise from 'mysql2/promise';
import * as types from '../types/types.js';
import { coerce, gte } from 'semver';
import { SourceTable } from '@powersync/service-core';

export type RetriedQueryOptions = {
connection: mysqlPromise.Connection;
Expand Down Expand Up @@ -82,3 +83,7 @@ export function isVersionAtLeast(version: string, minimumVersion: string): boole

return gte(coercedVersion!, coercedMinimumVersion!, { loose: true });
}

export function escapeMysqlTableName(table: SourceTable): string {
return `\`${table.schema.replaceAll('`', '``')}\`.\`${table.table.replaceAll('`', '``')}\``;
}
Loading
Loading