From 570f94d54f945c26ec6a4f722cdde4b01f403327 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 12:27:01 +0200
Subject: [PATCH 01/17] Handle MySQL servers with non-UTC timezone.

---
 .../src/replication/MySQLConnectionManager.ts          | 9 ++++++++-
 modules/module-mysql/test/src/mysql-to-sqlite.test.ts  | 6 +++---
 2 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
index 3693b9ce2..7d395b1bb 100644
--- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts
+++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
@@ -62,7 +62,14 @@ export class MySQLConnectionManager {
    * @param params
    */
  async query(query: string, params?: any[]): Promise<[RowDataPacket[], FieldPacket[]]> {
-    return this.promisePool.query(query, params);
+    let connection: mysqlPromise.PoolConnection | undefined;
+    try {
+      connection = await this.promisePool.getConnection();
+      connection.query(`SET time_zone = "+00:00"`);
+      return connection.query(query, params);
+    } finally {
+      connection?.release();
+    }
  }

  /**
diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 9cebdccd2..275c72ff3 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -202,7 +202,7 @@ INSERT INTO test_data (
    await setupTable();
    await connectionManager.query(`
    INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col)
-    VALUES('2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47', '15:47:00', '2023');
+    VALUES('2023-03-06', '2023-03-06 15:47:00+00:00', '2023-03-06 15:47:00+00:00', '15:47:00', '2023');
    `);

    const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
@@ -222,8 +222,8 @@ INSERT INTO test_data (

  test('Date types edge cases mappings', async () => {
    await setupTable();
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
+    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01+00:00')`);
+    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499+00:00')`);
    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
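Note on patch 01: instead of querying the pool directly, the manager now takes a dedicated connection, pins its session time zone to UTC, and releases the connection afterwards, so DATETIME/TIMESTAMP values are read consistently on servers configured with a non-UTC time zone. A minimal standalone sketch of the same pattern, assuming a mysql2 promise pool (connection options here are illustrative; the unawaited SET statement above is corrected in patch 07):

    import mysql from 'mysql2';

    const pool = mysql.createPool({ host: 'localhost', user: 'root', database: 'test' }).promise();

    async function queryUtc(sql: string, params?: any[]) {
      const connection = await pool.getConnection();
      try {
        // Pin the session to UTC so TIMESTAMP values are not shifted by the server's zone.
        await connection.query(`SET time_zone = '+00:00'`);
        return await connection.query(sql, params);
      } finally {
        // Return the connection to the pool even if the query throws.
        connection.release();
      }
    }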
From 4889504fecc039e6c26fa217d3135cdfb101e90e Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 12:32:38 +0200
Subject: [PATCH 02/17] Use same config for ZongJi in tests as in the service.

---
 modules/module-mysql/test/src/mysql-to-sqlite.test.ts | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 275c72ff3..6fac035e4 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -282,8 +282,7 @@ async function getReplicatedRows(expectedTransactionsCount?: number): Promise<
    return new Promise((resolve, reject) => {

From d3c8659ec862c688d03ba8dfcf198566cb5be459 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 12:36:29 +0200
Subject: [PATCH 03/17] Only set timezone offset on the pool, for compatibility
 with mysql 5.7.

---
 modules/module-mysql/test/src/mysql-to-sqlite.test.ts | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 6fac035e4..944218ea7 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -200,9 +200,10 @@ INSERT INTO test_data (

  test('Date types mappings', async () => {
    await setupTable();
+    // Timezone offset is set on the pool to +00:00
    await connectionManager.query(`
    INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col)
-    VALUES('2023-03-06', '2023-03-06 15:47:00+00:00', '2023-03-06 15:47:00+00:00', '15:47:00', '2023');
+    VALUES('2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47', '15:47:00', '2023');
    `);

    const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
@@ -222,8 +223,9 @@ INSERT INTO test_data (

  test('Date types edge cases mappings', async () => {
    await setupTable();
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01+00:00')`);
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499+00:00')`);
+    // Timezone offset is set on the pool to +00:00
+    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
+    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
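Note on patch 03: the tests return to plain timestamp literals because MySQL 5.7 does not accept the '2023-03-06 15:47:00+00:00' literal form introduced in patch 01 (literals with time zone offsets only arrived in MySQL 8.0.19); the offset is instead applied once at the pool level. One way to apply a session time zone to every pooled connection with mysql2 — a sketch, not taken from this series:

    import mysql from 'mysql2';

    const pool = mysql.createPool({ host: 'localhost', user: 'root', database: 'test' });

    // Runs once per new physical connection. An offset like '+00:00' is safest:
    // named zones such as 'UTC' require the server's time zone tables to be loaded.
    pool.on('connection', (connection) => {
      connection.query(`SET time_zone = '+00:00'`);
    });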
From ec8b6efea17ab0c1d9681cd157baa7277423a934 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:08:03 +0200
Subject: [PATCH 04/17] Use query.stream() to improve error handling of
 snapshots.

---
 .../src/replication/BinLogStream.ts | 56 +++++++++----------
 1 file changed, 25 insertions(+), 31 deletions(-)

diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index d5e0c43ad..9bfe32690 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -291,38 +291,32 @@ AND table_type = 'BASE TABLE';`,
    logger.info(`Replicating ${table.qualifiedName}`);

    // TODO count rows and log progress at certain batch sizes
-    let columns: Map<string, ColumnDescriptor>;
-    return new Promise<void>((resolve, reject) => {
-      // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
-      connection
-        .query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`)
-        .on('error', (err) => {
-          reject(err);
-        })
-        .on('fields', (fields: FieldPacket[]) => {
-          // Map the columns and their types
-          columns = toColumnDescriptors(fields);
-        })
-        .on('result', async (row) => {
-          connection.pause();
-          const record = common.toSQLiteRow(row, columns);
-
-          await batch.save({
-            tag: storage.SaveOperationTag.INSERT,
-            sourceTable: table,
-            before: undefined,
-            beforeReplicaId: undefined,
-            after: record,
-            afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
-          });
-          connection.resume();
-          Metrics.getInstance().rows_replicated_total.add(1);
-        })
-        .on('end', async function () {
-          await batch.flush();
-          resolve();
-        });
+    // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
+    const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`);
+    const stream = query.stream();
+
+    let columns: Map<string, ColumnDescriptor> | undefined = undefined;
+    stream.on('fields', (fields: FieldPacket[]) => {
+      // Map the columns and their types
+      columns = toColumnDescriptors(fields);
    });
+
+    for await (let row of stream) {
+      if (columns == null) {
+        throw new Error(`No 'fields' event emitted`);
+      }
+      const record = common.toSQLiteRow(row, columns!);
+
+      await batch.save({
+        tag: storage.SaveOperationTag.INSERT,
+        sourceTable: table,
+        before: undefined,
+        beforeReplicaId: undefined,
+        after: record,
+        afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+      });
+    }
+    await batch.flush();
  }

  async replicate() {
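Note on patch 04: replacing the event-callback pump (with its manual pause()/resume() per row) by query.stream() plus for await gives backpressure for free, and a query error rejects the loop instead of needing a separate 'error' handler. A self-contained sketch of the consumption pattern, with illustrative names:

    import mysql from 'mysql2';

    async function scanTable(connection: mysql.Connection, sql: string) {
      const query = connection.query(sql);
      // The Readable returned by stream() is async-iterable; an 'error' event
      // rejects the iteration, and slow consumers pause the stream automatically.
      for await (const row of query.stream()) {
        console.log(row); // process one row at a time
      }
    }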
From 1008b1ea9f4766d4f98296bc96061c4bf480ecd6 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:26:03 +0200
Subject: [PATCH 05/17] Do not modify original rows when replicating.

---
 .../src/common/mysql-to-sqlite.ts | 26 ++++++++++++-------
 1 file changed, 17 insertions(+), 9 deletions(-)

diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts
index 8cc2487d8..703b94d59 100644
--- a/modules/module-mysql/src/common/mysql-to-sqlite.ts
+++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -99,6 +99,7 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu
 }

 export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
+  let result: sync_rules.DatabaseInputRow = {};
  for (let key in row) {
    // We are very much expecting the column to be there
    const column = columns.get(key)!;
@@ -107,17 +108,17 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colu
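Note on patch 05: toSQLiteRow previously wrote converted values back into the row it was given; since the same row object backs the binlog event (and, for updates, both the before and after images), conversion now fills a fresh result object instead. A generic sketch of the copy-on-convert pattern (a hypothetical converter, not the module's real one):

    function convertRow(row: Record<string, any>): Record<string, any> {
      const result: Record<string, any> = {};
      for (const key in row) {
        const value = row[key];
        // The input row stays untouched; other consumers still see the raw values.
        result[key] = value instanceof Date ? value.toISOString() : value;
      }
      return result;
    }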
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:30:45 +0200
Subject: [PATCH 06/17] Gracefully handle invalid dates.

---
 .../src/common/mysql-to-sqlite.ts    | 22 +++++-
 .../test/src/mysql-to-sqlite.test.ts | 71 ++++++++++++++-----
 2 files changed, 74 insertions(+), 19 deletions(-)

diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts
index 703b94d59..b106c90f2 100644
--- a/modules/module-mysql/src/common/mysql-to-sqlite.ts
+++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -108,13 +108,31 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colu

diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 944218ea7..32ae5573f 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
  test('Date types edge cases mappings', async () => {
    await setupTable();
-    // Timezone offset is set on the pool to +00:00
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
-    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
-    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
+    const connection = await connectionManager.getConnection();
+    try {
+      // Disable strict mode, to allow dates such as '2024-00-00'.
+      await connection.query(`SET SESSION sql_mode=''`);
+      await connection.query(`SET SESSION time_zone='+00:00'`);
+
+      await connection.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
+      await connection.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('0000-00-00 00:00:00')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('2024-00-00 00:00:00')`);
+      // TODO: This has a mismatch between querying directly and with Zongji.
+      // await connection.query(`INSERT INTO test_data(date_col) VALUES('2024-00-00')`);
+
+      const expectedResults = [
+        { timestamp_col: '1970-01-01T00:00:01.000Z' },
+        { timestamp_col: '2038-01-19T03:14:07.499Z' },
+        { datetime_col: '1000-01-01T00:00:00.000Z' },
+        { datetime_col: '9999-12-31T23:59:59.499Z' },
+        { datetime_col: null },
+        { datetime_col: null }
+        // { date_col: '2023-11-30' } or { date_col: null }?
+      ];
+
+      const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
+      const replicatedRows = await getReplicatedRows(expectedResults.length);
+
+      for (let i = 0; i < expectedResults.length; i++) {
+        expect(databaseRows[i]).toMatchObject(expectedResults[i]);
+        expect(replicatedRows[i]).toMatchObject(expectedResults[i]);
+      }
+    } finally {
+      connection.release();
+    }
+  });
+
+  test.skip('Date types mappings - null', async () => {
+    await setupTable();
+    await connectionManager.query(`
+    INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col)
+    VALUES(NULL, NULL, NULL, NULL, NULL);
+    `);

    const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
-    const replicatedRows = await getReplicatedRows(4);
-    const expectedResults = [
-      { timestamp_col: '1970-01-01T00:00:01.000Z' },
-      { timestamp_col: '2038-01-19T03:14:07.499Z' },
-      { datetime_col: '1000-01-01T00:00:00.000Z' },
-      { datetime_col: '9999-12-31T23:59:59.499Z' }
-    ];
-
-    for (let i = 0; i < expectedResults.length; i++) {
-      expect(databaseRows[i]).toMatchObject(expectedResults[i]);
-      expect(replicatedRows[i]).toMatchObject(expectedResults[i]);
-    }
+    const replicatedRows = await getReplicatedRows();
+    const expectedResult = {
+      date_col: null,
+      datetime_col: null,
+      timestamp_col: null,
+      time_col: null,
+      year_col: null
+    };
+
+    expect(databaseRows[0]).toMatchObject(expectedResult);
+    expect(replicatedRows[0]).toMatchObject(expectedResult);
  });

  test('Json types mappings', async () => {
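Note on patch 06: MySQL zero dates such as '0000-00-00 00:00:00' (permitted when sql_mode is relaxed) surface as invalid JavaScript Date objects, and the tests above expect them to replicate as null. A sketch of the guard implied by those test expectations:

    // Map invalid Date objects to SQL NULL during conversion.
    function dateToSqlite(value: unknown): string | null {
      if (value instanceof Date) {
        // A zero or out-of-range date yields NaN from getTime().
        if (isNaN(value.getTime())) {
          return null;
        }
        return value.toISOString();
      }
      return null;
    }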
From 0f2889810ad532d9be23b2fd88a81c1578295010 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:35:38 +0200
Subject: [PATCH 07/17] Specifically set time_zone on snapshot connections.

---
 modules/module-mysql/src/replication/BinLogStream.ts         | 4 ++++
 .../module-mysql/src/replication/MySQLConnectionManager.ts   | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 9bfe32690..6b9af1c2f 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -114,8 +114,10 @@ export class BinLogStream {
    // Start the snapshot inside a transaction.
    // We use a dedicated connection for this.
    const connection = await this.connections.getStreamingConnection();
+    const promiseConnection = (connection as mysql.Connection).promise();
    try {
+      await promiseConnection.query(`SET time_zone = '+00:00'`);
      await promiseConnection.query('BEGIN');
      try {
        gtid = await common.readExecutedGtid(promiseConnection);
@@ -258,6 +260,8 @@ AND table_type = 'BASE TABLE';`,
      'SET TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY'
    );
    await promiseConnection.query('START TRANSACTION');
+    await promiseConnection.query(`SET time_zone = '+00:00'`);
+
    const sourceTables = this.syncRules.getSourceTables();
    await this.storage.startBatch(
      { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
index 7d395b1bb..4548d8381 100644
--- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts
+++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
@@ -65,7 +65,7 @@ export class MySQLConnectionManager {
    let connection: mysqlPromise.PoolConnection | undefined;
    try {
      connection = await this.promisePool.getConnection();
-      connection.query(`SET time_zone = "+00:00"`);
+      await connection.query(`SET time_zone = '+00:00'`);
      return connection.query(query, params);
    } finally {
      connection?.release();

From 872887888978c515928be925fb85149666aa52c3 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:48:03 +0200
Subject: [PATCH 08/17] Fix resuming replication after a restart.

---
 modules/module-mysql/src/replication/BinLogStream.ts | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 6b9af1c2f..a6d218ae2 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -348,6 +348,18 @@ AND table_type = 'BASE TABLE';`,
    const initialReplicationCompleted = await this.checkInitialReplicated();
    if (!initialReplicationCompleted) {
      await this.startInitialReplication();
+    } else {
+      // We need to find the existing tables, to populate our table cache.
+      // This is needed for includeSchema to work correctly.
+      const sourceTables = this.syncRules.getSourceTables();
+      await this.storage.startBatch(
+        { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
+        async (batch) => {
+          for (let tablePattern of sourceTables) {
+            await this.getQualifiedTableNames(batch, tablePattern);
+          }
+        }
+      );
    }
  }
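Note on patch 08: when replication resumes after a restart, the snapshot step is skipped, so nothing would repopulate the in-memory table cache that binlog events are matched against; the patch re-resolves the sync-rule table patterns in that case. A sketch of the control flow, with hypothetical callback names:

    type TablePattern = { schema: string; name: string };

    async function startReplication(opts: {
      initialReplicationCompleted: boolean;
      startInitialReplication: () => Promise<void>;
      patterns: TablePattern[];
      registerTables: (pattern: TablePattern) => Promise<void>;
    }): Promise<void> {
      if (!opts.initialReplicationCompleted) {
        // First run: the snapshot itself registers every matched table.
        await opts.startInitialReplication();
      } else {
        // Restart: rebuild the table cache without re-running the snapshot.
        for (const pattern of opts.patterns) {
          await opts.registerTables(pattern);
        }
      }
    }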
From 0241924a12ee295d50763bc3b24bf15fcf8264d1 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 14:56:46 +0200
Subject: [PATCH 09/17] Avoid duplicating row conversion on update.

---
 modules/module-mysql/src/replication/BinLogStream.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index a6d218ae2..4a4fb4a94 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -588,7 +588,7 @@ AND table_type = 'BASE TABLE';`,
      beforeReplicaId: beforeUpdated
        ? getUuidReplicaIdentityBson(beforeUpdated, payload.sourceTable.replicaIdColumns)
        : undefined,
-      after: common.toSQLiteRow(payload.data, payload.columns),
+      after: after,
      afterReplicaId: getUuidReplicaIdentityBson(after, payload.sourceTable.replicaIdColumns)
    });

From 7fb0f21f2a68fc076f5bac55523e1f37559a15a8 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Tue, 31 Dec 2024 15:05:01 +0200
Subject: [PATCH 10/17] Properly escape table name in snapshot query.

---
 modules/module-mysql/src/replication/BinLogStream.ts | 4 ++--
 modules/module-mysql/src/utils/mysql-utils.ts        | 5 +++++
 packages/service-core/src/storage/SourceTable.ts     | 4 ++++
 3 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index 4a4fb4a94..65f0ced80 100644
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -11,7 +11,7 @@ import * as zongji_utils from './zongji/zongji-utils.js';
 import { MySQLConnectionManager } from './MySQLConnectionManager.js';
 import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js';
 import mysqlPromise from 'mysql2/promise';
-import { createRandomServerId } from '../utils/mysql-utils.js';
+import { createRandomServerId, escapeMysqlTableName } from '../utils/mysql-utils.js';

 export interface BinLogStreamOptions {
   connections: MySQLConnectionManager;
@@ -296,7 +296,7 @@ AND table_type = 'BASE TABLE';`,

    // TODO count rows and log progress at certain batch sizes
    // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
-    const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`);
+    const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${escapeMysqlTableName(table)}`);
    const stream = query.stream();

    let columns: Map<string, ColumnDescriptor> | undefined = undefined;
diff --git a/modules/module-mysql/src/utils/mysql-utils.ts b/modules/module-mysql/src/utils/mysql-utils.ts
index a2279c234..f1e831273 100644
--- a/modules/module-mysql/src/utils/mysql-utils.ts
+++ b/modules/module-mysql/src/utils/mysql-utils.ts
@@ -3,6 +3,7 @@ import mysql from 'mysql2';
 import mysqlPromise from 'mysql2/promise';
 import * as types from '../types/types.js';
 import { coerce, gte } from 'semver';
+import { SourceTable } from '@powersync/service-core';

 export type RetriedQueryOptions = {
   connection: mysqlPromise.Connection;
@@ -82,3 +83,7 @@ export function isVersionAtLeast(version: string, minimumVersion: string): boole

   return gte(coercedVersion!, coercedMinimumVersion!, { loose: true });
 }
+
+export function escapeMysqlTableName(table: SourceTable): string {
+  return `\`${table.schema.replaceAll('`', '``')}\`.\`${table.table.replaceAll('`', '``')}\``;
+}
diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts
index f514f9081..a2c2f65e7 100644
--- a/packages/service-core/src/storage/SourceTable.ts
+++ b/packages/service-core/src/storage/SourceTable.ts
@@ -48,6 +48,8 @@ export class SourceTable {
  }

  /**
+   * Use for postgres only.
+   *
   * Usage: db.query({statement: `SELECT $1::regclass`, params: [{type: 'varchar', value: table.qualifiedName}]})
   */
  get qualifiedName() {
@@ -55,6 +57,8 @@ export class SourceTable {
  }

  /**
+   * Use for postgres and logs only.
+   *
   * Usage: db.query(`SELECT * FROM ${table.escapedIdentifier}`)
   */
  get escapedIdentifier() {
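Note on patch 10: interpolating ${table.schema}.${table.table} unquoted breaks for identifiers containing special characters; the new helper quotes each part with backticks and doubles embedded backticks, which is MySQL's escaping rule for quoted identifiers. A usage sketch:

    const table = { schema: 'app', table: 'weird`name' };
    const escaped = `\`${table.schema.replaceAll('`', '``')}\`.\`${table.table.replaceAll('`', '``')}\``;
    // escaped === '`app`.`weird``name`'
    const sql = `SELECT * FROM ${escaped}`;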
From ae803e65a0cafc0ab316629068de229886c83c4e Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 11:30:25 +0200
Subject: [PATCH 11/17] Test updating a row with timestamp values.

---
 .../test/src/BinLogStream.test.ts | 22 +++++++++++--------
 1 file changed, 13 insertions(+), 9 deletions(-)

diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index 44240d461..9f740f83b 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -14,13 +14,9 @@ bucket_definitions:
      - SELECT id, description FROM "test_data"
 `;

-describe(
-  ' Binlog stream - mongodb',
-  function () {
-    defineBinlogStreamTests(MONGO_STORAGE_FACTORY);
-  },
-  { timeout: 20_000 }
-);
+describe(' Binlog stream - mongodb', { timeout: 20_000 }, function () {
+  defineBinlogStreamTests(MONGO_STORAGE_FACTORY);
+});

 function defineBinlogStreamTests(factory: StorageFactory) {
   test(
@@ -254,6 +250,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
      await connectionManager.query(`
      INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
      `);
+      await connectionManager.query(`UPDATE test_data SET description = ? WHERE id = ?`, ['testUpdated', testId]);

      const data = await context.getBucketData('global[]');
      expect(data).toMatchObject([
@@ -263,13 +260,20 @@ function defineBinlogStreamTests(factory: StorageFactory) {
          date: `2023-03-06`,
          datetime: '2023-03-06T15:47:00.000Z',
          timestamp: '2023-03-06T15:47:00.000Z'
+        }),
+        putOp('test_data', {
+          id: testId,
+          description: 'testUpdated',
+          date: `2023-03-06`,
+          datetime: '2023-03-06T15:47:00.000Z',
+          timestamp: '2023-03-06T15:47:00.000Z'
        })
      ]);
      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
      const endTxCount =
        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
+      expect(endRowCount - startRowCount).toEqual(2);
+      expect(endTxCount - startTxCount).toEqual(2);
    })
  );
From 5cb51bd78fc139f92c7ee040be4b6346db993166 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 11:43:42 +0200
Subject: [PATCH 12/17] Refactor BinLogStream tests to use `await using`.

---
 .../test/src/BinLogStream.test.ts  | 460 +++++++++---------
 .../test/src/BinlogStreamUtils.ts  |  37 +-
 2 files changed, 236 insertions(+), 261 deletions(-)

diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index 9f740f83b..eb9669aae 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -1,9 +1,9 @@
 import { putOp, removeOp } from '@core-tests/stream_utils.js';
 import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
 import { BucketStorageFactory, Metrics } from '@powersync/service-core';
-import { describe, expect, test } from 'vitest';
-import { binlogStreamTest } from './BinlogStreamUtils.js';
 import { v4 as uuid } from 'uuid';
+import { describe, expect, test } from 'vitest';
+import { BinlogStreamTestContext } from './BinlogStreamUtils.js';

 type StorageFactory = () => Promise<BucketStorageFactory>;

@@ -19,83 +19,76 @@ describe(' Binlog stream - mongodb', { timeout: 20_000 }, function () {
 });

 function defineBinlogStreamTests(factory: StorageFactory) {
-  test(
-    'Replicate basic values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+  test('Replicate basic values', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT id, description, num FROM "test_data"`);
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
-
-      await context.replicateSnapshot();
-
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
-      context.startStreaming();
-      const testId = uuid();
-      await connectionManager.query(
-        `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
-      );
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
-
-  test(
-    'replicating case sensitive table',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
+
+    await context.replicateSnapshot();
+
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    context.startStreaming();
+    const testId = uuid();
+    await connectionManager.query(
+      `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
+    );
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(1);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });
+
+  test('replicating case sensitive table', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT id, description FROM "test_DATA"
 `);
-      await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);
+    await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);

-      await context.replicateSnapshot();
+    await context.replicateSnapshot();

-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;

-      context.startStreaming();
+    context.startStreaming();

-      const testId = uuid();
-      await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);
+    const testId = uuid();
+    await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);

-      const data = await context.getBucketData('global[]');
+    const data = await context.getBucketData('global[]');

-      expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
+    expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(1);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });

-  // TODO: Not supported yet
-  // test(
-  //   'replicating TRUNCATE',
-  //   binlogStreamTest(factory, async (context) => {
-  //     const { connectionManager } = context;
-  //     const syncRuleContent = `
+  // TODO: Not supported yet
+  // test('replicating TRUNCATE', async () => {
+  //   await using context = await BinlogStreamTestContext.create(factory);
+  //   const { connectionManager } = context;
+  //   const syncRuleContent = `
  // bucket_definitions:
  //   global:
  //     data:
  //       - SELECT * FROM "test_data"
  //   by_test_data:
  //     parameters: SELECT id FROM test_data WHERE id = token_parameters.user_id
  //     data: []
  // `;
-  //     await context.updateSyncRules(syncRuleContent);
-  //     await connectionManager.query(`DROP TABLE IF EXISTS test_data`);
-  //     await connectionManager.query(
-  //       `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`
-  //     );
-  //
-  //     await context.replicateSnapshot();
-  //     context.startStreaming();
-  //
-  //     const [{ test_id }] = pgwireRows(
-  //       await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`)
-  //     );
-  //     await connectionManager.query(`TRUNCATE test_data`);
-  //
-  //     const data = await context.getBucketData('global[]');
-  //
-  //     expect(data).toMatchObject([
-  //       putOp('test_data', { id: test_id, description: 'test1' }),
-  //       removeOp('test_data', test_id)
-  //     ]);
-  //   })
-  // );
+  //   await context.updateSyncRules(syncRuleContent);
+  //   await connectionManager.query(`DROP TABLE IF EXISTS test_data`);
+  //   await connectionManager.query(
+  //     `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`
+  //   );

+  //   await context.replicateSnapshot();
+  //   context.startStreaming();

+  //   const [{ test_id }] = pgwireRows(
+  //     await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`)
+  //   );
+  //   await connectionManager.query(`TRUNCATE test_data`);

+  //   const data = await context.getBucketData('global[]');

+  //   expect(data).toMatchObject([
+  //     putOp('test_data', { id: test_id, description: 'test1' }),
+  //     removeOp('test_data', test_id)
+  //   ]);
+  // });

-  test(
-    'replicating changing primary key',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
-
-      await context.replicateSnapshot();
-      context.startStreaming();
-
-      const testId1 = uuid();
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);
-
-      const testId2 = uuid();
-      await connectionManager.query(
-        `UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
-      );
-
-      // This update may fail replicating with:
-      // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
-      await connectionManager.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${testId2}'`);
-
-      // Re-use old id again
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}', 'test1b')`);
-      await connectionManager.query(`UPDATE test_data SET description = 'test1c' WHERE id = '${testId1}'`);
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([
-        // Initial insert
-        putOp('test_data', { id: testId1, description: 'test1' }),
-        // Update id, then description
-        removeOp('test_data', testId1),
-        putOp('test_data', { id: testId2, description: 'test2a' }),
-        putOp('test_data', { id: testId2, description: 'test2b' }),
-        // Re-use old id
-        putOp('test_data', { id: testId1, description: 'test1b' }),
-        putOp('test_data', { id: testId1, description: 'test1c' })
-      ]);
-    })
-  );
+  test('replicating changing primary key', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
+
+    await context.replicateSnapshot();
+    context.startStreaming();
+
+    const testId1 = uuid();
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);
+
+    const testId2 = uuid();
+    await connectionManager.query(
+      `UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
+    );
+
+    // This update may fail replicating with:
+    // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
+    await connectionManager.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${testId2}'`);
+
+    // Re-use old id again
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}', 'test1b')`);
+    await connectionManager.query(`UPDATE test_data SET description = 'test1c' WHERE id = '${testId1}'`);
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      // Initial insert
+      putOp('test_data', { id: testId1, description: 'test1' }),
+      // Update id, then description
+      removeOp('test_data', testId1),
+      putOp('test_data', { id: testId2, description: 'test2a' }),
+      putOp('test_data', { id: testId2, description: 'test2b' }),
+      // Re-use old id
+      putOp('test_data', { id: testId1, description: 'test1b' }),
+      putOp('test_data', { id: testId1, description: 'test1c' })
+    ]);
+  });

-  test(
-    'initial sync',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
-
-      const testId = uuid();
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);
-
-      await context.replicateSnapshot();
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]);
-    })
-  );
+  test('initial sync', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
+
+    const testId = uuid();
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);
+
+    await context.replicateSnapshot();
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]);
+  });

-  test(
-    'snapshot with date values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+  test('snapshot with date values', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT * FROM "test_data"
 `);
-      await connectionManager.query(
-        `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
-      );
+    await connectionManager.query(
+      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
+    );

-      const testId = uuid();
-      await connectionManager.query(`
+    const testId = uuid();
+    await connectionManager.query(`
    INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
    `);
-      await context.replicateSnapshot();
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([
-        putOp('test_data', {
-          id: testId,
-          description: 'testDates',
-          date: `2023-03-06`,
-          datetime: '2023-03-06T15:47:00.000Z',
-          timestamp: '2023-03-06T15:47:00.000Z'
-        })
-      ]);
-    })
-  );
+    await context.replicateSnapshot();
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      putOp('test_data', {
+        id: testId,
+        description: 'testDates',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      })
+    ]);
+  });

-  test(
-    'replication with date values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+  test('replication with date values', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT * FROM "test_data"
 `);
-      await connectionManager.query(
-        `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
-      );
+    await connectionManager.query(
+      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
+    );

-      await context.replicateSnapshot();
+    await context.replicateSnapshot();

-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;

-      context.startStreaming();
+    context.startStreaming();

-      const testId = uuid();
-      await connectionManager.query(`
+    const testId = uuid();
+    await connectionManager.query(`
    INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
    `);
-      await connectionManager.query(`UPDATE test_data SET description = ? WHERE id = ?`, ['testUpdated', testId]);
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([
-        putOp('test_data', {
-          id: testId,
-          description: 'testDates',
-          date: `2023-03-06`,
-          datetime: '2023-03-06T15:47:00.000Z',
-          timestamp: '2023-03-06T15:47:00.000Z'
-        }),
-        putOp('test_data', {
-          id: testId,
-          description: 'testUpdated',
-          date: `2023-03-06`,
-          datetime: '2023-03-06T15:47:00.000Z',
-          timestamp: '2023-03-06T15:47:00.000Z'
-        })
-      ]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(2);
-      expect(endTxCount - startTxCount).toEqual(2);
-    })
-  );
+    await connectionManager.query(`UPDATE test_data SET description = ? WHERE id = ?`, ['testUpdated', testId]);
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      putOp('test_data', {
+        id: testId,
+        description: 'testDates',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      }),
+      putOp('test_data', {
+        id: testId,
+        description: 'testUpdated',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      })
+    ]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(2);
+    expect(endTxCount - startTxCount).toEqual(2);
+  });

-  test(
-    'table not in sync rules',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);
-
-      await context.replicateSnapshot();
-
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
-      context.startStreaming();
-
-      await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
-      // There was a transaction, but we should not replicate any actual data
-      expect(endRowCount - startRowCount).toEqual(0);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
+  test('table not in sync rules', async () => {
+    await using context = await BinlogStreamTestContext.create(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);
+
+    await context.replicateSnapshot();
+
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    context.startStreaming();
+
+    await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    // There was a transaction, but we should not replicate any actual data
+    expect(endRowCount - startRowCount).toEqual(0);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });
 }
diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts
index c08f22c60..902da29bc 100644
--- a/modules/module-mysql/test/src/BinlogStreamUtils.ts
+++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts
@@ -17,28 +17,9 @@ import { logger } from '@powersync/lib-services-framework';
  * Tests operating on the binlog stream need to configure the stream and manage asynchronous
  * replication, which gets a little tricky.
  *
- * This wraps a test in a function that configures all the context, and tears it down afterward.
+ * This wraps all the context required for testing, and tears it down afterward
+ * by using `await using`.
  */
-export function binlogStreamTest(
-  factory: () => Promise<BucketStorageFactory>,
-  test: (context: BinlogStreamTestContext) => Promise<void>
-): () => Promise<void> {
-  return async () => {
-    const f = await factory();
-    const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {});
-
-    const connection = await connectionManager.getConnection();
-    await clearTestDb(connection);
-    connection.release();
-    const context = new BinlogStreamTestContext(f, connectionManager);
-    try {
-      await test(context);
-    } finally {
-      await context.dispose();
-    }
-  };
-}
-
 export class BinlogStreamTestContext {
   private _binlogStream?: BinLogStream;
   private abortController = new AbortController();
@@ -46,6 +27,16 @@ export class BinlogStreamTestContext {
   public storage?: SyncRulesBucketStorage;
   private replicationDone = false;

+  static async create(factory: () => Promise<BucketStorageFactory>) {
+    const f = await factory();
+    const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {});
+
+    const connection = await connectionManager.getConnection();
+    await clearTestDb(connection);
+    connection.release();
+    return new BinlogStreamTestContext(f, connectionManager);
+  }
+
   constructor(
     public factory: BucketStorageFactory,
     public connectionManager: MySQLConnectionManager
@@ -57,6 +48,10 @@ export class BinlogStreamTestContext {
    await this.connectionManager.end();
  }

+  [Symbol.asyncDispose]() {
+    return this.dispose();
+  }
+
  get connectionTag() {
    return this.connectionManager.connectionTag;
  }
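Note on patch 12: moving each test to `await using` guarantees the context's connections and streams are torn down even when an assertion throws, without the old wrapper function. The mechanism is explicit resource management (TypeScript 5.2+); a minimal sketch of the pattern the tests rely on:

    class Context {
      async dispose() {
        // close connections, abort streaming, etc.
      }
      [Symbol.asyncDispose]() {
        return this.dispose();
      }
    }

    async function run() {
      await using context = new Context();
      // assertions may throw here
    } // context[Symbol.asyncDispose]() runs on scope exit, error or not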
From b4c668c422c7c53ce6e1a0169beff73654cc8dee Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 11:52:08 +0200
Subject: [PATCH 13/17] NULLable columns in tests, to avoid auto-updating
 timestamp in MySQL 5.7

---
 modules/module-mysql/test/src/BinLogStream.test.ts | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index eb9669aae..a4d6a0744 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -14,7 +14,7 @@ bucket_definitions:
      - SELECT id, description FROM "test_data"
 `;

-describe(' Binlog stream - mongodb', { timeout: 20_000 }, function () {
+describe('Binlog stream - mongodb', { timeout: 20_000 }, function () {
  defineBinlogStreamTests(MONGO_STORAGE_FACTORY);
 });

@@ -219,7 +219,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
    `);

    await connectionManager.query(
-      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
+      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME NULL, timestamp TIMESTAMP NULL)`
    );

    await context.replicateSnapshot();
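Note on patch 13: on MySQL 5.7 (when explicit_defaults_for_timestamp is off), the first TIMESTAMP column of a table is implicitly NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, so the UPDATE issued in the date-values test would silently bump that column and break the expected bucket data. Declaring the columns NULL opts out of the implicit auto-update — a sketch mirroring the test setup:

    // Without NULL, MySQL 5.7 would treat `ts` as:
    //   ts TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
    await connectionManager.query(
      `CREATE TABLE example (id INT PRIMARY KEY, ts TIMESTAMP NULL)`
    );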
From 4f0b80430914f8bebe60f3da9064f49d3d7e5c59 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 12:32:46 +0200
Subject: [PATCH 14/17] Test resuming replication.

---
 .../test/src/BinLogStream.test.ts  | 61 +++++++++++++++----
 .../test/src/BinlogStreamUtils.ts  | 34 +++++++++--
 2 files changed, 79 insertions(+), 16 deletions(-)

diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index a4d6a0744..5ac980cbf 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -1,12 +1,10 @@
 import { putOp, removeOp } from '@core-tests/stream_utils.js';
-import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
-import { BucketStorageFactory, Metrics } from '@powersync/service-core';
+import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js';
+import { Metrics } from '@powersync/service-core';
 import { v4 as uuid } from 'uuid';
 import { describe, expect, test } from 'vitest';
 import { BinlogStreamTestContext } from './BinlogStreamUtils.js';

-type StorageFactory = () => Promise<BucketStorageFactory>;
-
 const BASIC_SYNC_RULES = `
 bucket_definitions:
   global:
@@ -18,7 +16,7 @@ describe('Binlog stream - mongodb', { timeout: 20_000 }, function () {

 function defineBinlogStreamTests(factory: StorageFactory) {
   test('Replicate basic values', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(`
@@ -52,7 +50,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  });

  test('replicating case sensitive table', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(`
@@ -120,7 +118,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  // });

  test('replicating changing primary key', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(BASIC_SYNC_RULES);
@@ -160,7 +158,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  });

  test('initial sync', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(BASIC_SYNC_RULES);
@@ -176,7 +174,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  });

  test('snapshot with date values', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(`
@@ -209,7 +207,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  });

  test('replication with date values', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(`
@@ -261,7 +259,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
  });

  test('table not in sync rules', async () => {
-    await using context = await BinlogStreamTestContext.create(factory);
+    await using context = await BinlogStreamTestContext.open(factory);
    const { connectionManager } = context;
    await context.updateSyncRules(BASIC_SYNC_RULES);
@@ -287,4 +285,45 @@ function defineBinlogStreamTests(factory: StorageFactory) {
    expect(endRowCount - startRowCount).toEqual(0);
    expect(endTxCount - startTxCount).toEqual(1);
  });
+
+  test('Resume replication', async () => {
+    const testId1 = uuid();
+    const testId2 = uuid();
+    {
+      await using context = await BinlogStreamTestContext.open(factory);
+      const { connectionManager } = context;
+      await context.updateSyncRules(`
+      bucket_definitions:
+        global:
+          data:
+            - SELECT id, description, num FROM "test_data"`);
+
+      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
+
+      await context.replicateSnapshot();
+      context.startStreaming();
+      await connectionManager.query(
+        `INSERT INTO test_data(id, description, num) VALUES('${testId1}', 'test1', 1152921504606846976)`
+      );
+      const data = await context.getBucketData('global[]');
+      expect(data).toMatchObject([
+        putOp('test_data', { id: testId1, description: 'test1', num: 1152921504606846976n })
+      ]);
+    }
+    {
+      await using context = await BinlogStreamTestContext.open(factory, { doNotClear: true });
+      const { connectionManager } = context;
+      await context.loadActiveSyncRules();
+      // Does not actually do a snapshot again - just does the required initialization.
+      await context.replicateSnapshot();
+      context.startStreaming();
+      await connectionManager.query(`INSERT INTO test_data(id, description, num) VALUES('${testId2}', 'test2', 0)`);
+      const data = await context.getBucketData('global[]');
+
+      expect(data).toMatchObject([
+        putOp('test_data', { id: testId1, description: 'test1', num: 1152921504606846976n }),
+        putOp('test_data', { id: testId2, description: 'test2', num: 0n })
+      ]);
+    }
+  });
 }
diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts
index 902da29bc..5cf3f0ddd 100644
--- a/modules/module-mysql/test/src/BinlogStreamUtils.ts
+++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts
@@ -12,6 +12,7 @@ import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManag
 import mysqlPromise from 'mysql2/promise';
 import { readExecutedGtid } from '@module/common/read-executed-gtid.js';
 import { logger } from '@powersync/lib-services-framework';
+import { StorageFactory } from '@core-tests/util.js';

 /**
  * Tests operating on the binlog stream need to configure the stream and manage asynchronous
@@ -27,13 +28,15 @@ export class BinlogStreamTestContext {
   public storage?: SyncRulesBucketStorage;
   private replicationDone = false;

-  static async create(factory: () => Promise<BucketStorageFactory>) {
-    const f = await factory();
+  static async open(factory: StorageFactory, options?: { doNotClear?: boolean }) {
+    const f = await factory({ doNotClear: options?.doNotClear });
    const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {});

-    const connection = await connectionManager.getConnection();
-    await clearTestDb(connection);
-    connection.release();
+    if (!options?.doNotClear) {
+      const connection = await connectionManager.getConnection();
+      await clearTestDb(connection);
+      connection.release();
+    }
    return new BinlogStreamTestContext(f, connectionManager);
  }

@@ -62,6 +65,27 @@ export class BinlogStreamTestContext {
    return this.storage!;
  }

+  async loadNextSyncRules() {
+    const syncRules = await this.factory.getNextSyncRulesContent();
+    if (syncRules == null) {
+      throw new Error(`Next sync rules not available`);
+    }
+
+    this.storage = this.factory.getInstance(syncRules);
+    return this.storage!;
+  }
+
+  async loadActiveSyncRules() {
+    const syncRules = await this.factory.getActiveSyncRulesContent();
+    if (syncRules == null) {
+      throw new Error(`Active sync rules not available`);
+    }
+
+    this.storage = this.factory.getInstance(syncRules);
+    this.replicationDone = true;
+    return this.storage!;
+  }
+
  get binlogStream(): BinLogStream {
    if (this.storage == null) {
      throw new Error('updateSyncRules() first');
    }

From 900cb1593166367894b72c67f3c7f07605fa5447 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 12:35:19 +0200
Subject: [PATCH 15/17] Add changesets.

---
 .changeset/honest-cats-draw.md   | 6 ++++++
 .changeset/many-shrimps-watch.md | 6 ++++++
 2 files changed, 12 insertions(+)
 create mode 100644 .changeset/honest-cats-draw.md
 create mode 100644 .changeset/many-shrimps-watch.md

diff --git a/.changeset/honest-cats-draw.md b/.changeset/honest-cats-draw.md
new file mode 100644
index 000000000..952442475
--- /dev/null
+++ b/.changeset/honest-cats-draw.md
@@ -0,0 +1,6 @@
+---
+'@powersync/service-module-mysql': patch
+'@powersync/service-image': patch
+---
+
+Fix timestamp replication issues for MySQL.
diff --git a/.changeset/many-shrimps-watch.md b/.changeset/many-shrimps-watch.md
new file mode 100644
index 000000000..bbb901ad9
--- /dev/null
+++ b/.changeset/many-shrimps-watch.md
@@ -0,0 +1,6 @@
+---
+'@powersync/service-module-mysql': patch
+'@powersync/service-image': patch
+---
+
+Fix resuming MySQL replication after a restart.

From 79a4d5d975b0f2219d33a52cfe4ae04d63b80918 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Wed, 1 Jan 2025 13:14:36 +0200
Subject: [PATCH 16/17] Remove debug conditional.

---
 modules/module-mysql/src/common/mysql-to-sqlite.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts
index b106c90f2..c1aaeb039 100644
--- a/modules/module-mysql/src/common/mysql-to-sqlite.ts
+++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -110,7 +110,7 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, Colu

From: Ralf Kistner
Date: Wed, 1 Jan 2025 13:17:45 +0200
Subject: [PATCH 17/17] Remove defunct test.

---
 .../test/src/mysql-to-sqlite.test.ts | 21 -------------------
 1 file changed, 21 deletions(-)

diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 32ae5573f..371bc589c 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -260,27 +260,6 @@ INSERT INTO test_data (
    }
  });

-  test.skip('Date types mappings - null', async () => {
-    await setupTable();
-    await connectionManager.query(`
-    INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col)
-    VALUES(NULL, NULL, NULL, NULL, NULL);
-    `);
-
-    const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
-    const replicatedRows = await getReplicatedRows();
-    const expectedResult = {
-      date_col: null,
-      datetime_col: null,
-      timestamp_col: null,
-      time_col: null,
-      year_col: null
-    };
-
-    expect(databaseRows[0]).toMatchObject(expectedResult);
-    expect(replicatedRows[0]).toMatchObject(expectedResult);
-  });
-
  test('Json types mappings', async () => {
    await setupTable();