Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .changeset/lemon-terms-play.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
---
'@powersync/service-module-mysql': minor
---

Generate a random serverId based on the sync rule id for the MySQL replication client
Consolidated type mappings between snapshot and replicated values
Enabled MySQL tests in CI

65 changes: 65 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,68 @@ jobs:

- name: Test
run: pnpm test --filter='./modules/module-postgres'

run-mysql-tests:
  name: MySQL Test
  runs-on: ubuntu-latest
  needs: run-core-tests

  strategy:
    fail-fast: false
    matrix:
      # Versions must be quoted: an unquoted 8.0 is parsed as the YAML float
      # 8.0 and stringifies as "8" in ${{ matrix.mysql-version }}, which would
      # pull the floating mysql:8 image instead of pinning 8.0.
      mysql-version: ['8.0', '8.4']

  steps:
    - uses: actions/checkout@v4

    # Binlog + GTID flags are passed to mysqld; both are needed for
    # GTID-based replication that the module under test relies on.
    # NOTE(review): no readiness wait/health check on the container here —
    # presumably the tests retry the connection; confirm.
    - name: Start MySQL
      run: |
        docker run \
          --name MySQLTestDatabase \
          -e MYSQL_ROOT_PASSWORD=mypassword \
          -e MYSQL_DATABASE=mydatabase \
          -p 3306:3306 \
          -d mysql:${{ matrix.mysql-version }} \
          --log-bin=/var/lib/mysql/mysql-bin.log \
          --gtid_mode=ON \
          --enforce_gtid_consistency=ON

    # MongoDB is used as the sync bucket storage backend for the tests.
    - name: Start MongoDB
      uses: supercharge/[email protected]
      with:
        mongodb-version: '6.0'
        mongodb-replica-set: test-rs

    - name: Setup NodeJS
      uses: actions/setup-node@v4
      with:
        node-version-file: '.nvmrc'

    - uses: pnpm/action-setup@v4
      name: Install pnpm
      with:
        version: 9
        run_install: false

    - name: Get pnpm store directory
      shell: bash
      run: |
        echo "STORE_PATH=$(pnpm store path --silent)" >> $GITHUB_ENV

    - uses: actions/cache@v3
      name: Setup pnpm cache
      with:
        path: ${{ env.STORE_PATH }}
        key: ${{ runner.os }}-pnpm-store-${{ hashFiles('**/pnpm-lock.yaml') }}
        restore-keys: |
          ${{ runner.os }}-pnpm-store-

    - name: Install dependencies
      run: pnpm install

    - name: Build
      shell: bash
      run: pnpm build

    - name: Test
      run: pnpm test --filter='./modules/module-mysql'
6 changes: 2 additions & 4 deletions modules/module-mysql/dev/config/sync_rules.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@
# Note that changes to this file are not watched.
# The service needs to be restarted for changes to take effect.

# Note that specifying the schema is currently required due to the default
# schema being specified as `public`, but in mysql the schema is the database name
bucket_definitions:
global:
data:
- SELECT * FROM mydatabase.lists
- SELECT * FROM mydatabase.todos
- SELECT * FROM lists
- SELECT * FROM todos
3 changes: 2 additions & 1 deletion modules/module-mysql/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@
"@powersync/service-core": "workspace:*",
"@powersync/service-sync-rules": "workspace:*",
"@powersync/service-types": "workspace:*",
"@powersync/mysql-zongji": "0.0.0-dev-20241023144335",
"@powersync/service-jsonbig": "workspace:*",
"@powersync/mysql-zongji": "0.0.0-dev-20241031142605",
"semver": "^7.5.4",
"async": "^3.2.4",
"mysql2": "^3.11.0",
Expand Down
155 changes: 147 additions & 8 deletions modules/module-mysql/src/common/mysql-to-sqlite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,155 @@ import * as sync_rules from '@powersync/service-sync-rules';
import { ExpressionType } from '@powersync/service-sync-rules';
import { ColumnDescriptor } from '@powersync/service-core';
import mysql from 'mysql2';
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { ColumnDefinition, TableMapEntry } from '@powersync/mysql-zongji';

export function toSQLiteRow(row: Record<string, any>, columns?: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
/**
 * Type ids used by this module that are not present in mysql2's `mysql.Types`.
 *
 * NOTE(review): DATETIME2 (18) and TIMESTAMP2 (17) appear to be real MySQL
 * wire-protocol type codes seen in binlog column definitions — confirm against
 * the protocol docs. The 100+ values (BINARY/VARBINARY/TEXT) are synthetic ids
 * invented here to distinguish binary vs. text variants of columns that share
 * a single protocol type code; they must not collide with real codes.
 */
export enum ADDITIONAL_MYSQL_TYPES {
  DATETIME2 = 18,
  TIMESTAMP2 = 17,
  BINARY = 100,
  VARBINARY = 101,
  TEXT = 102
}

/**
 * Reverse lookup from numeric MySQL type id to its type name, covering both
 * the driver-reported types (`mysql.Types`) and our synthetic additions.
 */
export const MySQLTypesMap: { [key: number]: string } = {};
for (const [name, code] of Object.entries(mysql.Types)) {
  MySQLTypesMap[code as number] = name;
}
for (const [name, code] of Object.entries(ADDITIONAL_MYSQL_TYPES)) {
  // Numeric TS enums contain reverse mappings too (e.g. '18' -> 'DATETIME2').
  // Without this guard the loop also wrote junk entries such as
  // MySQLTypesMap['DATETIME2'] = '18'. Only take the name -> number entries.
  if (typeof code === 'number') {
    MySQLTypesMap[code] = name;
  }
}

export function toColumnDescriptors(columns: mysql.FieldPacket[]): Map<string, ColumnDescriptor>;
export function toColumnDescriptors(tableMap: TableMapEntry): Map<string, ColumnDescriptor>;

export function toColumnDescriptors(columns: mysql.FieldPacket[] | TableMapEntry): Map<string, ColumnDescriptor> {
const columnMap = new Map<string, ColumnDescriptor>();
if (Array.isArray(columns)) {
for (const column of columns) {
columnMap.set(column.name, toColumnDescriptorFromFieldPacket(column));
}
} else {
for (const column of columns.columns) {
columnMap.set(column.name, toColumnDescriptorFromDefinition(column));
}
}

return columnMap;
}

/**
 * Derive a ColumnDescriptor from a mysql2 query-result field packet.
 *
 * Several distinct column kinds share a single protocol type code and are
 * disambiguated via the column flags: STRING covers CHAR/BINARY/ENUM/SET,
 * VAR_STRING covers VARCHAR/VARBINARY, and BLOB covers TEXT as well.
 */
export function toColumnDescriptorFromFieldPacket(column: mysql.FieldPacket): ColumnDescriptor {
  // Flag bit masks reported by the server for each column.
  const BINARY_FLAG = 128;
  const MYSQL_ENUM_FLAG = 256;
  const MYSQL_SET_FLAG = 2048;

  const flags = column.flags as number;
  let typeId = column.type!;

  if (column.type === mysql.Types.STRING) {
    if (flags & BINARY_FLAG) {
      typeId = ADDITIONAL_MYSQL_TYPES.BINARY;
    } else if (flags & MYSQL_ENUM_FLAG) {
      typeId = mysql.Types.ENUM;
    } else if (flags & MYSQL_SET_FLAG) {
      typeId = mysql.Types.SET;
    }
  } else if (column.type === mysql.Types.VAR_STRING) {
    if (flags & BINARY_FLAG) {
      typeId = ADDITIONAL_MYSQL_TYPES.VARBINARY;
    }
  } else if (column.type === mysql.Types.BLOB) {
    // A BLOB without the binary flag is actually a TEXT column.
    if (!(flags & BINARY_FLAG)) {
      typeId = ADDITIONAL_MYSQL_TYPES.TEXT;
    }
  }

  return {
    name: column.name,
    type: MySQLTypesMap[typeId],
    typeId: typeId
  };
}

/**
 * Derive a ColumnDescriptor from a binlog table-map column definition.
 *
 * Binlog definitions carry no column flags; instead the presence of a
 * charset distinguishes text columns from their binary counterparts
 * (CHAR vs BINARY, VARCHAR vs VARBINARY, TEXT vs BLOB).
 */
export function toColumnDescriptorFromDefinition(column: ColumnDefinition): ColumnDescriptor {
  const isTextual = !!column.charset;
  let typeId = column.type;

  if (column.type === mysql.Types.STRING && !isTextual) {
    typeId = ADDITIONAL_MYSQL_TYPES.BINARY;
  } else if ((column.type === mysql.Types.VAR_STRING || column.type === mysql.Types.VARCHAR) && !isTextual) {
    typeId = ADDITIONAL_MYSQL_TYPES.VARBINARY;
  } else if (column.type === mysql.Types.BLOB && isTextual) {
    // A BLOB type with a charset is actually a TEXT column.
    typeId = ADDITIONAL_MYSQL_TYPES.TEXT;
  }

  return {
    name: column.name,
    type: MySQLTypesMap[typeId],
    typeId: typeId
  };
}

export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
for (let key in row) {
if (row[key] instanceof Date) {
const column = columns?.get(key);
if (column?.typeId == mysql.Types.DATE) {
// Only parse the date part
row[key] = row[key].toISOString().split('T')[0];
} else {
row[key] = row[key].toISOString();
// We are very much expecting the column to be there
const column = columns.get(key)!;

if (row[key] !== null) {
switch (column.typeId) {
case mysql.Types.DATE:
// Only parse the date part
row[key] = row[key].toISOString().split('T')[0];
break;
case mysql.Types.DATETIME:
case ADDITIONAL_MYSQL_TYPES.DATETIME2:
case mysql.Types.TIMESTAMP:
case ADDITIONAL_MYSQL_TYPES.TIMESTAMP2:
row[key] = row[key].toISOString();
break;
case mysql.Types.JSON:
if (typeof row[key] === 'string') {
row[key] = new JsonContainer(row[key]);
}
break;
case mysql.Types.BIT:
case mysql.Types.BLOB:
case mysql.Types.TINY_BLOB:
case mysql.Types.MEDIUM_BLOB:
case mysql.Types.LONG_BLOB:
case ADDITIONAL_MYSQL_TYPES.BINARY:
case ADDITIONAL_MYSQL_TYPES.VARBINARY:
row[key] = new Uint8Array(Object.values(row[key]));
break;
case mysql.Types.LONGLONG:
if (typeof row[key] === 'string') {
row[key] = BigInt(row[key]);
} else if (typeof row[key] === 'number') {
// Zongji returns BIGINT as a number when it can be represented as a number
row[key] = BigInt(row[key]);
}
break;
case mysql.Types.TINY:
case mysql.Types.SHORT:
case mysql.Types.LONG:
case mysql.Types.INT24:
// Handle all integer values a BigInt
if (typeof row[key] === 'number') {
row[key] = BigInt(row[key]);
}
break;
case mysql.Types.SET:
// Convert to JSON array from string
const values = row[key].split(',');
row[key] = JSONBig.stringify(values);
break;
}
}
}
Expand Down
3 changes: 0 additions & 3 deletions modules/module-mysql/src/common/read-executed-gtid.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import { gte } from 'semver';

import { ReplicatedGTID } from './ReplicatedGTID.js';
import { getMySQLVersion } from './check-source-configuration.js';
import { logger } from '@powersync/lib-services-framework';

/**
* Gets the current master HEAD GTID
Expand Down Expand Up @@ -33,8 +32,6 @@ export async function readExecutedGtid(connection: mysqlPromise.Connection): Pro
offset: parseInt(binlogStatus.Position)
};

logger.info('Succesfully read executed GTID', { position });

return new ReplicatedGTID({
// The head always points to the next position to start replication from
position,
Expand Down
Loading
Loading