diff --git a/.changeset/honest-cats-draw.md b/.changeset/honest-cats-draw.md
new file mode 100644
index 000000000..952442475
--- /dev/null
+++ b/.changeset/honest-cats-draw.md
@@ -0,0 +1,6 @@
+---
+'@powersync/service-module-mysql': patch
+'@powersync/service-image': patch
+---
+
+Fix timestamp replication issues for MySQL.
diff --git a/.changeset/many-shrimps-watch.md b/.changeset/many-shrimps-watch.md
new file mode 100644
index 000000000..bbb901ad9
--- /dev/null
+++ b/.changeset/many-shrimps-watch.md
@@ -0,0 +1,6 @@
+---
+'@powersync/service-module-mysql': patch
+'@powersync/service-image': patch
+---
+
+Fix resuming MySQL replication after a restart.
diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts
index 8cc2487d8..c1aaeb039 100644
--- a/modules/module-mysql/src/common/mysql-to-sqlite.ts
+++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -99,6 +99,7 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu
 }
 
 export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
+  let result: sync_rules.DatabaseInputRow = {};
   for (let key in row) {
     // We are very much expecting the column to be there
     const column = columns.get(key)!;
@@ -107,17 +108,35 @@ export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>)
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
   await promiseConnection.query('START TRANSACTION');
+  await promiseConnection.query(`SET time_zone = '+00:00'`);
+
   const sourceTables = this.syncRules.getSourceTables();
   await this.storage.startBatch(
     { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
@@ -291,38 +295,32 @@ AND table_type = 'BASE TABLE';`,
     logger.info(`Replicating ${table.qualifiedName}`);
     // TODO count rows and log progress at certain batch sizes
-    let columns: Map<string, ColumnDescriptor>;
-    return new Promise<void>((resolve, reject) => {
-      // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
-      connection
-        .query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${table.schema}.${table.table}`)
-        .on('error', (err) => {
-          reject(err);
-        })
-        .on('fields', (fields: FieldPacket[]) => {
-          // Map the columns and their types
-          columns = toColumnDescriptors(fields);
-        })
-        .on('result', async (row) => {
-          connection.pause();
-          const record = common.toSQLiteRow(row, columns);
-
-          await batch.save({
-            tag: storage.SaveOperationTag.INSERT,
-            sourceTable: table,
-            before: undefined,
-            beforeReplicaId: undefined,
-            after: record,
-            afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
-          });
-          connection.resume();
-          Metrics.getInstance().rows_replicated_total.add(1);
-        })
-        .on('end', async function () {
-          await batch.flush();
-          resolve();
-        });
+    // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query
+    const query = connection.query(`SELECT /*+ MAX_EXECUTION_TIME(0) */ * FROM ${escapeMysqlTableName(table)}`);
+    const stream = query.stream();
+
+    let columns: Map<string, ColumnDescriptor> | undefined = undefined;
+    stream.on('fields', (fields: FieldPacket[]) => {
+      // Map the columns and their types
+      columns = toColumnDescriptors(fields);
     });
+
+    for await (let row of stream) {
+      if (columns == null) {
+        throw new Error(`No 'fields' event emitted`);
+      }
+      const record = common.toSQLiteRow(row, columns!);
+
+      await batch.save({
+        tag: storage.SaveOperationTag.INSERT,
+        sourceTable: table,
+        before: undefined,
+        beforeReplicaId: undefined,
+        after: record,
+        afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns)
+      });
+    }
+    await batch.flush();
   }
 
   async replicate() {
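A side note on the `SET time_zone = '+00:00'` statements this PR adds before snapshots (and again in `MySQLConnectionManager` below): MySQL converts TIMESTAMP columns into the session time zone on read, so an unpinned session yields `Date` objects that depend on the server's zone setting. Pinning the session to UTC makes `toISOString()` stable. A minimal sketch of the effect, with hypothetical connection options and table — not code from this PR:

```ts
import mysql, { RowDataPacket } from 'mysql2/promise';

async function readUtcTimestamps(): Promise<void> {
  // Placeholder connection options.
  const connection = await mysql.createConnection({ host: 'localhost', user: 'root', database: 'test' });
  try {
    // TIMESTAMP values are converted to the session time zone on read.
    // With the session pinned to UTC, mysql2 returns Date objects whose
    // toISOString() matches the stored instant on every server.
    await connection.query(`SET time_zone = '+00:00'`);
    const [rows] = await connection.query<RowDataPacket[]>('SELECT timestamp_col FROM test_data');
    for (const row of rows) {
      console.log((row.timestamp_col as Date).toISOString()); // e.g. 2023-03-06T15:47:00.000Z
    }
  } finally {
    await connection.end();
  }
}
```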
@@ -350,6 +348,18 @@ AND table_type = 'BASE TABLE';`,
     const initialReplicationCompleted = await this.checkInitialReplicated();
     if (!initialReplicationCompleted) {
       await this.startInitialReplication();
+    } else {
+      // We need to find the existing tables, to populate our table cache.
+      // This is needed for includeSchema to work correctly.
+      const sourceTables = this.syncRules.getSourceTables();
+      await this.storage.startBatch(
+        { zeroLSN: ReplicatedGTID.ZERO.comparable, defaultSchema: this.defaultSchema, storeCurrentData: true },
+        async (batch) => {
+          for (let tablePattern of sourceTables) {
+            await this.getQualifiedTableNames(batch, tablePattern);
+          }
+        }
+      );
     }
   }
 
@@ -578,7 +588,7 @@ AND table_type = 'BASE TABLE';`,
       beforeReplicaId: beforeUpdated
         ? getUuidReplicaIdentityBson(beforeUpdated, payload.sourceTable.replicaIdColumns)
        : undefined,
-      after: common.toSQLiteRow(payload.data, payload.columns),
+      after: after,
       afterReplicaId: getUuidReplicaIdentityBson(after, payload.sourceTable.replicaIdColumns)
     });
diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
index 3693b9ce2..4548d8381 100644
--- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts
+++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts
@@ -62,7 +62,14 @@ export class MySQLConnectionManager {
    * @param params
    */
   async query(query: string, params?: any[]): Promise<[RowDataPacket[], FieldPacket[]]> {
-    return this.promisePool.query(query, params);
+    let connection: mysqlPromise.PoolConnection | undefined;
+    try {
+      connection = await this.promisePool.getConnection();
+      await connection.query(`SET time_zone = '+00:00'`);
+      return await connection.query(query, params);
+    } finally {
+      connection?.release();
+    }
   }
 
   /**
diff --git a/modules/module-mysql/src/utils/mysql-utils.ts b/modules/module-mysql/src/utils/mysql-utils.ts
index a2279c234..f1e831273 100644
--- a/modules/module-mysql/src/utils/mysql-utils.ts
+++ b/modules/module-mysql/src/utils/mysql-utils.ts
@@ -3,6 +3,7 @@ import mysql from 'mysql2';
 import mysqlPromise from 'mysql2/promise';
 import * as types from '../types/types.js';
 import { coerce, gte } from 'semver';
+import { SourceTable } from '@powersync/service-core';
 
 export type RetriedQueryOptions = {
   connection: mysqlPromise.Connection;
@@ -82,3 +83,7 @@ export function isVersionAtLeast(version: string, minimumVersion: string): boole
 
   return gte(coercedVersion!, coercedMinimumVersion!, { loose: true });
 }
+
+export function escapeMysqlTableName(table: SourceTable): string {
+  return `\`${table.schema.replaceAll('`', '``')}\`.\`${table.table.replaceAll('`', '``')}\``;
+}
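The helper above follows MySQL's identifier-quoting rule: wrap each part in backticks and double any embedded backtick. Without it, a table name containing a backtick would terminate the quoted identifier early and could inject SQL into the snapshot query. A hedged usage illustration (the object literal and cast stand in for a real `SourceTable` instance):

```ts
import { SourceTable } from '@powersync/service-core';
import { escapeMysqlTableName } from './mysql-utils.js';

// Cast for brevity; a real SourceTable carries more fields than schema/table.
const table = { schema: 'app', table: 'weird`name' } as unknown as SourceTable;
console.log(escapeMysqlTableName(table)); // `app`.`weird``name`
```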
diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts
index 44240d461..5ac980cbf 100644
--- a/modules/module-mysql/test/src/BinLogStream.test.ts
+++ b/modules/module-mysql/test/src/BinLogStream.test.ts
@@ -1,11 +1,9 @@
 import { putOp, removeOp } from '@core-tests/stream_utils.js';
-import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
-import { BucketStorageFactory, Metrics } from '@powersync/service-core';
-import { describe, expect, test } from 'vitest';
-import { binlogStreamTest } from './BinlogStreamUtils.js';
+import { MONGO_STORAGE_FACTORY, StorageFactory } from '@core-tests/util.js';
+import { Metrics } from '@powersync/service-core';
 import { v4 as uuid } from 'uuid';
-
-type StorageFactory = () => Promise<BucketStorageFactory>;
+import { describe, expect, test } from 'vitest';
+import { BinlogStreamTestContext } from './BinlogStreamUtils.js';
 
 const BASIC_SYNC_RULES = `
 bucket_definitions:
@@ -14,92 +12,81 @@ bucket_definitions:
       - SELECT id, description FROM "test_data"
 `;
 
-describe(
-  ' Binlog stream - mongodb',
-  function () {
-    defineBinlogStreamTests(MONGO_STORAGE_FACTORY);
-  },
-  { timeout: 20_000 }
-);
+describe('Binlog stream - mongodb', { timeout: 20_000 }, function () {
+  defineBinlogStreamTests(MONGO_STORAGE_FACTORY);
+});
 
 function defineBinlogStreamTests(factory: StorageFactory) {
-  test(
-    'Replicate basic values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+  test('Replicate basic values', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT id, description, num FROM "test_data"`);
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
-
-      await context.replicateSnapshot();
-
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
-      context.startStreaming();
-      const testId = uuid();
-      await connectionManager.query(
-        `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
-      );
-      const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
-
-  test(
-    'replicating case sensitive table',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
+
+    await context.replicateSnapshot();
+
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    context.startStreaming();
+    const testId = uuid();
+    await connectionManager.query(
+      `INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
+    );
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(1);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });
+
+  test('replicating case sensitive table', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
       - SELECT id, description FROM "test_DATA"
 `);
-      await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);
+    await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);
 
-      await context.replicateSnapshot();
+    await context.replicateSnapshot();
 
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
 
-      context.startStreaming();
+    context.startStreaming();
 
-      const testId = uuid();
-      await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);
+    const testId = uuid();
+    await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);
 
-      const data = await context.getBucketData('global[]');
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(1);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });
 
-      expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
-
-  // TODO: Not supported yet
-  // test(
-  //   'replicating TRUNCATE',
-  //   binlogStreamTest(factory, async (context) => {
-  //     const { connectionManager } = context;
-  //     const syncRuleContent = `
+  // TODO: Not supported yet
+  // test('replicating TRUNCATE', async () => {
+  //   await using context = await BinlogStreamTestContext.open(factory);
+  //   const { connectionManager } = context;
+  //   const syncRuleContent = `
   // bucket_definitions:
   //   global:
   //     data:
@@ -108,199 +95,235 @@ function defineBinlogStreamTests(factory: StorageFactory) {
   //   parameters: SELECT id FROM test_data WHERE id = token_parameters.user_id
   //   data: []
   // `;
-  //     await context.updateSyncRules(syncRuleContent);
-  //     await connectionManager.query(`DROP TABLE IF EXISTS test_data`);
-  //     await connectionManager.query(
-  //       `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`
-  //     );
-  //
-  //     await context.replicateSnapshot();
-  //     context.startStreaming();
-  //
-  //     const [{ test_id }] = pgwireRows(
-  //       await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`)
-  //     );
-  //     await connectionManager.query(`TRUNCATE test_data`);
-  //
-  //     const data = await context.getBucketData('global[]');
-  //
-  //     expect(data).toMatchObject([
-  //       putOp('test_data', { id: test_id, description: 'test1' }),
-  //       removeOp('test_data', test_id)
-  //     ]);
-  //   })
+  //   await context.updateSyncRules(syncRuleContent);
+  //   await connectionManager.query(`DROP TABLE IF EXISTS test_data`);
+  //   await connectionManager.query(
+  //     `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`
   //   );
-  test(
-    'replicating changing primary key',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
-
-      await context.replicateSnapshot();
-      context.startStreaming();
-
-      const testId1 = uuid();
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);
-
-      const testId2 = uuid();
-      await connectionManager.query(
-        `UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
-      );
-
-      // This update may fail replicating with:
-      // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
-      await connectionManager.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${testId2}'`);
+  //   await context.replicateSnapshot();
+  //   context.startStreaming();
 
-      // Re-use old id again
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}', 'test1b')`);
-      await connectionManager.query(`UPDATE test_data SET description = 'test1c' WHERE id = '${testId1}'`);
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([
-        // Initial insert
-        putOp('test_data', { id: testId1, description: 'test1' }),
-        // Update id, then description
-        removeOp('test_data', testId1),
-        putOp('test_data', { id: testId2, description: 'test2a' }),
-        putOp('test_data', { id: testId2, description: 'test2b' }),
-        // Re-use old id
-        putOp('test_data', { id: testId1, description: 'test1b' }),
-        putOp('test_data', { id: testId1, description: 'test1c' })
-      ]);
-    })
-  );
-
-  test(
-    'initial sync',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
-
-      const testId = uuid();
-      await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);
-
-      await context.replicateSnapshot();
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]);
-    })
-  );
-
-  test(
-    'snapshot with date values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+  //   const [{ test_id }] = pgwireRows(
+  //     await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`)
+  //   );
+  //   await connectionManager.query(`TRUNCATE test_data`);
+
+  //   const data = await context.getBucketData('global[]');
+
+  //   expect(data).toMatchObject([
+  //     putOp('test_data', { id: test_id, description: 'test1' }),
+  //     removeOp('test_data', test_id)
+  //   ]);
+  // });
+
+  test('replicating changing primary key', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
+
+    await context.replicateSnapshot();
+    context.startStreaming();
+
+    const testId1 = uuid();
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);
+
+    const testId2 = uuid();
+    await connectionManager.query(
+      `UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
+    );
+
+    // This update may fail replicating with:
+    // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
+    await connectionManager.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${testId2}'`);
+
+    // Re-use old id again
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}', 'test1b')`);
+    await connectionManager.query(`UPDATE test_data SET description = 'test1c' WHERE id = '${testId1}'`);
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      // Initial insert
+      putOp('test_data', { id: testId1, description: 'test1' }),
+      // Update id, then description
+      removeOp('test_data', testId1),
+      putOp('test_data', { id: testId2, description: 'test2a' }),
+      putOp('test_data', { id: testId2, description: 'test2b' }),
+      // Re-use old id
+      putOp('test_data', { id: testId1, description: 'test1b' }),
+      putOp('test_data', { id: testId1, description: 'test1c' })
+    ]);
+  });
+
+  test('initial sync', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);
+
+    const testId = uuid();
+    await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);
+
+    await context.replicateSnapshot();
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]);
+  });
+
+  test('snapshot with date values', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
      - SELECT * FROM "test_data"
 `);
-      await connectionManager.query(
-        `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
-      );
+    await connectionManager.query(
+      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
+    );
 
-      const testId = uuid();
-      await connectionManager.query(`
+    const testId = uuid();
+    await connectionManager.query(`
     INSERT INTO test_data(id, description, date, datetime, timestamp)
     VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
     `);
-      await context.replicateSnapshot();
-
-      const data = await context.getBucketData('global[]');
-      expect(data).toMatchObject([
-        putOp('test_data', {
-          id: testId,
-          description: 'testDates',
-          date: `2023-03-06`,
-          datetime: '2023-03-06T15:47:00.000Z',
-          timestamp: '2023-03-06T15:47:00.000Z'
-        })
-      ]);
-    })
-  );
-
-  test(
-    'replication with date values',
-    binlogStreamTest(factory, async (context) => {
-      const { connectionManager } = context;
-      await context.updateSyncRules(`
+    await context.replicateSnapshot();
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      putOp('test_data', {
+        id: testId,
+        description: 'testDates',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      })
+    ]);
+  });
+
+  test('replication with date values', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(`
 bucket_definitions:
   global:
     data:
      - SELECT * FROM "test_data"
 `);
-      await connectionManager.query(
-        `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
-      );
+    await connectionManager.query(
+      `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME NULL, timestamp TIMESTAMP NULL)`
+    );
 
-      await context.replicateSnapshot();
+    await context.replicateSnapshot();
 
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
 
-      context.startStreaming();
+    context.startStreaming();
 
-      const testId = uuid();
-      await connectionManager.query(`
+    const testId = uuid();
+    await connectionManager.query(`
     INSERT INTO test_data(id, description, date, datetime, timestamp)
     VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
     `);
+    await connectionManager.query(`UPDATE test_data SET description = ? WHERE id = ?`, ['testUpdated', testId]);
+
+    const data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([
+      putOp('test_data', {
+        id: testId,
+        description: 'testDates',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      }),
+      putOp('test_data', {
+        id: testId,
+        description: 'testUpdated',
+        date: `2023-03-06`,
+        datetime: '2023-03-06T15:47:00.000Z',
+        timestamp: '2023-03-06T15:47:00.000Z'
+      })
+    ]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+    expect(endRowCount - startRowCount).toEqual(2);
+    expect(endTxCount - startTxCount).toEqual(2);
+  });
+
+  test('table not in sync rules', async () => {
+    await using context = await BinlogStreamTestContext.open(factory);
+    const { connectionManager } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);
+
+    await context.replicateSnapshot();
+
+    const startRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const startTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    context.startStreaming();
+
+    await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
+    const data = await context.getBucketData('global[]');
+
+    expect(data).toMatchObject([]);
+    const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
+    const endTxCount =
+      (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
+
+    // There was a transaction, but we should not replicate any actual data
+    expect(endRowCount - startRowCount).toEqual(0);
+    expect(endTxCount - startTxCount).toEqual(1);
+  });
+
+  test('Resume replication', async () => {
+    const testId1 = uuid();
+    const testId2 = uuid();
+    {
+      await using context = await BinlogStreamTestContext.open(factory);
+      const { connectionManager } = context;
+      await context.updateSyncRules(`
+      bucket_definitions:
+        global:
+          data:
+            - SELECT id, description, num FROM "test_data"`);
+
+      await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);
+      await context.replicateSnapshot();
+      context.startStreaming();
+      await connectionManager.query(
+        `INSERT INTO test_data(id, description, num) VALUES('${testId1}', 'test1', 1152921504606846976)`
+      );
       const data = await context.getBucketData('global[]');
       expect(data).toMatchObject([
-        putOp('test_data', {
-          id: testId,
-          description: 'testDates',
-          date: `2023-03-06`,
-          datetime: '2023-03-06T15:47:00.000Z',
-          timestamp: '2023-03-06T15:47:00.000Z'
-        })
+        putOp('test_data', { id: testId1, description: 'test1', num: 1152921504606846976n })
       ]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-      expect(endRowCount - startRowCount).toEqual(1);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
-
-  test(
-    'table not in sync rules',
-    binlogStreamTest(factory, async (context) => {
+    }
+    {
+      await using context = await BinlogStreamTestContext.open(factory, { doNotClear: true });
       const { connectionManager } = context;
-      await context.updateSyncRules(BASIC_SYNC_RULES);
-
-      await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);
-
+      await context.loadActiveSyncRules();
+      // Does not actually do a snapshot again - just does the required initialization.
       await context.replicateSnapshot();
-
-      const startRowCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const startTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
       context.startStreaming();
-
-      await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
+      await connectionManager.query(`INSERT INTO test_data(id, description, num) VALUES('${testId2}', 'test2', 0)`);
       const data = await context.getBucketData('global[]');
-
-      expect(data).toMatchObject([]);
-      const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0;
-      const endTxCount =
-        (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;
-
-      // There was a transaction, but we should not replicate any actual data
-      expect(endRowCount - startRowCount).toEqual(0);
-      expect(endTxCount - startTxCount).toEqual(1);
-    })
-  );
+      expect(data).toMatchObject([
+        putOp('test_data', { id: testId1, description: 'test1', num: 1152921504606846976n }),
+        putOp('test_data', { id: testId2, description: 'test2', num: 0n })
+      ]);
+    }
+  });
 }
diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts
index c08f22c60..5cf3f0ddd 100644
--- a/modules/module-mysql/test/src/BinlogStreamUtils.ts
+++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts
@@ -12,33 +12,15 @@ import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManag
 import mysqlPromise from 'mysql2/promise';
 import { readExecutedGtid } from '@module/common/read-executed-gtid.js';
 import { logger } from '@powersync/lib-services-framework';
+import { StorageFactory } from '@core-tests/util.js';
 
 /**
  * Tests operating on the binlog stream need to configure the stream and manage asynchronous
  * replication, which gets a little tricky.
  *
- * This wraps a test in a function that configures all the context, and tears it down afterward.
+ * This wraps all the context required for testing, and tears it down afterward
+ * by using `await using`.
  */
-export function binlogStreamTest(
-  factory: () => Promise<BucketStorageFactory>,
-  test: (context: BinlogStreamTestContext) => Promise<void>
-): () => Promise<void> {
-  return async () => {
-    const f = await factory();
-    const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {});
-
-    const connection = await connectionManager.getConnection();
-    await clearTestDb(connection);
-    connection.release();
-    const context = new BinlogStreamTestContext(f, connectionManager);
-    try {
-      await test(context);
-    } finally {
-      await context.dispose();
-    }
-  };
-}
-
 export class BinlogStreamTestContext {
   private _binlogStream?: BinLogStream;
   private abortController = new AbortController();
@@ -46,6 +28,18 @@ export class BinlogStreamTestContext {
   public storage?: SyncRulesBucketStorage;
   private replicationDone = false;
 
+  static async open(factory: StorageFactory, options?: { doNotClear?: boolean }) {
+    const f = await factory({ doNotClear: options?.doNotClear });
+    const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {});
+
+    if (!options?.doNotClear) {
+      const connection = await connectionManager.getConnection();
+      await clearTestDb(connection);
+      connection.release();
+    }
+    return new BinlogStreamTestContext(f, connectionManager);
+  }
+
   constructor(
     public factory: BucketStorageFactory,
     public connectionManager: MySQLConnectionManager
@@ -57,6 +51,10 @@ export class BinlogStreamTestContext {
     await this.connectionManager.end();
   }
 
+  [Symbol.asyncDispose]() {
+    return this.dispose();
+  }
+
   get connectionTag() {
     return this.connectionManager.connectionTag;
   }
@@ -67,6 +65,27 @@ export class BinlogStreamTestContext {
     return this.storage!;
   }
 
+  async loadNextSyncRules() {
+    const syncRules = await this.factory.getNextSyncRulesContent();
+    if (syncRules == null) {
+      throw new Error(`Next sync rules not available`);
+    }
+
+    this.storage = this.factory.getInstance(syncRules);
+    return this.storage!;
+  }
+
+  async loadActiveSyncRules() {
+    const syncRules = await this.factory.getActiveSyncRulesContent();
+    if (syncRules == null) {
+      throw new Error(`Active sync rules not available`);
+    }
+
+    this.storage = this.factory.getInstance(syncRules);
+    this.replicationDone = true;
+    return this.storage!;
+  }
+
   get binlogStream(): BinLogStream {
     if (this.storage == null) {
       throw new Error('updateSyncRules() first');
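For context on `[Symbol.asyncDispose]` above: `await using` (TypeScript 5.2+ explicit resource management) awaits that method when the binding goes out of scope, even if the test body throws, which is what replaces the old `binlogStreamTest` wrapper's try/finally. A stripped-down sketch of the pattern (assumes the `esnext.disposable` lib or a `Symbol.asyncDispose` polyfill):

```ts
class ExampleContext implements AsyncDisposable {
  async dispose(): Promise<void> {
    // Close connections, abort streaming, flush storage, etc.
    console.log('disposed');
  }

  [Symbol.asyncDispose](): Promise<void> {
    return this.dispose();
  }
}

async function run(): Promise<void> {
  await using ctx = new ExampleContext();
  // ... test body; may throw ...
} // ctx[Symbol.asyncDispose]() is awaited here, even on an exception
```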
diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
index 9cebdccd2..371bc589c 100644
--- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
+++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -200,6 +200,7 @@ INSERT INTO test_data (
   test('Date types mappings', async () => {
     await setupTable();
 
+    // Timezone offset is set on the pool to +00:00
     await connectionManager.query(`
     INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col)
     VALUES('2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47', '15:47:00', '2023');
@@ -222,23 +223,40 @@ INSERT INTO test_data (
   test('Date types edge cases mappings', async () => {
     await setupTable();
 
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
-    await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
-    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
-    await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
-
-    const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
-    const replicatedRows = await getReplicatedRows(4);
-    const expectedResults = [
-      { timestamp_col: '1970-01-01T00:00:01.000Z' },
-      { timestamp_col: '2038-01-19T03:14:07.499Z' },
-      { datetime_col: '1000-01-01T00:00:00.000Z' },
-      { datetime_col: '9999-12-31T23:59:59.499Z' }
-    ];
-
-    for (let i = 0; i < expectedResults.length; i++) {
-      expect(databaseRows[i]).toMatchObject(expectedResults[i]);
-      expect(replicatedRows[i]).toMatchObject(expectedResults[i]);
+    const connection = await connectionManager.getConnection();
+    try {
+      // Disable strict mode, to allow dates such as '2024-00-00'.
+      await connection.query(`SET SESSION sql_mode=''`);
+      await connection.query(`SET SESSION time_zone='+00:00'`);
+
+      await connection.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`);
+      await connection.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('0000-00-00 00:00:00')`);
+      await connection.query(`INSERT INTO test_data(datetime_col) VALUES('2024-00-00 00:00:00')`);
+      // TODO: This has a mismatch between querying directly and with Zongji.
+      // await connection.query(`INSERT INTO test_data(date_col) VALUES('2024-00-00')`);
+
+      const expectedResults = [
+        { timestamp_col: '1970-01-01T00:00:01.000Z' },
+        { timestamp_col: '2038-01-19T03:14:07.499Z' },
+        { datetime_col: '1000-01-01T00:00:00.000Z' },
+        { datetime_col: '9999-12-31T23:59:59.499Z' },
+        { datetime_col: null },
+        { datetime_col: null }
+        // { date_col: '2023-11-30' } or { date_col: null }?
+      ];
+
+      const databaseRows = await getDatabaseRows(connectionManager, 'test_data');
+      const replicatedRows = await getReplicatedRows(expectedResults.length);
+
+      for (let i = 0; i < expectedResults.length; i++) {
+        expect(databaseRows[i]).toMatchObject(expectedResults[i]);
+        expect(replicatedRows[i]).toMatchObject(expectedResults[i]);
+      }
+    } finally {
+      connection.release();
     }
   });
 
@@ -282,8 +300,7 @@ async function getReplicatedRows(expectedTransactionsCount?: number): Promise
diff --git a/packages/service-core/src/storage/SourceTable.ts b/packages/service-core/src/storage/SourceTable.ts
index f514f9081..a2c2f65e7 100644
--- a/packages/service-core/src/storage/SourceTable.ts
+++ b/packages/service-core/src/storage/SourceTable.ts
@@ -48,6 +48,8 @@ export class SourceTable {
   }
 
   /**
+   * Use for postgres only.
+   *
    * Usage: db.query({statement: `SELECT $1::regclass`, params: [{type: 'varchar', value: table.qualifiedName}]})
    */
   get qualifiedName() {
@@ -55,6 +57,8 @@ export class SourceTable {
   }
 
   /**
+   * Use for postgres and logs only.
+   *
    * Usage: db.query(`SELECT * FROM ${table.escapedIdentifier}`)
    */
   get escapedIdentifier() {
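The 'Date types edge cases' test above expects MySQL zero dates such as '0000-00-00 00:00:00' to replicate as `null`. The conversion hunk itself is not shown in full here, so the following is only a sketch of one way such values can be handled defensively: zero dates surface in JavaScript as invalid `Date` objects, which `isNaN(date.getTime())` detects (the function name is hypothetical):

```ts
function datetimeToSqlite(value: Date | null): string | null {
  // With strict mode off, MySQL allows zero dates like '0000-00-00 00:00:00'.
  // Those arrive as invalid Date objects; map them to null rather than
  // emitting the literal string 'Invalid Date'.
  if (value == null || isNaN(value.getTime())) {
    return null;
  }
  return value.toISOString();
}

console.log(datetimeToSqlite(new Date('2023-03-06T15:47:00Z'))); // 2023-03-06T15:47:00.000Z
console.log(datetimeToSqlite(new Date('invalid'))); // null
```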