diff --git a/.changeset/lemon-terms-play.md b/.changeset/lemon-terms-play.md new file mode 100644 index 000000000..218d1ebff --- /dev/null +++ b/.changeset/lemon-terms-play.md @@ -0,0 +1,8 @@ +--- +'@powersync/service-module-mysql': minor +--- + +Generate a random serverId based on the sync rule id for the MySQL replication client +Consolidate type mappings between snapshot and replicated values +Enable MySQL tests in CI + diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 3705cd600..0edb07512 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -143,3 +143,68 @@ jobs: - name: Test run: pnpm test --filter='./modules/module-postgres' + + run-mysql-tests: + name: MySQL Test + runs-on: ubuntu-latest + needs: run-core-tests + + strategy: + fail-fast: false + matrix: + mysql-version: [ 8.0, 8.4 ] + + steps: + - uses: actions/checkout@v4 + + - name: Start MySQL + run: | + docker run \ + --name MySQLTestDatabase \ + -e MYSQL_ROOT_PASSWORD=mypassword \ + -e MYSQL_DATABASE=mydatabase \ + -p 3306:3306 \ + -d mysql:${{ matrix.mysql-version }} \ + --log-bin=/var/lib/mysql/mysql-bin.log \ + --gtid_mode=ON \ + --enforce_gtid_consistency=ON + + - name: Start MongoDB + uses: supercharge/mongodb-github-action@1.8.0 + with: + mongodb-version: '6.0' + mongodb-replica-set: test-rs + + - name: Setup NodeJS + uses: actions/setup-node@v4 + with: + node-version-file: '.nvmrc' + + - uses: pnpm/action-setup@v4 + name: Install pnpm + with: + version: 9 + run_install: false + + - name: Get pnpm store directory + shell: bash + run: | + echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV + + - uses: actions/cache@v3 + name: Setup pnpm cache + with: + path: ${{ env.STORE_PATH }} + key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }} + restore-keys: | + ${{ runner.os }}-pnpm-store- + + - name: Install dependencies + run: pnpm install + + - name: Build + shell: bash + run: pnpm build + + - name: Test + run: pnpm test --filter='./modules/module-mysql' diff --git a/modules/module-mysql/dev/config/sync_rules.yaml b/modules/module-mysql/dev/config/sync_rules.yaml index eb74a7198..5c0eb9932 100644 --- a/modules/module-mysql/dev/config/sync_rules.yaml +++ b/modules/module-mysql/dev/config/sync_rules.yaml @@ -3,10 +3,8 @@ # Note that changes to this file are not watched. # The service needs to be restarted for changes to take effect.
-# Note that specifying the schema is currently required due to the default -# schema being specified as `public`, but in mysql the schema is the database name bucket_definitions: global: data: - - SELECT * FROM mydatabase.lists - - SELECT * FROM mydatabase.todos + - SELECT * FROM lists + - SELECT * FROM todos diff --git a/modules/module-mysql/package.json b/modules/module-mysql/package.json index 4d16d74a8..18611a52c 100644 --- a/modules/module-mysql/package.json +++ b/modules/module-mysql/package.json @@ -32,7 +32,8 @@ "@powersync/service-core": "workspace:*", "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", - "@powersync/mysql-zongji": "0.0.0-dev-20241023144335", + "@powersync/service-jsonbig": "workspace:*", + "@powersync/mysql-zongji": "0.0.0-dev-20241031142605", "semver": "^7.5.4", "async": "^3.2.4", "mysql2": "^3.11.0", diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts index 29fbfe0fc..8cc2487d8 100644 --- a/modules/module-mysql/src/common/mysql-to-sqlite.ts +++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts @@ -2,16 +2,155 @@ import * as sync_rules from '@powersync/service-sync-rules'; import { ExpressionType } from '@powersync/service-sync-rules'; import { ColumnDescriptor } from '@powersync/service-core'; import mysql from 'mysql2'; +import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; +import { ColumnDefinition, TableMapEntry } from '@powersync/mysql-zongji'; -export function toSQLiteRow(row: Record<string, any>, columns?: Map<string, ColumnDescriptor>): sync_rules.SqliteRow { +export enum ADDITIONAL_MYSQL_TYPES { + DATETIME2 = 18, + TIMESTAMP2 = 17, + BINARY = 100, + VARBINARY = 101, + TEXT = 102 +} + +export const MySQLTypesMap: { [key: number]: string } = {}; +for (const [name, code] of Object.entries(mysql.Types)) { + MySQLTypesMap[code as number] = name; } +for (const [name, code] of Object.entries(ADDITIONAL_MYSQL_TYPES)) { + MySQLTypesMap[code as number] = name; } + +export function toColumnDescriptors(columns: mysql.FieldPacket[]): Map<string, ColumnDescriptor>; +export function toColumnDescriptors(tableMap: TableMapEntry): Map<string, ColumnDescriptor>; + +export function toColumnDescriptors(columns: mysql.FieldPacket[] | TableMapEntry): Map<string, ColumnDescriptor> { + const columnMap = new Map<string, ColumnDescriptor>(); + if (Array.isArray(columns)) { + for (const column of columns) { + columnMap.set(column.name, toColumnDescriptorFromFieldPacket(column)); + } + } else { + for (const column of columns.columns) { + columnMap.set(column.name, toColumnDescriptorFromDefinition(column)); + } + } + + return columnMap; +} + +export function toColumnDescriptorFromFieldPacket(column: mysql.FieldPacket): ColumnDescriptor { + let typeId = column.type!; + const BINARY_FLAG = 128; + const MYSQL_ENUM_FLAG = 256; + const MYSQL_SET_FLAG = 2048; + + switch (column.type) { + case mysql.Types.STRING: + if (((column.flags as number) & BINARY_FLAG) !== 0) { + typeId = ADDITIONAL_MYSQL_TYPES.BINARY; + } else if (((column.flags as number) & MYSQL_ENUM_FLAG) !== 0) { + typeId = mysql.Types.ENUM; + } else if (((column.flags as number) & MYSQL_SET_FLAG) !== 0) { + typeId = mysql.Types.SET; + } + break; + + case mysql.Types.VAR_STRING: + typeId = ((column.flags as number) & BINARY_FLAG) !== 0 ? ADDITIONAL_MYSQL_TYPES.VARBINARY : column.type; + break; + case mysql.Types.BLOB: + typeId = ((column.flags as number) & BINARY_FLAG) === 0 ?
ADDITIONAL_MYSQL_TYPES.TEXT : column.type; + break; + } + + const columnType = MySQLTypesMap[typeId]; + + return { + name: column.name, + type: columnType, + typeId: typeId + }; +} + +export function toColumnDescriptorFromDefinition(column: ColumnDefinition): ColumnDescriptor { + let typeId = column.type; + + switch (column.type) { + case mysql.Types.STRING: + typeId = !column.charset ? ADDITIONAL_MYSQL_TYPES.BINARY : column.type; + break; + case mysql.Types.VAR_STRING: + case mysql.Types.VARCHAR: + typeId = !column.charset ? ADDITIONAL_MYSQL_TYPES.VARBINARY : column.type; + break; + case mysql.Types.BLOB: + typeId = column.charset ? ADDITIONAL_MYSQL_TYPES.TEXT : column.type; + break; + } + + const columnType = MySQLTypesMap[typeId]; + + return { + name: column.name, + type: columnType, + typeId: typeId + }; +} + +export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow { for (let key in row) { - if (row[key] instanceof Date) { - const column = columns?.get(key); - if (column?.typeId == mysql.Types.DATE) { - // Only parse the date part - row[key] = row[key].toISOString().split('T')[0]; - } else { - row[key] = row[key].toISOString(); + // We are very much expecting the column to be there + const column = columns.get(key)!; + + if (row[key] !== null) { + switch (column.typeId) { + case mysql.Types.DATE: + // Only parse the date part + row[key] = row[key].toISOString().split('T')[0]; + break; + case mysql.Types.DATETIME: + case ADDITIONAL_MYSQL_TYPES.DATETIME2: + case mysql.Types.TIMESTAMP: + case ADDITIONAL_MYSQL_TYPES.TIMESTAMP2: + row[key] = row[key].toISOString(); + break; + case mysql.Types.JSON: + if (typeof row[key] === 'string') { + row[key] = new JsonContainer(row[key]); + } + break; + case mysql.Types.BIT: + case mysql.Types.BLOB: + case mysql.Types.TINY_BLOB: + case mysql.Types.MEDIUM_BLOB: + case mysql.Types.LONG_BLOB: + case ADDITIONAL_MYSQL_TYPES.BINARY: + case ADDITIONAL_MYSQL_TYPES.VARBINARY: + row[key] = new Uint8Array(Object.values(row[key])); + break; + case mysql.Types.LONGLONG: + if (typeof row[key] === 'string') { + row[key] = BigInt(row[key]); + } else if (typeof row[key] === 'number') { + // Zongji returns BIGINT as a number when it can be represented as a number + row[key] = BigInt(row[key]); + } + break; + case mysql.Types.TINY: + case mysql.Types.SHORT: + case mysql.Types.LONG: + case mysql.Types.INT24: + // Handle all integer values as BigInt + if (typeof row[key] === 'number') { + row[key] = BigInt(row[key]); + } + break; + case mysql.Types.SET: + // Convert to JSON array from string + const values = row[key].split(','); + row[key] = JSONBig.stringify(values); + break; } } } diff --git a/modules/module-mysql/src/common/read-executed-gtid.ts b/modules/module-mysql/src/common/read-executed-gtid.ts index 7f224c5b9..801fba011 100644 --- a/modules/module-mysql/src/common/read-executed-gtid.ts +++ b/modules/module-mysql/src/common/read-executed-gtid.ts @@ -4,7 +4,6 @@ import { gte } from 'semver'; import { ReplicatedGTID } from './ReplicatedGTID.js'; import { getMySQLVersion } from './check-source-configuration.js'; -import { logger } from '@powersync/lib-services-framework'; /** * Gets the current master HEAD GTID @@ -33,8 +32,6 @@ export async function readExecutedGtid(connection: mysqlPromise.Connection): Pro offset: parseInt(binlogStatus.Position) }; - logger.info('Succesfully read executed GTID', { position }); - return new ReplicatedGTID({ // The head always points to the next position to start replication from position, diff --git
a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index ad805bf99..a7acb8f32 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -9,9 +9,9 @@ import { BinLogEvent, StartOptions, TableMapEntry } from '@powersync/mysql-zongj import * as common from '../common/common-index.js'; import * as zongji_utils from './zongji/zongji-utils.js'; import { MySQLConnectionManager } from './MySQLConnectionManager.js'; -import { isBinlogStillAvailable, ReplicatedGTID } from '../common/common-index.js'; +import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js'; import mysqlPromise from 'mysql2/promise'; -import { MySQLTypesMap } from '../utils/mysql_utils.js'; +import { createRandomServerId } from '../utils/mysql_utils.js'; export interface BinLogStreamOptions { connections: MySQLConnectionManager; @@ -221,7 +221,13 @@ AND table_type = 'BASE TABLE';`, // Check if the binlog is still available. If it isn't we need to snapshot again. const connection = await this.connections.getConnection(); try { - return await isBinlogStillAvailable(connection, lastKnowGTID.position.filename); + const isAvailable = await isBinlogStillAvailable(connection, lastKnowGTID.position.filename); + if (!isAvailable) { + logger.info( + `Binlog file ${lastKnowGTID.position.filename} is no longer available, starting initial replication again.` + ); + } + return isAvailable; } finally { connection.release(); } @@ -245,7 +251,7 @@ AND table_type = 'BASE TABLE';`, const connection = await this.connections.getStreamingConnection(); const promiseConnection = (connection as mysql.Connection).promise(); const headGTID = await common.readExecutedGtid(promiseConnection); - logger.info(`Using snapshot checkpoint GTID:: '${headGTID}'`); + logger.info(`Using snapshot checkpoint GTID: '${headGTID}'`); try { logger.info(`Starting initial replication`); await promiseConnection.query( @@ -285,7 +291,7 @@ AND table_type = 'BASE TABLE';`, logger.info(`Replicating ${table.qualifiedName}`); // TODO count rows and log progress at certain batch sizes - const columns = new Map(); + let columns: Map; return new Promise((resolve, reject) => { // MAX_EXECUTION_TIME(0) hint disables execution timeout for this query connection @@ -295,10 +301,7 @@ AND table_type = 'BASE TABLE';`, }) .on('fields', (fields: FieldPacket[]) => { // Map the columns and their types - fields.forEach((field) => { - const columnType = MySQLTypesMap[field.type as number]; - columns.set(field.name, { name: field.name, type: columnType, typeId: field.type }); - }); + columns = toColumnDescriptors(fields); }) .on('result', async (row) => { connection.pause(); @@ -363,10 +366,14 @@ AND table_type = 'BASE TABLE';`, async streamChanges() { // Auto-activate as soon as initial replication is done await this.storage.autoActivate(); + const serverId = createRandomServerId(this.storage.group_id); + logger.info(`Starting replication. Created replica client with serverId:${serverId}`); const connection = await this.connections.getConnection(); const { checkpoint_lsn } = await this.storage.getStatus(); - logger.info(`Last known LSN from storage: ${checkpoint_lsn}`); + if (checkpoint_lsn) { + logger.info(`Existing checkpoint found: ${checkpoint_lsn}`); + } const fromGTID = checkpoint_lsn ? 
common.ReplicatedGTID.fromSerialized(checkpoint_lsn) @@ -447,7 +454,7 @@ AND table_type = 'BASE TABLE';`, zongji.on('binlog', (evt: BinLogEvent) => { if (!this.stopped) { - logger.info(`Pushing Binlog event ${evt.getEventName()}`); + logger.info(`Received Binlog event:${evt.getEventName()}`); queue.push(evt); } else { logger.info(`Replication is busy stopping, ignoring event ${evt.getEventName()}`); @@ -458,16 +465,18 @@ AND table_type = 'BASE TABLE';`, // Powersync is shutting down, don't start replicating return; } + + logger.info(`Reading binlog from: ${binLogPositionState.filename}:${binLogPositionState.offset}`); + // Only listen for changes to tables in the sync rules const includedTables = [...this.tableCache.values()].map((table) => table.table); - logger.info(`Starting replication from ${binLogPositionState.filename}:${binLogPositionState.offset}`); zongji.start({ includeEvents: ['tablemap', 'writerows', 'updaterows', 'deleterows', 'xid', 'rotate', 'gtidlog'], excludeEvents: [], includeSchema: { [this.defaultSchema]: includedTables }, filename: binLogPositionState.filename, position: binLogPositionState.offset, - serverId: this.storage.group_id + serverId: serverId } satisfies StartOptions); // Forever young @@ -516,10 +525,7 @@ AND table_type = 'BASE TABLE';`, tableEntry: TableMapEntry; } ): Promise { - const columns = new Map(); - msg.tableEntry.columns.forEach((column) => { - columns.set(column.name, { name: column.name, typeId: column.type }); - }); + const columns = toColumnDescriptors(msg.tableEntry); for (const [index, row] of msg.data.entries()) { await this.writeChange(batch, { @@ -560,8 +566,10 @@ AND table_type = 'BASE TABLE';`, Metrics.getInstance().rows_replicated_total.add(1); // "before" may be null if the replica id columns are unchanged // It's fine to treat that the same as an insert. - const beforeUpdated = payload.previous_data ? common.toSQLiteRow(payload.previous_data) : undefined; - const after = common.toSQLiteRow(payload.data); + const beforeUpdated = payload.previous_data + ? common.toSQLiteRow(payload.previous_data, payload.columns) + : undefined; + const after = common.toSQLiteRow(payload.data, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.UPDATE, @@ -570,13 +578,13 @@ AND table_type = 'BASE TABLE';`, beforeReplicaId: beforeUpdated ? 
getUuidReplicaIdentityBson(beforeUpdated, payload.sourceTable.replicaIdColumns) : undefined, - after: common.toSQLiteRow(payload.data), + after: common.toSQLiteRow(payload.data, payload.columns), afterReplicaId: getUuidReplicaIdentityBson(after, payload.sourceTable.replicaIdColumns) }); case storage.SaveOperationTag.DELETE: Metrics.getInstance().rows_replicated_total.add(1); - const beforeDeleted = common.toSQLiteRow(payload.data); + const beforeDeleted = common.toSQLiteRow(payload.data, payload.columns); return await batch.save({ tag: storage.SaveOperationTag.DELETE, diff --git a/modules/module-mysql/src/replication/MySQLConnectionManager.ts b/modules/module-mysql/src/replication/MySQLConnectionManager.ts index b69ddb5ad..fd8cef605 100644 --- a/modules/module-mysql/src/replication/MySQLConnectionManager.ts +++ b/modules/module-mysql/src/replication/MySQLConnectionManager.ts @@ -1,6 +1,6 @@ import { NormalizedMySQLConnectionConfig } from '../types/types.js'; import mysqlPromise from 'mysql2/promise'; -import mysql, { RowDataPacket } from 'mysql2'; +import mysql, { FieldPacket, RowDataPacket } from 'mysql2'; import * as mysql_utils from '../utils/mysql_utils.js'; import ZongJi from '@powersync/mysql-zongji'; import { logger } from '@powersync/lib-services-framework'; @@ -61,7 +61,7 @@ export class MySQLConnectionManager { * @param query * @param params */ - async query(query: string, params?: any[]) { + async query(query: string, params?: any[]): Promise<[RowDataPacket[], FieldPacket[]]> { return this.promisePool.query(query, params); } diff --git a/modules/module-mysql/src/utils/mysql_utils.ts b/modules/module-mysql/src/utils/mysql_utils.ts index 99741b92a..2e65d47c2 100644 --- a/modules/module-mysql/src/utils/mysql_utils.ts +++ b/modules/module-mysql/src/utils/mysql_utils.ts @@ -3,11 +3,6 @@ import mysql from 'mysql2'; import mysqlPromise from 'mysql2/promise'; import * as types from '../types/types.js'; -export const MySQLTypesMap: { [key: number]: string } = {}; -for (const [name, code] of Object.entries(mysql.Types)) { - MySQLTypesMap[code as number] = name; -} - export type RetriedQueryOptions = { connection: mysqlPromise.Connection; query: string; @@ -47,7 +42,21 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option database: config.database, ssl: hasSSLOptions ? sslOptions : undefined, supportBigNumbers: true, + decimalNumbers: true, timezone: 'Z', // Ensure no auto timezone manipulation of the dates occur + jsonStrings: true, // Return JSON columns as strings ...(options || {}) }); } + +/** + * Return a random server id for a given sync rule id. + * Expected format is: {syncRuleId}00{random number between 0 and 9999} + * The max value for server id in MySQL is 2^32 - 1. + * We use the GTID format to keep track of our position in the binlog; no state is kept by the MySQL server, so + * it is ok to use a randomised server id every time.
+ * @param syncRuleId + */ +export function createRandomServerId(syncRuleId: number): number { + return Number.parseInt(`${syncRuleId}00${Math.floor(Math.random() * 10000)}`); +} diff --git a/modules/module-mysql/test/src/BinLogStream.test.ts b/modules/module-mysql/test/src/BinLogStream.test.ts new file mode 100644 index 000000000..7aa19cdd4 --- /dev/null +++ b/modules/module-mysql/test/src/BinLogStream.test.ts @@ -0,0 +1,332 @@ +import { putOp, removeOp } from '@core-tests/stream_utils.js'; +import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js'; +import { BucketStorageFactory, Metrics } from '@powersync/service-core'; +import { describe, expect, test } from 'vitest'; +import { binlogStreamTest } from './BinlogStreamUtils.js'; + +type StorageFactory = () => Promise; + +const BASIC_SYNC_RULES = ` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data" +`; + +describe( + ' Binlog stream - mongodb', + function () { + defineBinlogStreamTests(MONGO_STORAGE_FACTORY); + }, + { timeout: 20_000 } +); + +function defineBinlogStreamTests(factory: StorageFactory) { + test( + 'Replicate basic values', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT id, description, num FROM "test_data"`); + + await connectionManager.query( + `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, num BIGINT)` + ); + + await context.replicateSnapshot(); + + const startRowCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const startTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + + context.startStreaming(); + await connectionManager.query(`INSERT INTO test_data(description, num) VALUES('test1', 1152921504606846976)`); + const [[result]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'test1' AND num = 1152921504606846976` + ); + const testId = result.test_id; + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]); + const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const endTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + expect(endRowCount - startRowCount).toEqual(1); + expect(endTxCount - startTxCount).toEqual(1); + }) + ); + + test( + 'replicating case sensitive table', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT id, description FROM "test_DATA" + `); + + await connectionManager.query( + `CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)` + ); + + await context.replicateSnapshot(); + + const startRowCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const startTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 
0; + + context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_DATA(description) VALUES('test1')`); + const [[result]] = await connectionManager.query( + `SELECT id AS test_id FROM test_DATA WHERE description = 'test1'` + ); + const testId = result.test_id; + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([putOp('test_DATA', { id: testId, description: 'test1' })]); + const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const endTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + expect(endRowCount - startRowCount).toEqual(1); + expect(endTxCount - startTxCount).toEqual(1); + }) + ); + + // TODO: Not supported yet + // test( + // 'replicating TRUNCATE', + // binlogStreamTest(factory, async (context) => { + // const { connectionManager } = context; + // const syncRuleContent = ` + // bucket_definitions: + // global: + // data: + // - SELECT id, description FROM "test_data" + // by_test_data: + // parameters: SELECT id FROM test_data WHERE id = token_parameters.user_id + // data: [] + // `; + // await context.updateSyncRules(syncRuleContent); + // await connectionManager.query(`DROP TABLE IF EXISTS test_data`); + // await connectionManager.query( + // `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)` + // ); + // + // await context.replicateSnapshot(); + // context.startStreaming(); + // + // const [{ test_id }] = pgwireRows( + // await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) + // ); + // await connectionManager.query(`TRUNCATE test_data`); + // + // const data = await context.getBucketData('global[]'); + // + // expect(data).toMatchObject([ + // putOp('test_data', { id: test_id, description: 'test1' }), + // removeOp('test_data', test_id) + // ]); + // }) + // ); + + test( + 'replicating changing primary key', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await connectionManager.query( + `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)` + ); + + await context.replicateSnapshot(); + context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`); + const [[result1]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'test1'` + ); + const testId1 = result1.test_id; + + await connectionManager.query(`UPDATE test_data SET id = UUID(), description = 'test2a' WHERE id = '${testId1}'`); + const [[result2]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'test2a'` + ); + const testId2 = result2.test_id; + + // This update may fail replicating with: + // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"} + await connectionManager.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${testId2}'`); + + // Re-use old id again + await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}', 'test1b')`); + await connectionManager.query(`UPDATE test_data SET description = 'test1c' WHERE id = '${testId1}'`); + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([ + // 
Initial insert + putOp('test_data', { id: testId1, description: 'test1' }), + // Update id, then description + removeOp('test_data', testId1), + putOp('test_data', { id: testId2, description: 'test2a' }), + putOp('test_data', { id: testId2, description: 'test2b' }), + // Re-use old id + putOp('test_data', { id: testId1, description: 'test1b' }), + putOp('test_data', { id: testId1, description: 'test1c' }) + ]); + }) + ); + + test( + 'initial sync', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await connectionManager.query( + `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)` + ); + + await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`); + const [[result]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'test1'` + ); + const testId = result.test_id; + + await context.replicateSnapshot(); + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1' })]); + }) + ); + + test( + 'snapshot with date values', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT * FROM "test_data" + `); + + await connectionManager.query( + `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)` + ); + + await connectionManager.query(` + INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47') + `); + const [[result]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'testDates'` + ); + const testId = result.test_id; + + await context.replicateSnapshot(); + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([ + putOp('test_data', { + id: testId, + description: 'testDates', + date: `2023-03-06`, + datetime: '2023-03-06T15:47:00.000Z', + timestamp: '2023-03-06T15:47:00.000Z' + }) + ]); + }) + ); + + test( + 'replication with date values', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(` + bucket_definitions: + global: + data: + - SELECT * FROM "test_data" + `); + + await connectionManager.query( + `CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)` + ); + + await context.replicateSnapshot(); + + const startRowCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const startTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 
0; + + context.startStreaming(); + + await connectionManager.query(` + INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47') + `); + const [[result]] = await connectionManager.query( + `SELECT id AS test_id FROM test_data WHERE description = 'testDates'` + ); + const testId = result.test_id; + + const data = await context.getBucketData('global[]'); + expect(data).toMatchObject([ + putOp('test_data', { + id: testId, + description: 'testDates', + date: `2023-03-06`, + datetime: '2023-03-06T15:47:00.000Z', + timestamp: '2023-03-06T15:47:00.000Z' + }) + ]); + const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const endTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + expect(endRowCount - startRowCount).toEqual(1); + expect(endTxCount - startTxCount).toEqual(1); + }) + ); + + test( + 'table not in sync rules', + binlogStreamTest(factory, async (context) => { + const { connectionManager } = context; + await context.updateSyncRules(BASIC_SYNC_RULES); + + await connectionManager.query( + `CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)` + ); + + await context.replicateSnapshot(); + + const startRowCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const startTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + + context.startStreaming(); + + await connectionManager.query(`INSERT INTO test_donotsync(description) VALUES('test1')`); + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([]); + const endRowCount = (await Metrics.getInstance().getMetricValueForTests('powersync_rows_replicated_total')) ?? 0; + const endTxCount = + (await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0; + + // There was a transaction, but we should not replicate any actual data + expect(endRowCount - startRowCount).toEqual(0); + expect(endTxCount - startTxCount).toEqual(1); + }) + ); +} diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts new file mode 100644 index 000000000..c08f22c60 --- /dev/null +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -0,0 +1,157 @@ +import { + ActiveCheckpoint, + BucketStorageFactory, + OpId, + OplogEntry, + SyncRulesBucketStorage +} from '@powersync/service-core'; +import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js'; +import { fromAsync } from '@core-tests/stream_utils.js'; +import { BinLogStream, BinLogStreamOptions } from '@module/replication/BinLogStream.js'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; +import mysqlPromise from 'mysql2/promise'; +import { readExecutedGtid } from '@module/common/read-executed-gtid.js'; +import { logger } from '@powersync/lib-services-framework'; + +/** + * Tests operating on the binlog stream need to configure the stream and manage asynchronous + * replication, which gets a little tricky. + * + * This wraps a test in a function that configures all the context, and tears it down afterward. 
+ */ +export function binlogStreamTest( + factory: () => Promise<BucketStorageFactory>, + test: (context: BinlogStreamTestContext) => Promise<void> +): () => Promise<void> { + return async () => { + const f = await factory(); + const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {}); + + const connection = await connectionManager.getConnection(); + await clearTestDb(connection); + connection.release(); + const context = new BinlogStreamTestContext(f, connectionManager); + try { + await test(context); + } finally { + await context.dispose(); + } + }; +} + +export class BinlogStreamTestContext { + private _binlogStream?: BinLogStream; + private abortController = new AbortController(); + private streamPromise?: Promise<void>; + public storage?: SyncRulesBucketStorage; + private replicationDone = false; + + constructor( + public factory: BucketStorageFactory, + public connectionManager: MySQLConnectionManager + ) {} + + async dispose() { + this.abortController.abort(); + await this.streamPromise; + await this.connectionManager.end(); + } + + get connectionTag() { + return this.connectionManager.connectionTag; + } + + async updateSyncRules(content: string): Promise<SyncRulesBucketStorage> { + const syncRules = await this.factory.updateSyncRules({ content: content }); + this.storage = this.factory.getInstance(syncRules); + return this.storage!; + } + + get binlogStream(): BinLogStream { + if (this.storage == null) { + throw new Error('updateSyncRules() first'); + } + if (this._binlogStream) { + return this._binlogStream; + } + const options: BinLogStreamOptions = { + storage: this.storage, + connections: this.connectionManager, + abortSignal: this.abortController.signal + }; + this._binlogStream = new BinLogStream(options); + return this._binlogStream!; + } + + async replicateSnapshot() { + await this.binlogStream.initReplication(); + this.replicationDone = true; + } + + startStreaming() { + if (!this.replicationDone) { + throw new Error('Call replicateSnapshot() before startStreaming()'); + } + this.streamPromise = this.binlogStream.streamChanges(); + } + + async getCheckpoint(options?: { timeout?: number }): Promise<string> { + const connection = await this.connectionManager.getConnection(); + let checkpoint = await Promise.race([ + getClientCheckpoint(connection, this.factory, { timeout: options?.timeout ?? 60_000 }), + this.streamPromise + ]); + connection.release(); + if (typeof checkpoint == undefined) { + // This indicates an issue with the test setup - streamingPromise completed instead + // of getClientCheckpoint() + throw new Error('Test failure - streamingPromise completed'); + } + return checkpoint as string; + } + + async getBucketsDataBatch(buckets: Record<string, string>, options?: { timeout?: number }) { + const checkpoint = await this.getCheckpoint(options); + const map = new Map(Object.entries(buckets)); + return fromAsync(this.storage!.getBucketDataBatch(checkpoint, map)); + } + + async getBucketData(bucket: string, start = '0', options?: { timeout?: number }): Promise<OplogEntry[]> { + const checkpoint = await this.getCheckpoint(options); + const map = new Map([[bucket, start]]); + const batch = this.storage!.getBucketDataBatch(checkpoint, map); + const batches = await fromAsync(batch); + return batches[0]?.batch.data ?? []; + } +} + +export async function getClientCheckpoint( + connection: mysqlPromise.Connection, + bucketStorage: BucketStorageFactory, + options?: { timeout?: number } +): Promise<OpId> { + const start = Date.now(); + const gtid = await readExecutedGtid(connection); + // This old API needs a persisted checkpoint id.
+ // Since we don't use LSNs anymore, the only way to get that is to wait. + + const timeout = options?.timeout ?? 50_000; + let lastCp: ActiveCheckpoint | null = null; + + logger.info('Expected Checkpoint: ' + gtid.comparable); + while (Date.now() - start < timeout) { + const cp = await bucketStorage.getActiveCheckpoint(); + lastCp = cp; + //logger.info('Last Checkpoint: ' + lastCp.lsn); + if (!cp.hasSyncRules()) { + throw new Error('No sync rules available'); + } + if (cp.lsn && cp.lsn >= gtid.comparable) { + return cp.checkpoint; + } + + await new Promise((resolve) => setTimeout(resolve, 30)); + } + + throw new Error(`Timeout while waiting for checkpoint ${gtid.comparable}. Last checkpoint: ${lastCp?.lsn}`); +} diff --git a/modules/module-mysql/test/src/MysqlTypeMappings.test.ts b/modules/module-mysql/test/src/MysqlTypeMappings.test.ts new file mode 100644 index 000000000..9cebdccd2 --- /dev/null +++ b/modules/module-mysql/test/src/MysqlTypeMappings.test.ts @@ -0,0 +1,322 @@ +import { SqliteRow } from '@powersync/service-sync-rules'; +import { afterAll, describe, expect, test } from 'vitest'; +import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; +import { eventIsWriteMutation, eventIsXid } from '@module/replication/zongji/zongji-utils.js'; +import * as common from '@module/common/common-index.js'; +import ZongJi, { BinLogEvent } from '@powersync/mysql-zongji'; +import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js'; +import { toColumnDescriptors } from '@module/common/common-index.js'; + +describe('MySQL Data Types', () => { + const connectionManager = new MySQLConnectionManager(TEST_CONNECTION_OPTIONS, {}); + + afterAll(async () => { + await connectionManager.end(); + }); + + async function setupTable() { + const connection = await connectionManager.getConnection(); + await clearTestDb(connection); + await connection.query(`CREATE TABLE test_data ( + tinyint_col TINYINT, + smallint_col SMALLINT, + mediumint_col MEDIUMINT, + int_col INT, + integer_col INTEGER, + bigint_col BIGINT, + float_col FLOAT, + double_col DOUBLE, + decimal_col DECIMAL(10,2), + numeric_col NUMERIC(10,2), + bit_col BIT(8), + boolean_col BOOLEAN, + serial_col SERIAL, + + date_col DATE, + datetime_col DATETIME(3), + timestamp_col TIMESTAMP(3), + time_col TIME, + year_col YEAR, + + char_col CHAR(10), + varchar_col VARCHAR(255), + binary_col BINARY(16), + varbinary_col VARBINARY(256), + tinyblob_col TINYBLOB, + blob_col BLOB, + mediumblob_col MEDIUMBLOB, + longblob_col LONGBLOB, + tinytext_col TINYTEXT, + text_col TEXT, + mediumtext_col MEDIUMTEXT, + longtext_col LONGTEXT, + enum_col ENUM('value1', 'value2', 'value3'), + set_col SET('value1', 'value2', 'value3'), + + json_col JSON, + + geometry_col GEOMETRY, + point_col POINT, + linestring_col LINESTRING, + polygon_col POLYGON, + multipoint_col MULTIPOINT, + multilinestring_col MULTILINESTRING, + multipolygon_col MULTIPOLYGON, + geometrycollection_col GEOMETRYCOLLECTION + )`); + + connection.release(); + } + + test('Number types mappings', async () => { + await setupTable(); + await connectionManager.query(` +INSERT INTO test_data ( + tinyint_col, + smallint_col, + mediumint_col, + int_col, + integer_col, + bigint_col, + double_col, + decimal_col, + numeric_col, + bit_col, + boolean_col + -- serial_col is auto-incremented and can be left out +) VALUES ( + 127, -- TINYINT maximum value + 32767, -- SMALLINT maximum value + 8388607, -- MEDIUMINT maximum value + 2147483647, -- INT maximum value + 2147483647, -- INTEGER maximum 
value + 9223372036854775807, -- BIGINT maximum value + 3.1415926535, -- DOUBLE example + 12345.67, -- DECIMAL(10,2) example + 12345.67, -- NUMERIC(10,2) example + b'10101010', -- BIT(8) example in binary notation + TRUE -- BOOLEAN value (alias for TINYINT(1)) + -- serial_col is auto-incremented +)`); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(); + + const expectedResult = { + tinyint_col: 127n, + smallint_col: 32767n, + mediumint_col: 8388607n, + int_col: 2147483647n, + integer_col: 2147483647n, + bigint_col: 9223372036854775807n, + double_col: 3.1415926535, + decimal_col: 12345.67, + numeric_col: 12345.67, + bit_col: new Uint8Array([0b10101010]).valueOf(), + boolean_col: 1n, + serial_col: 1n + }; + expect(databaseRows[0]).toMatchObject(expectedResult); + expect(replicatedRows[0]).toMatchObject(expectedResult); + }); + + test('Float type mapping', async () => { + await setupTable(); + const expectedFloatValue = 3.14; + await connectionManager.query(`INSERT INTO test_data (float_col) VALUES (${expectedFloatValue})`); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(); + + const allowedPrecision = 0.0001; + + const actualFloatValueDB = databaseRows[0].float_col; + let difference = Math.abs((actualFloatValueDB as number) - expectedFloatValue); + expect(difference).toBeLessThan(allowedPrecision); + + const actualFloatValueReplicated = replicatedRows[0].float_col; + difference = Math.abs((actualFloatValueReplicated as number) - expectedFloatValue); + expect(difference).toBeLessThan(allowedPrecision); + }); + + test('Character types mappings', async () => { + await setupTable(); + await connectionManager.query(` +INSERT INTO test_data ( + char_col, + varchar_col, + binary_col, + varbinary_col, + tinyblob_col, + blob_col, + mediumblob_col, + longblob_col, + tinytext_col, + text_col, + mediumtext_col, + longtext_col, + enum_col +) VALUES ( + 'CharData', -- CHAR(10) with padding spaces + 'Variable character data',-- VARCHAR(255) + 'ShortBin', -- BINARY(16) + 'VariableBinaryData', -- VARBINARY(256) + 'TinyBlobData', -- TINYBLOB + 'BlobData', -- BLOB + 'MediumBlobData', -- MEDIUMBLOB + 'LongBlobData', -- LONGBLOB + 'TinyTextData', -- TINYTEXT + 'TextData', -- TEXT + 'MediumTextData', -- MEDIUMTEXT + 'LongTextData', -- LONGTEXT + 'value1' -- ENUM('value1', 'value2', 'value3') +);`); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(); + const expectedResult = { + char_col: 'CharData', + varchar_col: 'Variable character data', + binary_col: new Uint8Array([83, 104, 111, 114, 116, 66, 105, 110, 0, 0, 0, 0, 0, 0, 0, 0]), // Pad with 0 + varbinary_col: new Uint8Array([ + 0x56, 0x61, 0x72, 0x69, 0x61, 0x62, 0x6c, 0x65, 0x42, 0x69, 0x6e, 0x61, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61 + ]), + tinyblob_col: new Uint8Array([0x54, 0x69, 0x6e, 0x79, 0x42, 0x6c, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61]), + blob_col: new Uint8Array([0x42, 0x6c, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61]), + mediumblob_col: new Uint8Array([ + 0x4d, 0x65, 0x64, 0x69, 0x75, 0x6d, 0x42, 0x6c, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61 + ]), + longblob_col: new Uint8Array([0x4c, 0x6f, 0x6e, 0x67, 0x42, 0x6c, 0x6f, 0x62, 0x44, 0x61, 0x74, 0x61]), + tinytext_col: 'TinyTextData', + text_col: 'TextData', + mediumtext_col: 'MediumTextData', + longtext_col: 'LongTextData', + enum_col: 'value1' + }; + + 
expect(databaseRows[0]).toMatchObject(expectedResult); + expect(replicatedRows[0]).toMatchObject(expectedResult); + }); + + test('Date types mappings', async () => { + await setupTable(); + await connectionManager.query(` + INSERT INTO test_data(date_col, datetime_col, timestamp_col, time_col, year_col) + VALUES('2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47', '15:47:00', '2023'); + `); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(); + const expectedResult = { + date_col: '2023-03-06', + datetime_col: '2023-03-06T15:47:00.000Z', + timestamp_col: '2023-03-06T15:47:00.000Z', + time_col: '15:47:00', + year_col: 2023 + }; + + expect(databaseRows[0]).toMatchObject(expectedResult); + expect(replicatedRows[0]).toMatchObject(expectedResult); + }); + + test('Date types edge cases mappings', async () => { + await setupTable(); + + await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('1970-01-01 00:00:01')`); + await connectionManager.query(`INSERT INTO test_data(timestamp_col) VALUES('2038-01-19 03:14:07.499')`); + await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('1000-01-01 00:00:00')`); + await connectionManager.query(`INSERT INTO test_data(datetime_col) VALUES('9999-12-31 23:59:59.499')`); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(4); + const expectedResults = [ + { timestamp_col: '1970-01-01T00:00:01.000Z' }, + { timestamp_col: '2038-01-19T03:14:07.499Z' }, + { datetime_col: '1000-01-01T00:00:00.000Z' }, + { datetime_col: '9999-12-31T23:59:59.499Z' } + ]; + + for (let i = 0; i < expectedResults.length; i++) { + expect(databaseRows[i]).toMatchObject(expectedResults[i]); + expect(replicatedRows[i]).toMatchObject(expectedResults[i]); + } + }); + + test('Json types mappings', async () => { + await setupTable(); + + const expectedJSON = { name: 'John Doe', age: 30, married: true }; + const expectedSet = ['value1', 'value3']; + + // For convenience, we map the SET data type to a JSON Array + await connectionManager.query( + `INSERT INTO test_data (json_col, set_col) VALUES ('${JSON.stringify(expectedJSON)}', '${expectedSet.join(',')}')` + ); + + const databaseRows = await getDatabaseRows(connectionManager, 'test_data'); + const replicatedRows = await getReplicatedRows(); + + const actualDBJSONValue = JSON.parse(databaseRows[0].json_col as string); + const actualReplicatedJSONValue = JSON.parse(replicatedRows[0].json_col as string); + expect(actualDBJSONValue).toEqual(expectedJSON); + expect(actualReplicatedJSONValue).toEqual(expectedJSON); + + const actualDBSetValue = JSON.parse(databaseRows[0].set_col as string); + const actualReplicatedSetValue = JSON.parse(replicatedRows[0].set_col as string); + expect(actualDBSetValue).toEqual(expectedSet); + expect(actualReplicatedSetValue).toEqual(expectedSet); + }); +}); + +async function getDatabaseRows(connection: MySQLConnectionManager, tableName: string): Promise { + const [results, fields] = await connection.query(`SELECT * FROM ${tableName}`); + const columns = toColumnDescriptors(fields); + return results.map((row) => common.toSQLiteRow(row, columns)); +} + +/** + * Return all the inserts from the first transaction in the binlog stream. 
+ */ +async function getReplicatedRows(expectedTransactionsCount?: number): Promise { + let transformed: SqliteRow[] = []; + const zongji = new ZongJi({ + host: TEST_CONNECTION_OPTIONS.hostname, + user: TEST_CONNECTION_OPTIONS.username, + password: TEST_CONNECTION_OPTIONS.password, + timeZone: 'Z' // Ensure no auto timezone manipulation of the dates occur + }); + + const completionPromise = new Promise((resolve, reject) => { + zongji.on('binlog', (evt: BinLogEvent) => { + try { + if (eventIsWriteMutation(evt)) { + const tableMapEntry = evt.tableMap[evt.tableId]; + const columns = toColumnDescriptors(tableMapEntry); + const records = evt.rows.map((row: Record) => common.toSQLiteRow(row, columns)); + transformed.push(...records); + } else if (eventIsXid(evt)) { + if (expectedTransactionsCount !== undefined) { + expectedTransactionsCount--; + if (expectedTransactionsCount == 0) { + zongji.stop(); + resolve(transformed); + } + } else { + zongji.stop(); + resolve(transformed); + } + } + } catch (e) { + reject(e); + } + }); + }); + + zongji.start({ + includeEvents: ['tablemap', 'writerows', 'xid'], + filename: 'mysql-bin.000001', + position: 0 + }); + + return completionPromise; +} diff --git a/modules/module-mysql/test/src/env.ts b/modules/module-mysql/test/src/env.ts index 3dad20a22..05fc76c42 100644 --- a/modules/module-mysql/test/src/env.ts +++ b/modules/module-mysql/test/src/env.ts @@ -1,7 +1,7 @@ import { utils } from '@powersync/lib-services-framework'; export const env = utils.collectEnvironmentVariables({ - MYSQL_TEST_URI: utils.type.string.default('mysql://myuser:mypassword@localhost:3306/mydatabase'), + MYSQL_TEST_URI: utils.type.string.default('mysql://root:mypassword@localhost:3306/mydatabase'), CI: utils.type.boolean.default('false'), SLOW_TESTS: utils.type.boolean.default('false') }); diff --git a/modules/module-mysql/test/src/util.ts b/modules/module-mysql/test/src/util.ts index a489e0b1b..135cb7e08 100644 --- a/modules/module-mysql/test/src/util.ts +++ b/modules/module-mysql/test/src/util.ts @@ -5,7 +5,13 @@ import mysqlPromise from 'mysql2/promise'; import { connectMongo } from '@core-tests/util.js'; import { getMySQLVersion } from '@module/common/check-source-configuration.js'; import { gte } from 'semver'; -import { RowDataPacket } from 'mysql2'; + +export const TEST_URI = env.MYSQL_TEST_URI; + +export const TEST_CONNECTION_OPTIONS = types.normalizeConnectionConfig({ + type: 'mysql', + uri: TEST_URI +}); // The metrics need to be initialized before they can be used await Metrics.initialise({ @@ -15,13 +21,6 @@ await Metrics.initialise({ }); Metrics.getInstance().resetCounters(); -export const TEST_URI = env.MYSQL_TEST_URI; - -export const TEST_CONNECTION_OPTIONS = types.normalizeConnectionConfig({ - type: 'mysql', - uri: TEST_URI -}); - export type StorageFactory = () => Promise; export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => { @@ -37,7 +36,7 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => { return new MongoBucketStorage(db, { slot_name_prefix: 'test_' }); }; -export async function clearAndRecreateTestDb(connection: mysqlPromise.Connection) { +export async function clearTestDb(connection: mysqlPromise.Connection) { const version = await getMySQLVersion(connection); if (gte(version, '8.4.0')) { await connection.query('RESET BINARY LOGS AND GTIDS'); @@ -45,11 +44,7 @@ export async function clearAndRecreateTestDb(connection: mysqlPromise.Connection await connection.query('RESET MASTER'); } - // await 
connection.query(`DROP DATABASE IF EXISTS ${TEST_CONNECTION_OPTIONS.database}`); - // - // await connection.query(`CREATE DATABASE IF NOT EXISTS ${TEST_CONNECTION_OPTIONS.database}`); - - const [result] = await connection.query( + const [result] = await connection.query( `SELECT TABLE_NAME FROM information_schema.tables WHERE TABLE_SCHEMA = '${TEST_CONNECTION_OPTIONS.database}'` ); @@ -60,3 +55,7 @@ export async function clearAndRecreateTestDb(connection: mysqlPromise.Connection } } } + +export function connectMySQLPool(): mysqlPromise.Pool { + return mysqlPromise.createPool(TEST_URI); +} diff --git a/modules/module-mysql/test/tsconfig.json b/modules/module-mysql/test/tsconfig.json index 18898c4ee..5257b2739 100644 --- a/modules/module-mysql/test/tsconfig.json +++ b/modules/module-mysql/test/tsconfig.json @@ -13,7 +13,7 @@ "@core-tests/*": ["../../../packages/service-core/test/src/*"] } }, - "include": ["src"], + "include": ["src", "../src/replication/zongji/zongji.d.ts"], "references": [ { "path": "../" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c4f1bd0e4..cd892d264 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -143,11 +143,14 @@ importers: specifier: workspace:* version: link:../../libs/lib-services '@powersync/mysql-zongji': - specifier: 0.0.0-dev-20241023144335 - version: 0.0.0-dev-20241023144335 + specifier: 0.0.0-dev-20241031142605 + version: 0.0.0-dev-20241031142605 '@powersync/service-core': specifier: workspace:* version: link:../../packages/service-core + '@powersync/service-jsonbig': + specifier: workspace:* + version: link:../../packages/jsonbig '@powersync/service-sync-rules': specifier: workspace:* version: link:../../packages/sync-rules @@ -1178,8 +1181,8 @@ packages: resolution: {integrity: sha512-UA91GwWPhFExt3IizW6bOeY/pQ0BkuNwKjk9iQW9KqxluGCrg4VenZ0/L+2Y0+ZOtme72EVvg6v0zo3AMQRCeA==} engines: {node: '>=12'} - '@powersync/mysql-zongji@0.0.0-dev-20241023144335': - resolution: {integrity: sha512-77A5ld4Egm0KTHDUzBSP+MHH1+ibp1Es1jlaGZmHWNzmXKNiScd0jxkPDxna9CTfRvOoAu8R0T3MTAuK0aDQpg==} + '@powersync/mysql-zongji@0.0.0-dev-20241031142605': + resolution: {integrity: sha512-LATx5xfJjXcbvW3s9yCy2tlhhsgOuaUoUk0/EH67sgbTQQFZTfBVO/2gzdJH5GvGgONdqoPBQ7skWNwBF/dEXw==} engines: {node: '>=20.0.0'} '@prisma/instrumentation@5.16.1': @@ -4579,7 +4582,7 @@ snapshots: '@pnpm/network.ca-file': 1.0.2 config-chain: 1.1.13 - '@powersync/mysql-zongji@0.0.0-dev-20241023144335': + '@powersync/mysql-zongji@0.0.0-dev-20241031142605': dependencies: '@vlasky/mysql': 2.18.6 big-integer: 1.6.51 diff --git a/service/src/runners/unified-runner.ts b/service/src/runners/unified-runner.ts index 687e8f38c..4997d8933 100644 --- a/service/src/runners/unified-runner.ts +++ b/service/src/runners/unified-runner.ts @@ -31,9 +31,9 @@ export const startUnifiedRunner = async (runnerConfig: core.utils.RunnerConfig) const moduleManager = container.getImplementation(core.modules.ModuleManager); await moduleManager.initialize(serviceContext); - logger.info('Starting service'); + logger.info('Starting service...'); await serviceContext.lifeCycleEngine.start(); - logger.info('service started'); + logger.info('Service started'); await container.probes.ready();