6 changes: 6 additions & 0 deletions .changeset/olive-bags-wave.md
@@ -0,0 +1,6 @@
---
'@powersync/service-sync-rules': minor
'@powersync/service-image': minor
---

Introduce the `config` option on sync rules which can be used to opt-in to new features and backwards-incompatible fixes of historical issues with the PowerSync service.
6 changes: 6 additions & 0 deletions .changeset/popular-zoos-hang.md
@@ -0,0 +1,6 @@
---
'@powersync/service-sync-rules': minor
'@powersync/service-image': minor
---

Add the `timestamps_iso8601` option in the `config:` block for sync rules. When enabled, timestamps are consistently formatted using ISO 8601 format.
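
Taken together, these two changesets translate to a sync rules file along these lines. A minimal sketch, assuming `timestamps_iso8601` sits directly under the new `config:` block as the changeset wording suggests; the bucket definition is illustrative only:

```yaml
# Opt in to ISO 8601 timestamp formatting via the new config block.
config:
  timestamps_iso8601: true

bucket_definitions:
  global:
    data:
      # With the option enabled, timestamps sync as '2023-03-06T13:47:01.123Z'
      # rather than the legacy '2023-03-06 13:47:01.123Z'.
      - SELECT * FROM test_data
```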
1 change: 1 addition & 0 deletions libs/lib-services/package.json
@@ -21,6 +21,7 @@
"keywords": [],
"dependencies": {
"@powersync/service-errors": "workspace:*",
"@powersync/service-sync-rules": "workspace:*",
"ajv": "^8.12.0",
"better-ajv-errors": "^1.2.0",
"bson": "^6.10.3",
7 changes: 5 additions & 2 deletions libs/lib-services/src/codec/codecs.ts
@@ -1,5 +1,6 @@
import * as t from 'ts-codec';
import * as bson from 'bson';
import { DateTimeValue } from '@powersync/service-sync-rules';

export const buffer = t.codec<Buffer, string>(
'Buffer',
@@ -12,7 +13,7 @@ export const buffer = t.codec<Buffer, string>(
(buffer) => Buffer.from(buffer, 'base64')
);

export const date = t.codec<Date, string>(
export const date = t.codec<Date, string | DateTimeValue>(
'Date',
(date) => {
if (!(date instanceof Date)) {
@@ -21,7 +22,9 @@ export const date = t.codec<Date, string>(
return date.toISOString();
},
(date) => {
const parsed = new Date(date);
// In our jpgwire wrapper, we patch the row decoding logic to map timestamps into DateTimeValue instances, so we need to
// support those here.
const parsed = new Date(date instanceof DateTimeValue ? date.iso8601Representation : date);
if (isNaN(parsed.getTime())) {
throw new t.TransformError([`Invalid date`]);
}
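
A quick usage sketch of the widened codec, assuming the standard ts-codec `encode`/`decode` entry points and an assumed import path; `DateTimeValue` wraps an ISO 8601 string, as the MongoRelation changes below show:

```ts
import { DateTimeValue } from '@powersync/service-sync-rules';
import { date } from './codecs.js'; // path assumed

// Decoding a plain ISO string works as before:
const fromString = date.decode('2023-03-06T13:47:00.000Z');

// Decoding a DateTimeValue produced by the patched jpgwire row decoder:
const fromWrapper = date.decode(new DateTimeValue('2023-03-06T13:47:00.000Z'));

fromString.getTime() === fromWrapper.getTime(); // true
```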
4 changes: 2 additions & 2 deletions modules/module-mongodb/src/replication/ChangeStream.ts
@@ -17,7 +17,7 @@ import {
SourceTable,
storage
} from '@powersync/service-core';
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
import { ReplicationMetric } from '@powersync/service-types';
import { MongoLSN } from '../common/MongoLSN.js';
import { PostImagesOption } from '../types/types.js';
@@ -439,7 +439,7 @@ export class ChangeStream {
return { $match: { ns: { $in: $inFilters } }, multipleDatabases };
}

static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> {
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
for (let row of results) {
yield constructAfterRecord(row);
}
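
The `SqliteRow` → `SqliteInputRow` change here reflects the central idea of this PR: input rows may still carry deferred wrappers (such as `DateTimeValue`) whose final string form depends on the selected compatibility level. A rough sketch of the assumed relationship between these types; the real definitions in `@powersync/service-sync-rules` may differ:

```ts
// Assumed shapes, for illustration only.
type SqliteValue = null | string | number | bigint | Uint8Array;

// Stand-in for the real context class.
class CompatibilityContext {}

// Deferred wrappers pick their concrete representation only once a
// compatibility context is known.
abstract class CustomSqliteValue {
  abstract toSqliteValue(context: CompatibilityContext): SqliteValue;
}

type SqliteInputValue = SqliteValue | CustomSqliteValue;
type SqliteInputRow = Record<string, SqliteInputValue>;
type SqliteRow = Record<string, SqliteValue>;

// applyRowContext resolves every deferred value in a row.
function applyRowContext(row: SqliteInputRow, context: CompatibilityContext): SqliteRow {
  const result: SqliteRow = {};
  for (const [key, value] of Object.entries(row)) {
    result[key] = value instanceof CustomSqliteValue ? value.toSqliteValue(context) : value;
  }
  return result;
}
```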
42 changes: 26 additions & 16 deletions modules/module-mongodb/src/replication/MongoRelation.ts
@@ -1,7 +1,18 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { storage } from '@powersync/service-core';
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';
import {
CompatibilityContext,
CustomArray,
CustomObject,
CustomSqliteValue,
DatabaseInputValue,
SqliteInputRow,
SqliteInputValue,
SqliteRow,
SqliteValue,
DateTimeValue
} from '@powersync/service-sync-rules';

import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
import { MongoLSN } from '../common/MongoLSN.js';
@@ -27,15 +38,15 @@ export function getCacheIdentifier(source: storage.SourceEntityDescriptor | stor
return `${source.schema}.${source.name}`;
}

export function constructAfterRecord(document: mongo.Document): SqliteRow {
let record: SqliteRow = {};
export function constructAfterRecord(document: mongo.Document): SqliteInputRow {
let record: SqliteInputRow = {};
for (let key of Object.keys(document)) {
record[key] = toMongoSyncRulesValue(document[key]);
}
return record;
}

export function toMongoSyncRulesValue(data: any): SqliteValue {
export function toMongoSyncRulesValue(data: any): SqliteInputValue {
const autoBigNum = true;
if (data === null) {
return null;
@@ -60,7 +71,8 @@ export function toMongoSyncRulesValue(data: any): SqliteValue {
} else if (data instanceof mongo.UUID) {
return data.toHexString();
} else if (data instanceof Date) {
return data.toISOString().replace('T', ' ');
const isoString = data.toISOString();
return new DateTimeValue(isoString);
} else if (data instanceof mongo.Binary) {
return new Uint8Array(data.buffer);
} else if (data instanceof mongo.Long) {
@@ -72,26 +84,21 @@ export function toMongoSyncRulesValue(data: any): SqliteValue {
} else if (data instanceof RegExp) {
return JSON.stringify({ pattern: data.source, options: data.flags });
} else if (Array.isArray(data)) {
// We may be able to avoid some parse + stringify cycles here for JsonSqliteContainer.
return JSONBig.stringify(data.map((element) => filterJsonData(element)));
return new CustomArray(data, filterJsonData);
} else if (data instanceof Uint8Array) {
return data;
} else if (data instanceof JsonContainer) {
return data.toString();
} else if (typeof data == 'object') {
let record: Record<string, any> = {};
for (let key of Object.keys(data)) {
record[key] = filterJsonData(data[key]);
}
return JSONBig.stringify(record);
return new CustomObject(data, filterJsonData);
} else {
return null;
}
}

const DEPTH_LIMIT = 20;

function filterJsonData(data: any, depth = 0): any {
function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any {
const autoBigNum = true;
if (depth > DEPTH_LIMIT) {
// This is primarily to prevent infinite recursion
@@ -117,7 +124,8 @@ function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any {
} else if (typeof data == 'bigint') {
return data;
} else if (data instanceof Date) {
return data.toISOString().replace('T', ' ');
const isoString = data.toISOString();
return new DateTimeValue(isoString).toSqliteValue(context);
} else if (data instanceof mongo.ObjectId) {
return data.toHexString();
} else if (data instanceof mongo.UUID) {
@@ -133,16 +141,18 @@ function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any {
} else if (data instanceof RegExp) {
return { pattern: data.source, options: data.flags };
} else if (Array.isArray(data)) {
return data.map((element) => filterJsonData(element, depth + 1));
return data.map((element) => filterJsonData(element, context, depth + 1));
} else if (ArrayBuffer.isView(data)) {
return undefined;
} else if (data instanceof CustomSqliteValue) {
return data.toSqliteValue(context);
} else if (data instanceof JsonContainer) {
// Can be stringified directly when using our JSONBig implementation
return data;
} else if (typeof data == 'object') {
let record: Record<string, any> = {};
for (let key of Object.keys(data)) {
record[key] = filterJsonData(data[key], depth + 1);
record[key] = filterJsonData(data[key], context, depth + 1);
}
return record;
} else {
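
The net effect of these changes is that dates, arrays, and objects are no longer eagerly stringified; they are wrapped, and formatting decisions are deferred until a compatibility context is applied. A hypothetical sketch of the deferred date formatting, matching the observable behaviour in the tests below (the real `DateTimeValue` lives in `@powersync/service-sync-rules` and takes a full `CompatibilityContext`):

```ts
// Hypothetical re-implementation, for illustration only.
class SketchDateTimeValue {
  constructor(readonly iso8601Representation: string) {}

  toSqliteValue(options: { timestampsIso8601: boolean }): string {
    // The legacy format replaces the 'T' separator with a space; the new
    // opt-in format keeps the ISO 8601 string unchanged.
    return options.timestampsIso8601
      ? this.iso8601Representation
      : this.iso8601Representation.replace('T', ' ');
  }
}

const value = new SketchDateTimeValue('2023-03-06T13:47:00.000Z');
value.toSqliteValue({ timestampsIso8601: false }); // '2023-03-06 13:47:00.000Z'
value.toSqliteValue({ timestampsIso8601: true });  // '2023-03-06T13:47:00.000Z'
```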
74 changes: 58 additions & 16 deletions modules/module-mongodb/test/src/mongo_test.test.ts
@@ -1,5 +1,11 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';
import {
applyRowContext,
CompatibilityContext,
CompatibilityEdition,
SqliteInputRow,
SqlSyncRules
} from '@powersync/service-sync-rules';
import { describe, expect, test } from 'vitest';

import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js';
@@ -138,8 +144,10 @@ describe('mongo data types', () => {
]);
}

function checkResults(transformed: Record<string, any>[]) {
expect(transformed[0]).toMatchObject({
function checkResults(transformed: SqliteInputRow[]) {
const sqliteValue = transformed.map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));

expect(sqliteValue[0]).toMatchObject({
_id: 1n,
text: 'text',
uuid: 'baeb2514-4c57-436d-b3cc-c1256211656d',
@@ -152,17 +160,17 @@ describe('mongo data types', () => {
null: null,
decimal: '3.14'
});
expect(transformed[1]).toMatchObject({
expect(sqliteValue[1]).toMatchObject({
_id: 2n,
nested: '{"test":"thing"}'
});

expect(transformed[2]).toMatchObject({
expect(sqliteValue[2]).toMatchObject({
_id: 3n,
date: '2023-03-06 13:47:00.000Z'
});

expect(transformed[3]).toMatchObject({
expect(sqliteValue[3]).toMatchObject({
_id: 4n,
objectId: '66e834cc91d805df11fa0ecb',
timestamp: 1958505087099n,
@@ -177,9 +185,9 @@ describe('mongo data types', () => {
});

// This must specifically be null, and not undefined.
expect(transformed[4].undefined).toBeNull();
expect(sqliteValue[4].undefined).toBeNull();

expect(transformed[5]).toMatchObject({
expect(sqliteValue[5]).toMatchObject({
_id: 6n,
int4: -1n,
int8: -9007199254740993n,
@@ -188,8 +196,10 @@ describe('mongo data types', () => {
});
}

function checkResultsNested(transformed: Record<string, any>[]) {
expect(transformed[0]).toMatchObject({
function checkResultsNested(transformed: SqliteInputRow[]) {
const sqliteValue = transformed.map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));

expect(sqliteValue[0]).toMatchObject({
_id: 1n,
text: `["text"]`,
uuid: '["baeb2514-4c57-436d-b3cc-c1256211656d"]',
@@ -204,30 +214,30 @@ describe('mongo data types', () => {

// Note: Depending on the extent to which we use the original postgres value, the whitespace may change, and order may change.
// We do expect that decimals and big numbers are preserved.
expect(transformed[1]).toMatchObject({
expect(sqliteValue[1]).toMatchObject({
_id: 2n,
nested: '[{"test":"thing"}]'
});

expect(transformed[2]).toMatchObject({
expect(sqliteValue[2]).toMatchObject({
_id: 3n,
date: '["2023-03-06 13:47:00.000Z"]'
});

expect(transformed[3]).toMatchObject({
expect(sqliteValue[3]).toMatchObject({
_id: 5n,
undefined: '[null]'
});

expect(transformed[4]).toMatchObject({
expect(sqliteValue[4]).toMatchObject({
_id: 6n,
int4: '[-1]',
int8: '[-9007199254740993]',
float: '[-3.14]',
decimal: '["-3.14"]'
});

expect(transformed[5]).toMatchObject({
expect(sqliteValue[5]).toMatchObject({
_id: 10n,
objectId: '["66e834cc91d805df11fa0ecb"]',
timestamp: '[1958505087099]',
@@ -522,13 +532,45 @@ bucket_definitions:
errors: []
});
});

test('date format', async () => {
const { db, client } = await connectMongoData();
const collection = db.collection('test_data');
try {
await setupTable(db);
await collection.insertOne({
fraction: new Date('2023-03-06 15:47:01.123+02'),
noFraction: new Date('2023-03-06 15:47:01+02')
});

const rawResults = await db
.collection('test_data')
.find({}, { sort: { _id: 1 } })
.toArray();
const [row] = [...ChangeStream.getQueryData(rawResults)];

const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
expect(oldFormat).toMatchObject({
fraction: '2023-03-06 13:47:01.123Z',
noFraction: '2023-03-06 13:47:01.000Z'
});

const newFormat = applyRowContext(row, new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS));
expect(newFormat).toMatchObject({
fraction: '2023-03-06T13:47:01.123Z',
noFraction: '2023-03-06T13:47:01.000Z'
});
} finally {
await client.close();
}
});
});

/**
* Return all the inserts from the first transaction in the replication stream.
*/
async function getReplicationTx(replicationStream: mongo.ChangeStream, count: number) {
let transformed: SqliteRow[] = [];
let transformed: SqliteInputRow[] = [];
for await (const doc of replicationStream) {
// Specifically filter out map_input / map_output collections
if (!(doc as any)?.ns?.coll?.startsWith('test_data')) {
5 changes: 4 additions & 1 deletion modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts
@@ -102,7 +102,10 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
*/
return fields.map((c) => {
const value = row[c.name];
const sqlValue = sync_rules.toSyncRulesValue(value);
const sqlValue = sync_rules.applyValueContext(
sync_rules.toSyncRulesValue(value),
sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
);
if (typeof sqlValue == 'bigint') {
return Number(value);
} else if (value instanceof Date) {
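
Since this route serves the existing API rather than sync streams, the adapter explicitly resolves values with the legacy context. A condensed sketch of the two-step conversion, using the helper names as imported above (the example value and output format are assumptions based on the Mongo tests earlier in the diff):

```ts
import * as sync_rules from '@powersync/service-sync-rules';

// Step 1: convert the raw database value into a sync-rules input value; for
// dates this may now yield a deferred DateTimeValue rather than a string.
const input = sync_rules.toSyncRulesValue(new Date('2023-03-06T13:47:00.000Z'));

// Step 2: resolve it with the legacy context so existing API consumers keep
// seeing the old format, e.g. '2023-03-06 13:47:00.000Z'.
const value = sync_rules.applyValueContext(
  input,
  sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
);
```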
5 changes: 4 additions & 1 deletion modules/module-mysql/src/common/mysql-to-sqlite.ts
@@ -103,7 +103,10 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu
};
}

export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
export function toSQLiteRow(
row: Record<string, any>,
columns: Map<string, ColumnDescriptor>
): sync_rules.SqliteInputRow {
let result: sync_rules.DatabaseInputRow = {};
for (let key in row) {
// We are very much expecting the column to be there
10 changes: 5 additions & 5 deletions modules/module-mysql/test/src/mysql-to-sqlite.test.ts
@@ -1,4 +1,4 @@
import { SqliteRow } from '@powersync/service-sync-rules';
import { SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules';
import { afterAll, describe, expect, test } from 'vitest';
import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js';
import { eventIsWriteMutation, eventIsXid } from '@module/replication/zongji/zongji-utils.js';
@@ -298,7 +298,7 @@ INSERT INTO test_data (
});
});

async function getDatabaseRows(connection: MySQLConnectionManager, tableName: string): Promise<SqliteRow[]> {
async function getDatabaseRows(connection: MySQLConnectionManager, tableName: string): Promise<SqliteInputRow[]> {
const [results, fields] = await connection.query(`SELECT * FROM ${tableName}`);
const columns = toColumnDescriptors(fields);
return results.map((row) => common.toSQLiteRow(row, columns));
@@ -307,15 +307,15 @@ async function getDatabaseRows(connection: MySQLConnectionManager, tableName: st
/**
* Return all the inserts from the first transaction in the binlog stream.
*/
async function getReplicatedRows(expectedTransactionsCount?: number): Promise<SqliteRow[]> {
let transformed: SqliteRow[] = [];
async function getReplicatedRows(expectedTransactionsCount?: number): Promise<SqliteInputRow[]> {
let transformed: SqliteInputRow[] = [];
const zongji = new ZongJi({
host: TEST_CONNECTION_OPTIONS.hostname,
user: TEST_CONNECTION_OPTIONS.username,
password: TEST_CONNECTION_OPTIONS.password
});

const completionPromise = new Promise<SqliteRow[]>((resolve, reject) => {
const completionPromise = new Promise<SqliteInputRow[]>((resolve, reject) => {
zongji.on('binlog', (evt: BinLogEvent) => {
try {
if (eventIsWriteMutation(evt)) {