Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ jobs:
strategy:
fail-fast: false
matrix:
mysql-version: [8.0, 8.4]
mysql-version: [5.7, 8.0, 8.4]

steps:
- uses: actions/checkout@v4
Expand All @@ -167,7 +167,8 @@ jobs:
-d mysql:${{ matrix.mysql-version }} \
--log-bin=/var/lib/mysql/mysql-bin.log \
--gtid_mode=ON \
--enforce_gtid_consistency=ON
--enforce_gtid_consistency=ON \
--server-id=1

- name: Start MongoDB
uses: supercharge/[email protected]
Expand Down
4 changes: 2 additions & 2 deletions modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
import * as service_types from '@powersync/service-types';
import mysql from 'mysql2/promise';
import * as common from '../common/common-index.js';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';
import * as types from '../types/types.js';
import { toExpressionTypeFromMySQLType } from '../common/common-index.js';

Expand Down Expand Up @@ -326,7 +326,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
name: result.schema_name,
tables: []
});

const columns = JSON.parse(result.columns).map((column: { data_type: string; column_name: string }) => ({
name: column.column_name,
type: column.data_type,
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/src/common/ReplicatedGTID.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import mysql from 'mysql2/promise';
import * as uuid from 'uuid';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

export type BinLogPosition = {
filename: string;
Expand Down
19 changes: 9 additions & 10 deletions modules/module-mysql/src/common/check-source-configuration.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,16 @@
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

const MIN_SUPPORTED_VERSION = '5.7.0';

export async function checkSourceConfiguration(connection: mysqlPromise.Connection): Promise<string[]> {
const errors: string[] = [];

const version = await mysql_utils.getMySQLVersion(connection);
if (!mysql_utils.isVersionAtLeast(version, MIN_SUPPORTED_VERSION)) {
errors.push(`MySQL versions older than ${MIN_SUPPORTED_VERSION} are not supported. Your version is: ${version}.`);
}

const [[result]] = await mysql_utils.retriedQuery({
connection,
query: `
Expand Down Expand Up @@ -48,12 +56,3 @@ export async function checkSourceConfiguration(connection: mysqlPromise.Connecti

return errors;
}

export async function getMySQLVersion(connection: mysqlPromise.Connection): Promise<string> {
const [[versionResult]] = await mysql_utils.retriedQuery({
connection,
query: `SELECT VERSION() as version`
});

return versionResult.version as string;
}
2 changes: 1 addition & 1 deletion modules/module-mysql/src/common/get-replication-columns.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { storage } from '@powersync/service-core';
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';

export type GetReplicationColumnsOptions = {
connection: mysqlPromise.Connection;
Expand Down
14 changes: 5 additions & 9 deletions modules/module-mysql/src/common/read-executed-gtid.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,22 @@
import mysqlPromise from 'mysql2/promise';
import * as mysql_utils from '../utils/mysql_utils.js';
import { gte } from 'semver';

import * as mysql_utils from '../utils/mysql-utils.js';
import { ReplicatedGTID } from './ReplicatedGTID.js';
import { getMySQLVersion } from './check-source-configuration.js';

/**
* Gets the current master HEAD GTID
*/
export async function readExecutedGtid(connection: mysqlPromise.Connection): Promise<ReplicatedGTID> {
const version = await getMySQLVersion(connection);
const version = await mysql_utils.getMySQLVersion(connection);

let binlogStatus: mysqlPromise.RowDataPacket;
if (gte(version, '8.4.0')) {
// Get the BinLog status
if (mysql_utils.isVersionAtLeast(version, '8.4.0')) {
// Syntax for the below query changed in 8.4.0
const [[binLogResult]] = await mysql_utils.retriedQuery({
connection,
query: `SHOW BINARY LOG STATUS`
});
binlogStatus = binLogResult;
} else {
// TODO Check if this works for version 5.7
// Get the BinLog status
const [[binLogResult]] = await mysql_utils.retriedQuery({
connection,
query: `SHOW MASTER STATUS`
Expand Down
2 changes: 1 addition & 1 deletion modules/module-mysql/src/replication/BinLogStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import * as zongji_utils from './zongji/zongji-utils.js';
import { MySQLConnectionManager } from './MySQLConnectionManager.js';
import { isBinlogStillAvailable, ReplicatedGTID, toColumnDescriptors } from '../common/common-index.js';
import mysqlPromise from 'mysql2/promise';
import { createRandomServerId } from '../utils/mysql_utils.js';
import { createRandomServerId } from '../utils/mysql-utils.js';

export interface BinLogStreamOptions {
connections: MySQLConnectionManager;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { NormalizedMySQLConnectionConfig } from '../types/types.js';
import mysqlPromise from 'mysql2/promise';
import mysql, { FieldPacket, RowDataPacket } from 'mysql2';
import * as mysql_utils from '../utils/mysql_utils.js';
import * as mysql_utils from '../utils/mysql-utils.js';
import ZongJi from '@powersync/mysql-zongji';
import { logger } from '@powersync/lib-services-framework';

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { logger } from '@powersync/lib-services-framework';
import mysql from 'mysql2';
import mysqlPromise from 'mysql2/promise';
import * as types from '../types/types.js';
import { coerce, gte } from 'semver';

export type RetriedQueryOptions = {
connection: mysqlPromise.Connection;
Expand Down Expand Up @@ -60,3 +61,24 @@ export function createPool(config: types.NormalizedMySQLConnectionConfig, option
export function createRandomServerId(syncRuleId: number): number {
return Number.parseInt(`${syncRuleId}00${Math.floor(Math.random() * 10000)}`);
}

export async function getMySQLVersion(connection: mysqlPromise.Connection): Promise<string> {
const [[versionResult]] = await retriedQuery({
connection,
query: `SELECT VERSION() as version`
});

return versionResult.version as string;
}

/**
* Check if the current MySQL version is newer or equal to the target version.
* @param version
* @param minimumVersion
*/
export function isVersionAtLeast(version: string, minimumVersion: string): boolean {
const coercedVersion = coerce(version);
const coercedMinimumVersion = coerce(minimumVersion);

return gte(coercedVersion!, coercedMinimumVersion!, { loose: true });
}
76 changes: 25 additions & 51 deletions modules/module-mysql/test/src/BinLogStream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
import { BucketStorageFactory, Metrics } from '@powersync/service-core';
import { describe, expect, test } from 'vitest';
import { binlogStreamTest } from './BinlogStreamUtils.js';
import { v4 as uuid } from 'uuid';

type StorageFactory = () => Promise<BucketStorageFactory>;

Expand Down Expand Up @@ -32,9 +33,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
data:
- SELECT id, description, num FROM "test_data"`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, num BIGINT)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, num BIGINT)`);

await context.replicateSnapshot();

Expand All @@ -44,11 +43,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {
(await Metrics.getInstance().getMetricValueForTests('powersync_transactions_replicated_total')) ?? 0;

context.startStreaming();
await connectionManager.query(`INSERT INTO test_data(description, num) VALUES('test1', 1152921504606846976)`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1' AND num = 1152921504606846976`
const testId = uuid();
await connectionManager.query(
`INSERT INTO test_data(id, description, num) VALUES('${testId}', 'test1', 1152921504606846976)`
);
const testId = result.test_id;
const data = await context.getBucketData('global[]');

expect(data).toMatchObject([putOp('test_data', { id: testId, description: 'test1', num: 1152921504606846976n })]);
Expand All @@ -71,9 +69,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
- SELECT id, description FROM "test_DATA"
`);

await connectionManager.query(
`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_DATA (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();

Expand All @@ -84,11 +80,8 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

await connectionManager.query(`INSERT INTO test_DATA(description) VALUES('test1')`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_DATA WHERE description = 'test1'`
);
const testId = result.test_id;
const testId = uuid();
await connectionManager.query(`INSERT INTO test_DATA(id, description) VALUES('${testId}','test1')`);

const data = await context.getBucketData('global[]');

Expand Down Expand Up @@ -144,24 +137,18 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();
context.startStreaming();

await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`);
const [[result1]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1'`
);
const testId1 = result1.test_id;
const testId1 = uuid();
await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId1}','test1')`);

await connectionManager.query(`UPDATE test_data SET id = UUID(), description = 'test2a' WHERE id = '${testId1}'`);
const [[result2]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test2a'`
const testId2 = uuid();
await connectionManager.query(
`UPDATE test_data SET id = '${testId2}', description = 'test2a' WHERE id = '${testId1}'`
);
const testId2 = result2.test_id;

// This update may fail replicating with:
// Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"}
Expand Down Expand Up @@ -192,15 +179,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description text)`);

await connectionManager.query(`INSERT INTO test_data(description) VALUES('test1')`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'test1'`
);
const testId = result.test_id;
const testId = uuid();
await connectionManager.query(`INSERT INTO test_data(id, description) VALUES('${testId}','test1')`);

await context.replicateSnapshot();

Expand All @@ -221,16 +203,13 @@ function defineBinlogStreamTests(factory: StorageFactory) {
`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
);

const testId = uuid();
await connectionManager.query(`
INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'testDates'`
);
const testId = result.test_id;

await context.replicateSnapshot();

Expand Down Expand Up @@ -259,7 +238,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
`);

await connectionManager.query(
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
`CREATE TABLE test_data (id CHAR(36) PRIMARY KEY, description TEXT, date DATE, datetime DATETIME, timestamp TIMESTAMP)`
);

await context.replicateSnapshot();
Expand All @@ -271,13 +250,10 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

const testId = uuid();
await connectionManager.query(`
INSERT INTO test_data(description, date, datetime, timestamp) VALUES('testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
INSERT INTO test_data(id, description, date, datetime, timestamp) VALUES('${testId}','testDates', '2023-03-06', '2023-03-06 15:47', '2023-03-06 15:47')
`);
const [[result]] = await connectionManager.query(
`SELECT id AS test_id FROM test_data WHERE description = 'testDates'`
);
const testId = result.test_id;

const data = await context.getBucketData('global[]');
expect(data).toMatchObject([
Expand All @@ -303,9 +279,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {
const { connectionManager } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

await connectionManager.query(
`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY DEFAULT (UUID()), description text)`
);
await connectionManager.query(`CREATE TABLE test_donotsync (id CHAR(36) PRIMARY KEY, description text)`);

await context.replicateSnapshot();

Expand All @@ -316,7 +290,7 @@ function defineBinlogStreamTests(factory: StorageFactory) {

context.startStreaming();

await connectionManager.query(`INSERT INTO test_donotsync(description) VALUES('test1')`);
await connectionManager.query(`INSERT INTO test_donotsync(id, description) VALUES('${uuid()}','test1')`);
const data = await context.getBucketData('global[]');

expect(data).toMatchObject([]);
Expand Down
17 changes: 17 additions & 0 deletions modules/module-mysql/test/src/mysql-utils.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
import { describe, expect, test } from 'vitest';
import { isVersionAtLeast } from '@module/utils/mysql-utils.js';

describe('MySQL Utility Tests', () => {
test('Minimum version checking ', () => {
const newerVersion = '8.4.0';
const olderVersion = '5.7';
const sameVersion = '8.0';
// Improperly formatted semantic versions should be handled gracefully if possible
const improperSemver = '5.7.42-0ubuntu0.18.04.1-log';

expect(isVersionAtLeast(newerVersion, '8.0')).toBeTruthy();
expect(isVersionAtLeast(sameVersion, '8.0')).toBeTruthy();
expect(isVersionAtLeast(olderVersion, '8.0')).toBeFalsy();
expect(isVersionAtLeast(improperSemver, '5.7')).toBeTruthy();
});
});
9 changes: 2 additions & 7 deletions modules/module-mysql/test/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,7 @@ import { BucketStorageFactory, Metrics, MongoBucketStorage } from '@powersync/se
import { env } from './env.js';
import mysqlPromise from 'mysql2/promise';
import { connectMongo } from '@core-tests/util.js';
import { getMySQLVersion } from '@module/common/check-source-configuration.js';
import { gte } from 'semver';
import { getMySQLVersion, isVersionAtLeast } from '@module/utils/mysql-utils.js';

export const TEST_URI = env.MYSQL_TEST_URI;

Expand Down Expand Up @@ -38,7 +37,7 @@ export const INITIALIZED_MONGO_STORAGE_FACTORY: StorageFactory = async () => {

export async function clearTestDb(connection: mysqlPromise.Connection) {
const version = await getMySQLVersion(connection);
if (gte(version, '8.4.0')) {
if (isVersionAtLeast(version, '8.4.0')) {
await connection.query('RESET BINARY LOGS AND GTIDS');
} else {
await connection.query('RESET MASTER');
Expand All @@ -55,7 +54,3 @@ export async function clearTestDb(connection: mysqlPromise.Connection) {
}
}
}

export function connectMySQLPool(): mysqlPromise.Pool {
return mysqlPromise.createPool(TEST_URI);
}
Loading