Skip to content
Merged
Show file tree
Hide file tree
Changes from 64 commits
Commits
Show all changes
71 commits
Select commit Hold shift + click to select a range
ebabbbd
add scaffolding for mysql module
stevensJourney Aug 12, 2024
7ee096e
add more mysql functions
stevensJourney Aug 12, 2024
729013d
add execute sql api
stevensJourney Aug 12, 2024
1f007c3
fix conditional replication and router engines. Add more API functions
stevensJourney Aug 13, 2024
e095f3e
added table debug info calls
stevensJourney Aug 14, 2024
3188c45
added connection schema api method and initial workings for replicati…
stevensJourney Aug 15, 2024
d738bce
update from base
stevensJourney Aug 15, 2024
338d490
update replication lag API. Cleanup queries
stevensJourney Aug 15, 2024
f174f01
cleanup
stevensJourney Aug 15, 2024
196f21c
cleanup retried queries. Add SSL config for MySQL config
stevensJourney Aug 15, 2024
a6614f1
Use framework errors. Update DataSourceConfig and ResolvedDataSourceC…
stevensJourney Aug 15, 2024
4a0b5ce
more mysql cleanup
stevensJourney Aug 15, 2024
b3a5c08
update failing Github action name to be clearer
stevensJourney Aug 15, 2024
76bac2f
share some utilities. Playing around with replication
stevensJourney Aug 15, 2024
ce78ee8
unfortunate implementation of replication lag
stevensJourney Aug 15, 2024
78e4f23
track binlog positions in replicated GTID identifier
stevensJourney Aug 19, 2024
bef4198
wip
stevensJourney Aug 21, 2024
ec4f71d
more hacks to get writerows to work
stevensJourney Aug 21, 2024
7155d40
wip: before implementing update
stevensJourney Aug 22, 2024
df50b5c
added zongji types
stevensJourney Aug 22, 2024
ff8e557
update typescript
stevensJourney Aug 22, 2024
3aaa90e
shuffle files arround
stevensJourney Aug 22, 2024
cc1ca9f
update after shuffle
stevensJourney Aug 22, 2024
39dfdef
merge in changes
stevensJourney Aug 22, 2024
e2422e8
use new replicator interfaces
stevensJourney Aug 22, 2024
9250b19
move toSQLite function to common
stevensJourney Aug 22, 2024
bf2725a
update types
stevensJourney Aug 22, 2024
4378c77
move
stevensJourney Aug 22, 2024
e6da973
cleanup
stevensJourney Aug 27, 2024
e3bf112
fix build error
stevensJourney Aug 27, 2024
9fd9308
add some helpers
stevensJourney Sep 6, 2024
b433625
Merge branch 'refs/heads/feat/rework-teardown' into module-mysql
Rentacookie Sep 9, 2024
5b5792d
Updated lockfile
Rentacookie Sep 10, 2024
1858430
Merge branch 'feat/modular-replication-architecture' into module-mysql
Rentacookie Sep 17, 2024
f2963a7
Merge branch 'feat/modular-replication-architecture' into module-mysql
Rentacookie Oct 8, 2024
39638f4
Lockfile update and merge conflict fixes
Rentacookie Oct 8, 2024
aec43b9
Added dev docker compose for mysql
Rentacookie Oct 9, 2024
09cbcf3
Added MySQL connection manager and use appropriate connections
Rentacookie Oct 9, 2024
c8751d7
Added stricter type definition for checkpoints
Rentacookie Oct 9, 2024
9e7b41d
Fixed diagnostics route merge conflict
Rentacookie Oct 10, 2024
12191af
Add MySQLConnection management
Rentacookie Oct 10, 2024
cf002be
Made streamable mysql connections available
Rentacookie Oct 10, 2024
9941e20
Updated BinlogStream to use appropriate connections for snapshot stre…
Rentacookie Oct 10, 2024
49d01cd
Lockfile update
Rentacookie Oct 10, 2024
0932afb
Updated dev/test mysql docker compose
Rentacookie Oct 15, 2024
18ce275
Added BinlogStream tests (WiP)
Rentacookie Oct 15, 2024
20d4b0d
Initializing batch in constructor of MongoBucketBatch
Rentacookie Oct 15, 2024
9c4a14e
Merge branch 'feat/modular-replication-architecture' into module-mysql
Rentacookie Oct 15, 2024
203a737
Some MongoModule merge conflict handling and cleanup
Rentacookie Oct 15, 2024
e218216
Made mysql module publishing public
Rentacookie Oct 15, 2024
8befdce
Updated dockerfile
Rentacookie Oct 15, 2024
c2bd479
Added mysql module to service tsconfig
Rentacookie Oct 15, 2024
38737d4
Fixed dockerfile mysql module copy
Rentacookie Oct 16, 2024
62ffaac
Updated zongji dev package version
Rentacookie Oct 16, 2024
1ff485d
Updated vitest
Rentacookie Oct 18, 2024
415019b
Added configuration error handling
Rentacookie Oct 18, 2024
186199d
Merge branch 'feat/modular-replication-architecture' into feat/mysql-…
Rentacookie Oct 18, 2024
b4b1f6e
Don't start replication if already aborted
Rentacookie Oct 18, 2024
049669b
Exposed DateStrings connection option in Zongji constructor
Rentacookie Oct 21, 2024
5758003
Lockfile
Rentacookie Oct 21, 2024
2f9e24f
Added ConnectionTester interface
Rentacookie Oct 22, 2024
cabcb93
Made it possible to specify timezone on zongji listener configuration…
Rentacookie Oct 23, 2024
5ce7ebf
Correctly handle date parsing on binlog events and table snapshotting
Rentacookie Oct 23, 2024
80f6c05
Merge branch 'feat/modular-replication-architecture' into feat/mysql-…
Rentacookie Oct 23, 2024
71f316f
Some cleanup
Rentacookie Oct 23, 2024
618b10e
fix esm stuff
dylanvorster Oct 23, 2024
00b079a
Using syncrule id for MySQL serverId
Rentacookie Oct 28, 2024
8610431
Changeset
Rentacookie Oct 28, 2024
8e7d223
Prevent mysql connection manager throwing errors on shutdown.
Rentacookie Oct 28, 2024
6d8aec9
Improved shutdown logic of binlog stream
Rentacookie Oct 28, 2024
7c073b1
Renamed Checkpoint to ReplicationCheckpoint
Rentacookie Oct 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3

- name: Build and push
- name: Test Build Docker Image
uses: docker/build-push-action@v5
with:
cache-from: type=registry,ref=stevenontong/${{vars.DOCKER_REGISTRY}}:cache
Expand Down
4 changes: 2 additions & 2 deletions modules/module-mongodb/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
"scripts": {
"build": "tsc -b",
"build:tests": "tsc -b test/tsconfig.json",
"clean": "rm -rf ./lib && tsc -b --clean",
"clean": "rm -rf ./dist && tsc -b --clean",
"test": "vitest --no-threads"
},
"exports": {
Expand Down Expand Up @@ -41,7 +41,7 @@
"devDependencies": {
"@types/uuid": "^9.0.4",
"typescript": "^5.2.2",
"vitest": "^0.34.6",
"vitest": "^2.1.1",
"vite-tsconfig-paths": "^4.3.2"
}
}
4 changes: 2 additions & 2 deletions modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
await this.client.close();
}

async getSourceConfig(): Promise<service_types.configFile.DataSourceConfig> {
async getSourceConfig(): Promise<service_types.configFile.ResolvedDataSourceConfig> {
return this.config;
}

Expand Down Expand Up @@ -165,7 +165,7 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
return result;
}

async getReplicationLag(syncRulesId: string): Promise<number | undefined> {
async getReplicationLag(options: api.ReplicationLagOptions): Promise<number | undefined> {
// There is no fast way to get replication lag in bytes in MongoDB.
// We can get replication lag in seconds, but need a different API for that.
return undefined;
Expand Down
6 changes: 1 addition & 5 deletions modules/module-mongodb/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1 @@
import { MongoModule } from './module/MongoModule.js';

export const module = new MongoModule();

export default module;
export * from './module/MongoModule.js';
13 changes: 13 additions & 0 deletions modules/module-mongodb/src/module/MongoModule.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import { ConnectionManagerFactory } from '../replication/ConnectionManagerFactor
import { MongoErrorRateLimiter } from '../replication/MongoErrorRateLimiter.js';
import { ChangeStreamReplicator } from '../replication/ChangeStreamReplicator.js';
import * as types from '../types/types.js';
import { MongoManager } from '../replication/MongoManager.js';
import { checkSourceConfiguration } from '../replication/replication-utils.js';

export class MongoModule extends replication.ReplicationModule<types.MongoConnectionConfig> {
constructor() {
Expand Down Expand Up @@ -49,4 +51,15 @@ export class MongoModule extends replication.ReplicationModule<types.MongoConnec
async teardown(options: TearDownOptions): Promise<void> {
// TODO: Implement?
}

async testConnection(config: types.MongoConnectionConfig): Promise<void> {
this.decodeConfig(config);
const normalisedConfig = this.resolveConfig(this.decodedConfig!);
const connectionManager = new MongoManager(normalisedConfig);
try {
return checkSourceConfiguration(connectionManager);
} finally {
await connectionManager.end();
}
}
}
14 changes: 7 additions & 7 deletions modules/module-mongodb/src/replication/ChangeStream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { container, logger } from '@powersync/lib-services-framework';
import { Metrics, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
import { Metrics, SaveOperationTag, SourceEntityDescriptor, SourceTable, storage } from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import * as mongo from 'mongodb';
import { MongoManager } from './MongoManager.js';
import {
Expand Down Expand Up @@ -248,7 +248,7 @@ export class ChangeStream {

// This auto-flushes when the batch reaches its size limit
await batch.save({
tag: 'insert',
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
Expand Down Expand Up @@ -330,7 +330,7 @@ export class ChangeStream {
if (change.operationType == 'insert') {
const baseRecord = constructAfterRecord(change.fullDocument);
return await batch.save({
tag: 'insert',
tag: SaveOperationTag.INSERT,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
Expand All @@ -341,15 +341,15 @@ export class ChangeStream {
if (change.fullDocument == null) {
// Treat as delete
return await batch.save({
tag: 'delete',
tag: SaveOperationTag.DELETE,
sourceTable: table,
before: undefined,
beforeReplicaId: change.documentKey._id
});
}
const after = constructAfterRecord(change.fullDocument!);
return await batch.save({
tag: 'update',
tag: SaveOperationTag.UPDATE,
sourceTable: table,
before: undefined,
beforeReplicaId: undefined,
Expand All @@ -358,7 +358,7 @@ export class ChangeStream {
});
} else if (change.operationType == 'delete') {
return await batch.save({
tag: 'delete',
tag: SaveOperationTag.DELETE,
sourceTable: table,
before: undefined,
beforeReplicaId: change.documentKey._id
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { container } from '@powersync/lib-services-framework';
import { MongoManager } from './MongoManager.js';
import { MissingReplicationSlotError, ChangeStream } from './ChangeStream.js';

import { replication } from '@powersync/service-core';
Expand All @@ -13,12 +12,10 @@ export interface ChangeStreamReplicationJobOptions extends replication.AbstractR

export class ChangeStreamReplicationJob extends replication.AbstractReplicationJob {
private connectionFactory: ConnectionManagerFactory;
private readonly connectionManager: MongoManager;

constructor(options: ChangeStreamReplicationJobOptions) {
super(options);
this.connectionFactory = options.connectionFactory;
this.connectionManager = this.connectionFactory.create();
}

async cleanUp(): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ import { ChangeStreamReplicationJob } from './ChangeStreamReplicationJob.js';
import { ConnectionManagerFactory } from './ConnectionManagerFactory.js';
import { MongoErrorRateLimiter } from './MongoErrorRateLimiter.js';

export interface WalStreamReplicatorOptions extends replication.AbstractReplicatorOptions {
export interface ChangeStreamReplicatorOptions extends replication.AbstractReplicatorOptions {
connectionFactory: ConnectionManagerFactory;
}

export class ChangeStreamReplicator extends replication.AbstractReplicator<ChangeStreamReplicationJob> {
private readonly connectionFactory: ConnectionManagerFactory;

constructor(options: WalStreamReplicatorOptions) {
constructor(options: ChangeStreamReplicatorOptions) {
super(options);
this.connectionFactory = options.connectionFactory;
}
Expand Down
12 changes: 12 additions & 0 deletions modules/module-mongodb/src/replication/replication-utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import * as mongo from 'mongodb';
import { MongoManager } from './MongoManager.js';

export async function checkSourceConfiguration(connectionManager: MongoManager): Promise<void> {
const db = connectionManager.db;
const hello = await db.command({ hello: 1 });
if (hello.msg == 'isdbgrid') {
throw new Error('Sharded MongoDB Clusters are not supported yet (including MongoDB Serverless instances).');
} else if (hello.setName == null) {
throw new Error('Standalone MongoDB instances are not supported - use a replicaset.');
}
}
2 changes: 1 addition & 1 deletion modules/module-mongodb/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export interface NormalizedMongoConnectionConfig {
password?: string;
}

export const MongoConnectionConfig = service_types.configFile.dataSourceConfig.and(
export const MongoConnectionConfig = service_types.configFile.DataSourceConfig.and(
t.object({
type: t.literal(MONGO_CONNECTION_TYPE),
/** Unique identifier for the connection - optional when a single connection is present. */
Expand Down
24 changes: 12 additions & 12 deletions modules/module-mongodb/test/src/change_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { MONGO_STORAGE_FACTORY } from '@core-tests/util.js';
import { BucketStorageFactory } from '@powersync/service-core';
import * as crypto from 'crypto';
import { describe, expect, test } from 'vitest';
import { walStreamTest } from './change_stream_utils.js';
import { changeStreamTest } from './change_stream_utils.js';
import * as mongo from 'mongodb';
import { setTimeout } from 'node:timers/promises';

Expand All @@ -19,15 +19,15 @@ bucket_definitions:
describe(
'change stream - mongodb',
function () {
defineWalStreamTests(MONGO_STORAGE_FACTORY);
defineChangeStreamTests(MONGO_STORAGE_FACTORY);
},
{ timeout: 20_000 }
);

function defineWalStreamTests(factory: StorageFactory) {
function defineChangeStreamTests(factory: StorageFactory) {
test(
'replicating basic values',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
Expand Down Expand Up @@ -66,7 +66,7 @@ bucket_definitions:

test(
'no fullDocument available',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db, client } = context;
await context.updateSyncRules(`
bucket_definitions:
Expand Down Expand Up @@ -111,7 +111,7 @@ bucket_definitions:

test(
'replicating case sensitive table',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
Expand All @@ -136,7 +136,7 @@ bucket_definitions:

test(
'replicating large values',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
await context.updateSyncRules(`
bucket_definitions:
Expand Down Expand Up @@ -168,7 +168,7 @@ bucket_definitions:

test(
'replicating dropCollection',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
const syncRuleContent = `
bucket_definitions:
Expand Down Expand Up @@ -200,7 +200,7 @@ bucket_definitions:

test(
'replicating renameCollection',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
const syncRuleContent = `
bucket_definitions:
Expand Down Expand Up @@ -232,7 +232,7 @@ bucket_definitions:

test(
'initial sync',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

Expand All @@ -251,7 +251,7 @@ bucket_definitions:
// Not correctly implemented yet
test.skip(
'large record',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
await context.updateSyncRules(`bucket_definitions:
global:
data:
Expand Down Expand Up @@ -287,7 +287,7 @@ bucket_definitions:

test(
'table not in sync rules',
walStreamTest(factory, async (context) => {
changeStreamTest(factory, async (context) => {
const { db } = context;
await context.updateSyncRules(BASIC_SYNC_RULES);

Expand Down
4 changes: 2 additions & 2 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,12 @@ import * as mongo from 'mongodb';
import { createCheckpoint } from '@module/replication/MongoRelation.js';

/**
* Tests operating on the wal stream need to configure the stream and manage asynchronous
* Tests operating on the mongo change stream need to configure the stream and manage asynchronous
* replication, which gets a little tricky.
*
* This wraps a test in a function that configures all the context, and tears it down afterwards.
*/
export function walStreamTest(
export function changeStreamTest(
factory: () => Promise<BucketStorageFactory>,
test: (context: ChangeStreamTestContext) => Promise<void>
): () => Promise<void> {
Expand Down
67 changes: 67 additions & 0 deletions modules/module-mysql/LICENSE
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Functional Source License, Version 1.1, Apache 2.0 Future License

## Abbreviation

FSL-1.1-Apache-2.0

## Notice

Copyright 2023-2024 Journey Mobile, Inc.

## Terms and Conditions

### Licensor ("We")

The party offering the Software under these Terms and Conditions.

### The Software

The "Software" is each version of the software that we make available under these Terms and Conditions, as indicated by our inclusion of these Terms and Conditions with the Software.

### License Grant

Subject to your compliance with this License Grant and the Patents, Redistribution and Trademark clauses below, we hereby grant you the right to use, copy, modify, create derivative works, publicly perform, publicly display and redistribute the Software for any Permitted Purpose identified below.

### Permitted Purpose

A Permitted Purpose is any purpose other than a Competing Use. A Competing Use means making the Software available to others in a commercial product or service that:

1. substitutes for the Software;
2. substitutes for any other product or service we offer using the Software that exists as of the date we make the Software available; or
3. offers the same or substantially similar functionality as the Software.

Permitted Purposes specifically include using the Software:

1. for your internal use and access;
2. for non-commercial education;
3. for non-commercial research; and
4. in connection with professional services that you provide to a licensee using the Software in accordance with these Terms and Conditions.

### Patents

To the extent your use for a Permitted Purpose would necessarily infringe our patents, the license grant above includes a license under our patents. If you make a claim against any party that the Software infringes or contributes to the infringement of any patent, then your patent license to the Software ends immediately.

### Redistribution

The Terms and Conditions apply to all copies, modifications and derivatives of the Software.
If you redistribute any copies, modifications or derivatives of the Software, you must include a copy of or a link to these Terms and Conditions and not remove any copyright notices provided in or with the Software.

### Disclaimer

THE SOFTWARE IS PROVIDED "AS IS" AND WITHOUT WARRANTIES OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING WITHOUT LIMITATION WARRANTIES OF FITNESS FOR A PARTICULAR PURPOSE, MERCHANTABILITY, TITLE OR NON-INFRINGEMENT.
IN NO EVENT WILL WE HAVE ANY LIABILITY TO YOU ARISING OUT OF OR RELATED TO THE SOFTWARE, INCLUDING INDIRECT, SPECIAL, INCIDENTAL OR CONSEQUENTIAL DAMAGES, EVEN IF WE HAVE BEEN INFORMED OF THEIR POSSIBILITY IN ADVANCE.

### Trademarks

Except for displaying the License Details and identifying us as the origin of the Software, you have no right under these Terms and Conditions to use our trademarks, trade names, service marks or product names.

## Grant of Future License

We hereby irrevocably grant you an additional license to use the Software under the Apache License, Version 2.0 that is effective on the second anniversary of the date we make the Software available. On or after that date, you may use the Software under the Apache License, Version 2.0, in which case the following will apply:

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
3 changes: 3 additions & 0 deletions modules/module-mysql/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# PowerSync MySQL Module

This is a module which provides MySQL replication to PowerSync.
2 changes: 2 additions & 0 deletions modules/module-mysql/dev/.env.template
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
PS_MONGO_URI=mongodb://mongo:27017/powersync_demo
PS_PORT=8080
9 changes: 9 additions & 0 deletions modules/module-mysql/dev/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# MySQL Development Helpers

This folder contains some helpers for developing with MySQL.

- `./.env.template` contains basic settings to be applied to a root `.env` file
- `./config` contains YAML configuration files for a MySQL todo list application
- `./docker/mysql` contains a docker compose file for starting Mysql

TODO this does not contain any auth or backend functionality.
Loading
Loading