Skip to content

Commit 10b14f5

Browse files
committed
Merge remote-tracking branch 'origin/main' into compact-parameters
2 parents 019b098 + 4a34a51 commit 10b14f5

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2290
-555
lines changed

.changeset/green-queens-float.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/service-core': patch
3+
---
4+
5+
Support BSON lines through the HTTP endpoint via the `application/vnd.powersync.bson-stream` content-type.

.changeset/wet-berries-enjoy.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
---
2+
'@powersync/service-module-postgres-storage': minor
3+
'@powersync/service-module-mongodb-storage': minor
4+
'@powersync/service-core-tests': minor
5+
'@powersync/service-module-postgres': minor
6+
'@powersync/service-module-mongodb': minor
7+
'@powersync/service-core': minor
8+
'@powersync/service-module-mysql': minor
9+
'@powersync/service-sync-rules': minor
10+
---
11+
12+
MySQL:
13+
- Added schema change handling
14+
- Except for some edge cases, the following schema changes are now handled automatically:
15+
- Creation, renaming, dropping and truncation of tables.
16+
- Creation and dropping of unique indexes and primary keys.
17+
- Adding, modifying, dropping and renaming of table columns.
18+
- If a schema change cannot be handled automatically, a warning with details will be logged.
19+
- Mismatches in table schema from the Zongji binlog listener are now handled more gracefully.
20+
- Replication of wildcard tables is now supported.
21+
- Improved logging for binlog event processing.

modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,9 @@ export class MongoSyncBucketStorage
178178
async resolveTable(options: storage.ResolveTableOptions): Promise<storage.ResolveTableResult> {
179179
const { group_id, connection_id, connection_tag, entity_descriptor } = options;
180180

181-
const { schema, name: table, objectId, replicationColumns } = entity_descriptor;
181+
const { schema, name, objectId, replicaIdColumns } = entity_descriptor;
182182

183-
const columns = replicationColumns.map((column) => ({
183+
const normalizedReplicaIdColumns = replicaIdColumns.map((column) => ({
184184
name: column.name,
185185
type: column.type,
186186
type_oid: column.typeId
@@ -192,8 +192,8 @@ export class MongoSyncBucketStorage
192192
group_id: group_id,
193193
connection_id: connection_id,
194194
schema_name: schema,
195-
table_name: table,
196-
replica_id_columns2: columns
195+
table_name: name,
196+
replica_id_columns2: normalizedReplicaIdColumns
197197
};
198198
if (objectId != null) {
199199
filter.relation_id = objectId;
@@ -206,24 +206,24 @@ export class MongoSyncBucketStorage
206206
connection_id: connection_id,
207207
relation_id: objectId,
208208
schema_name: schema,
209-
table_name: table,
209+
table_name: name,
210210
replica_id_columns: null,
211-
replica_id_columns2: columns,
211+
replica_id_columns2: normalizedReplicaIdColumns,
212212
snapshot_done: false,
213213
snapshot_status: undefined
214214
};
215215

216216
await col.insertOne(doc, { session });
217217
}
218-
const sourceTable = new storage.SourceTable(
219-
doc._id,
220-
connection_tag,
221-
objectId,
222-
schema,
223-
table,
224-
replicationColumns,
225-
doc.snapshot_done ?? true
226-
);
218+
const sourceTable = new storage.SourceTable({
219+
id: doc._id,
220+
connectionTag: connection_tag,
221+
objectId: objectId,
222+
schema: schema,
223+
name: name,
224+
replicaIdColumns: replicaIdColumns,
225+
snapshotComplete: doc.snapshot_done ?? true
226+
});
227227
sourceTable.syncEvent = options.sync_rules.tableTriggersEvent(sourceTable);
228228
sourceTable.syncData = options.sync_rules.tableSyncsData(sourceTable);
229229
sourceTable.syncParameters = options.sync_rules.tableSyncsParameters(sourceTable);
@@ -238,7 +238,7 @@ export class MongoSyncBucketStorage
238238

239239
let dropTables: storage.SourceTable[] = [];
240240
// Detect tables that are either renamed, or have different replica_id_columns
241-
let truncateFilter = [{ schema_name: schema, table_name: table }] as any[];
241+
let truncateFilter = [{ schema_name: schema, table_name: name }] as any[];
242242
if (objectId != null) {
243243
// Only detect renames if the source uses relation ids.
244244
truncateFilter.push({ relation_id: objectId });
@@ -256,15 +256,16 @@ export class MongoSyncBucketStorage
256256
.toArray();
257257
dropTables = truncate.map(
258258
(doc) =>
259-
new storage.SourceTable(
260-
doc._id,
261-
connection_tag,
262-
doc.relation_id,
263-
doc.schema_name,
264-
doc.table_name,
265-
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
266-
doc.snapshot_done ?? true
267-
)
259+
new storage.SourceTable({
260+
id: doc._id,
261+
connectionTag: connection_tag,
262+
objectId: doc.relation_id,
263+
schema: doc.schema_name,
264+
name: doc.table_name,
265+
replicaIdColumns:
266+
doc.replica_id_columns2?.map((c) => ({ name: c.name, typeOid: c.type_oid, type: c.type })) ?? [],
267+
snapshotComplete: doc.snapshot_done ?? true
268+
})
268269
);
269270

270271
result = {
@@ -620,7 +621,6 @@ export class MongoSyncBucketStorage
620621
`${this.slot_name} Cleared batch of data in ${lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS}ms, continuing...`
621622
);
622623
await timers.setTimeout(lib_mongo.db.MONGO_CLEAR_OPERATION_TIMEOUT_MS / 5);
623-
continue;
624624
} else {
625625
throw e;
626626
}

modules/module-mongodb/src/api/MongoRouteAPIAdapter.ts

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import * as sync_rules from '@powersync/service-sync-rules';
55
import * as service_types from '@powersync/service-types';
66

77
import { MongoManager } from '../replication/MongoManager.js';
8-
import { constructAfterRecord, createCheckpoint, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
8+
import { constructAfterRecord, STANDALONE_CHECKPOINT_ID } from '../replication/MongoRelation.js';
99
import { CHECKPOINTS_COLLECTION } from '../replication/replication-utils.js';
1010
import * as types from '../types/types.js';
1111
import { escapeRegExp } from '../utils.js';
@@ -137,15 +137,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
137137
if (tablePattern.isWildcard) {
138138
patternResult.tables = [];
139139
for (let collection of collections) {
140-
const sourceTable = new SourceTable(
141-
0,
142-
this.connectionTag,
143-
collection.name,
144-
schema,
145-
collection.name,
146-
[],
147-
true
148-
);
140+
const sourceTable = new SourceTable({
141+
id: 0,
142+
connectionTag: this.connectionTag,
143+
objectId: collection.name,
144+
schema: schema,
145+
name: collection.name,
146+
replicaIdColumns: [],
147+
snapshotComplete: true
148+
});
149149
let errors: service_types.ReplicationError[] = [];
150150
if (collection.type == 'view') {
151151
errors.push({ level: 'warning', message: `Collection ${schema}.${tablePattern.name} is a view` });
@@ -164,15 +164,15 @@ export class MongoRouteAPIAdapter implements api.RouteAPI {
164164
});
165165
}
166166
} else {
167-
const sourceTable = new SourceTable(
168-
0,
169-
this.connectionTag,
170-
tablePattern.name,
171-
schema,
172-
tablePattern.name,
173-
[],
174-
true
175-
);
167+
const sourceTable = new SourceTable({
168+
id: 0,
169+
connectionTag: this.connectionTag,
170+
objectId: tablePattern.name,
171+
schema: schema,
172+
name: tablePattern.name,
173+
replicaIdColumns: [],
174+
snapshotComplete: true
175+
});
176176

177177
const syncData = sqlSyncRules.tableSyncsData(sourceTable);
178178
const syncParameters = sqlSyncRules.tableSyncsParameters(sourceTable);

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ export class ChangeStream {
215215

216216
async estimatedCountNumber(table: storage.SourceTable): Promise<number> {
217217
const db = this.client.db(table.schema);
218-
return await db.collection(table.table).estimatedDocumentCount();
218+
return await db.collection(table.name).estimatedDocumentCount();
219219
}
220220

221221
/**
@@ -449,7 +449,7 @@ export class ChangeStream {
449449
const totalEstimatedCount = await this.estimatedCountNumber(table);
450450
let at = table.snapshotStatus?.replicatedCount ?? 0;
451451
const db = this.client.db(table.schema);
452-
const collection = db.collection(table.table);
452+
const collection = db.collection(table.name);
453453
await using query = new ChunkedSnapshotQuery({
454454
collection,
455455
key: table.snapshotStatus?.lastKey,

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
1313
schema: source.db,
1414
// Not relevant for MongoDB - we use db + coll name as the identifier
1515
objectId: undefined,
16-
replicationColumns: [{ name: '_id' }]
16+
replicaIdColumns: [{ name: '_id' }]
1717
} satisfies storage.SourceEntityDescriptor;
1818
}
1919

@@ -22,7 +22,7 @@ export function getMongoRelation(source: mongo.ChangeStreamNameSpace): storage.S
2222
*/
2323
export function getCacheIdentifier(source: storage.SourceEntityDescriptor | storage.SourceTable): string {
2424
if (source instanceof storage.SourceTable) {
25-
return `${source.schema}.${source.table}`;
25+
return `${source.schema}.${source.name}`;
2626
}
2727
return `${source.schema}.${source.name}`;
2828
}

modules/module-mysql/dev/docker/mysql/init-scripts/my.cnf

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,4 @@ enforce-gtid-consistency = ON
44
# Row format required for ZongJi
55
binlog_format = row
66
log_bin=mysql-bin
7-
server-id=1
8-
binlog-do-db=mydatabase
9-
replicate-do-table=mydatabase.lists
7+
server-id=1

modules/module-mysql/package.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,10 @@
3333
"@powersync/service-sync-rules": "workspace:*",
3434
"@powersync/service-types": "workspace:*",
3535
"@powersync/service-jsonbig": "workspace:*",
36-
"@powersync/mysql-zongji": "0.2.0",
36+
"@powersync/mysql-zongji": "^0.4.0",
3737
"async": "^3.2.4",
3838
"mysql2": "^3.11.0",
39+
"node-sql-parser": "^5.3.9",
3940
"semver": "^7.5.4",
4041
"ts-codec": "^1.3.0",
4142
"uri-js": "^4.4.1",

modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
208208
idColumnsResult = await common.getReplicationIdentityColumns({
209209
connection: connection,
210210
schema,
211-
table_name: tableName
211+
tableName: tableName
212212
});
213213
} catch (ex) {
214214
idColumnsError = { level: 'fatal', message: ex.message };
@@ -217,7 +217,15 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
217217
}
218218

219219
const idColumns = idColumnsResult?.columns ?? [];
220-
const sourceTable = new storage.SourceTable(0, this.config.tag, tableName, schema, tableName, idColumns, true);
220+
const sourceTable = new storage.SourceTable({
221+
id: 0,
222+
connectionTag: this.config.tag,
223+
objectId: tableName,
224+
schema: schema,
225+
name: tableName,
226+
replicaIdColumns: idColumns,
227+
snapshotComplete: true
228+
});
221229
const syncData = syncRules.tableSyncsData(sourceTable);
222230
const syncParameters = syncRules.tableSyncsParameters(sourceTable);
223231

@@ -232,7 +240,7 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
232240
let selectError: service_types.ReplicationError | null = null;
233241
try {
234242
await this.retriedQuery({
235-
query: `SELECT * FROM ${sourceTable.table} LIMIT 1`
243+
query: `SELECT * FROM ${sourceTable.name} LIMIT 1`
236244
});
237245
} catch (e) {
238246
selectError = { level: 'fatal', message: e.message };

modules/module-mysql/src/common/ReplicatedGTID.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,15 @@ export class ReplicatedGTID {
9292
* @returns A comparable string in the format
9393
* `padded_end_transaction|raw_gtid|binlog_filename|binlog_position`
9494
*/
95-
get comparable() {
95+
get comparable(): string {
9696
const { raw, position } = this;
9797
const [, transactionRanges] = this.raw.split(':');
9898

99+
// This means no transactions have been executed on the database yet
100+
if (!transactionRanges) {
101+
return ReplicatedGTID.ZERO.comparable;
102+
}
103+
99104
let maxTransactionId = 0;
100105

101106
for (const range of transactionRanges.split(',')) {

0 commit comments

Comments
 (0)