diff --git a/.changeset/olive-bags-wave.md b/.changeset/olive-bags-wave.md new file mode 100644 index 000000000..d0aecc8d9 --- /dev/null +++ b/.changeset/olive-bags-wave.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-sync-rules': minor +'@powersync/service-image': minor +--- + +Introduce the `config` option on sync rules, which can be used to opt in to new features and backwards-incompatible fixes for historical issues with the PowerSync service. diff --git a/.changeset/popular-zoos-hang.md b/.changeset/popular-zoos-hang.md new file mode 100644 index 000000000..32512150e --- /dev/null +++ b/.changeset/popular-zoos-hang.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-sync-rules': minor +'@powersync/service-image': minor +--- + +Add the `timestamps_iso8601` option to the `config:` block for sync rules. When enabled, timestamps are consistently formatted as ISO 8601. \ No newline at end of file diff --git a/libs/lib-services/package.json b/libs/lib-services/package.json index 0cf81bf42..042f56b75 100644 --- a/libs/lib-services/package.json +++ b/libs/lib-services/package.json @@ -21,6 +21,7 @@ "keywords": [], "dependencies": { "@powersync/service-errors": "workspace:*", + "@powersync/service-sync-rules": "workspace:*", "ajv": "^8.12.0", "better-ajv-errors": "^1.2.0", "bson": "^6.10.3", diff --git a/libs/lib-services/src/codec/codecs.ts b/libs/lib-services/src/codec/codecs.ts index b584de036..5f1888dc1 100644 --- a/libs/lib-services/src/codec/codecs.ts +++ b/libs/lib-services/src/codec/codecs.ts @@ -1,5 +1,6 @@ import * as t from 'ts-codec'; import * as bson from 'bson'; +import { DateTimeValue } from '@powersync/service-sync-rules'; export const buffer = t.codec( 'Buffer', @@ -12,7 +13,7 @@ (buffer) => Buffer.from(buffer, 'base64') ); -export const date = t.codec<Date, string>( +export const date = t.codec<Date, string | DateTimeValue>( 'Date', (date) => { if (!(date instanceof Date)) { @@ -21,7 +22,9 @@ return date.toISOString(); }, (date) => { - const parsed = new Date(date); + // In our jpgwire wrapper, we patch the row decoding logic to map timestamps into DateTimeValue instances, so we + // need to support those here. + const parsed = new Date(date instanceof DateTimeValue ? 
date.iso8601Representation : date); if (isNaN(parsed.getTime())) { throw new t.TransformError([`Invalid date`]); } diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 0a516af82..e22a2f713 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -17,7 +17,7 @@ import { SourceTable, storage } from '@powersync/service-core'; -import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules'; +import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules'; import { ReplicationMetric } from '@powersync/service-types'; import { MongoLSN } from '../common/MongoLSN.js'; import { PostImagesOption } from '../types/types.js'; @@ -439,7 +439,7 @@ export class ChangeStream { return { $match: { ns: { $in: $inFilters } }, multipleDatabases }; } - static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> { + static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> { for (let row of results) { yield constructAfterRecord(row); } diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts index 4ac11efb7..20e78a413 100644 --- a/modules/module-mongodb/src/replication/MongoRelation.ts +++ b/modules/module-mongodb/src/replication/MongoRelation.ts @@ -1,7 +1,18 @@ import { mongo } from '@powersync/lib-service-mongodb'; import { storage } from '@powersync/service-core'; import { JSONBig, JsonContainer } from '@powersync/service-jsonbig'; -import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules'; +import { + CompatibilityContext, + CustomArray, + CustomObject, + CustomSqliteValue, + DatabaseInputValue, + SqliteInputRow, + SqliteInputValue, + SqliteRow, + SqliteValue, + DateTimeValue +} from '@powersync/service-sync-rules'; import { ErrorCode, ServiceError } from '@powersync/lib-services-framework'; import { MongoLSN } from '../common/MongoLSN.js'; @@ -27,15 +38,15 @@ export function getCacheIdentifier(source: storage.SourceEntityDescriptor | stor return `${source.schema}.${source.name}`; } -export function constructAfterRecord(document: mongo.Document): SqliteRow { - let record: SqliteRow = {}; +export function constructAfterRecord(document: mongo.Document): SqliteInputRow { + let record: SqliteInputRow = {}; for (let key of Object.keys(document)) { record[key] = toMongoSyncRulesValue(document[key]); } return record; } -export function toMongoSyncRulesValue(data: any): SqliteValue { +export function toMongoSyncRulesValue(data: any): SqliteInputValue { const autoBigNum = true; if (data === null) { return null; @@ -60,7 +71,8 @@ } else if (data instanceof mongo.UUID) { return data.toHexString(); } else if (data instanceof Date) { - return data.toISOString().replace('T', ' '); + const isoString = data.toISOString(); + return new DateTimeValue(isoString); } else if (data instanceof mongo.Binary) { return new Uint8Array(data.buffer); } else if (data instanceof mongo.Long) { @@ -72,18 +84,13 @@ } else if (data instanceof RegExp) { return JSON.stringify({ pattern: data.source, options: data.flags }); } else if (Array.isArray(data)) { - // We may be able to avoid some parse + stringify cycles here for JsonSqliteContainer. 
- return JSONBig.stringify(data.map((element) => filterJsonData(element))); + return new CustomArray(data, filterJsonData); } else if (data instanceof Uint8Array) { return data; } else if (data instanceof JsonContainer) { return data.toString(); } else if (typeof data == 'object') { - let record: Record = {}; - for (let key of Object.keys(data)) { - record[key] = filterJsonData(data[key]); - } - return JSONBig.stringify(record); + return new CustomObject(data, filterJsonData); } else { return null; } @@ -91,7 +98,7 @@ export function toMongoSyncRulesValue(data: any): SqliteValue { const DEPTH_LIMIT = 20; -function filterJsonData(data: any, depth = 0): any { +function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any { const autoBigNum = true; if (depth > DEPTH_LIMIT) { // This is primarily to prevent infinite recursion @@ -117,7 +124,8 @@ function filterJsonData(data: any, depth = 0): any { } else if (typeof data == 'bigint') { return data; } else if (data instanceof Date) { - return data.toISOString().replace('T', ' '); + const isoString = data.toISOString(); + return new DateTimeValue(isoString).toSqliteValue(context); } else if (data instanceof mongo.ObjectId) { return data.toHexString(); } else if (data instanceof mongo.UUID) { @@ -133,16 +141,18 @@ function filterJsonData(data: any, depth = 0): any { } else if (data instanceof RegExp) { return { pattern: data.source, options: data.flags }; } else if (Array.isArray(data)) { - return data.map((element) => filterJsonData(element, depth + 1)); + return data.map((element) => filterJsonData(element, context, depth + 1)); } else if (ArrayBuffer.isView(data)) { return undefined; + } else if (data instanceof CustomSqliteValue) { + return data.toSqliteValue(context); } else if (data instanceof JsonContainer) { // Can be stringified directly when using our JSONBig implementation return data; } else if (typeof data == 'object') { let record: Record = {}; for (let key of Object.keys(data)) { - record[key] = filterJsonData(data[key], depth + 1); + record[key] = filterJsonData(data[key], context, depth + 1); } return record; } else { diff --git a/modules/module-mongodb/test/src/mongo_test.test.ts b/modules/module-mongodb/test/src/mongo_test.test.ts index 864feef66..5da05b7cb 100644 --- a/modules/module-mongodb/test/src/mongo_test.test.ts +++ b/modules/module-mongodb/test/src/mongo_test.test.ts @@ -1,5 +1,11 @@ import { mongo } from '@powersync/lib-service-mongodb'; -import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules'; +import { + applyRowContext, + CompatibilityContext, + CompatibilityEdition, + SqliteInputRow, + SqlSyncRules +} from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js'; @@ -138,8 +144,10 @@ describe('mongo data types', () => { ]); } - function checkResults(transformed: Record[]) { - expect(transformed[0]).toMatchObject({ + function checkResults(transformed: SqliteInputRow[]) { + const sqliteValue = transformed.map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)); + + expect(sqliteValue[0]).toMatchObject({ _id: 1n, text: 'text', uuid: 'baeb2514-4c57-436d-b3cc-c1256211656d', @@ -152,17 +160,17 @@ describe('mongo data types', () => { null: null, decimal: '3.14' }); - expect(transformed[1]).toMatchObject({ + expect(sqliteValue[1]).toMatchObject({ _id: 2n, nested: '{"test":"thing"}' }); - expect(transformed[2]).toMatchObject({ + 
expect(sqliteValue[2]).toMatchObject({ _id: 3n, date: '2023-03-06 13:47:00.000Z' }); - expect(transformed[3]).toMatchObject({ + expect(sqliteValue[3]).toMatchObject({ _id: 4n, objectId: '66e834cc91d805df11fa0ecb', timestamp: 1958505087099n, @@ -177,9 +185,9 @@ describe('mongo data types', () => { }); // This must specifically be null, and not undefined. - expect(transformed[4].undefined).toBeNull(); + expect(sqliteValue[4].undefined).toBeNull(); - expect(transformed[5]).toMatchObject({ + expect(sqliteValue[5]).toMatchObject({ _id: 6n, int4: -1n, int8: -9007199254740993n, @@ -188,8 +196,10 @@ describe('mongo data types', () => { }); } - function checkResultsNested(transformed: Record[]) { - expect(transformed[0]).toMatchObject({ + function checkResultsNested(transformed: SqliteInputRow[]) { + const sqliteValue = transformed.map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)); + + expect(sqliteValue[0]).toMatchObject({ _id: 1n, text: `["text"]`, uuid: '["baeb2514-4c57-436d-b3cc-c1256211656d"]', @@ -204,22 +214,22 @@ describe('mongo data types', () => { // Note: Depending on to what extent we use the original postgres value, the whitespace may change, and order may change. // We do expect that decimals and big numbers are preserved. - expect(transformed[1]).toMatchObject({ + expect(sqliteValue[1]).toMatchObject({ _id: 2n, nested: '[{"test":"thing"}]' }); - expect(transformed[2]).toMatchObject({ + expect(sqliteValue[2]).toMatchObject({ _id: 3n, date: '["2023-03-06 13:47:00.000Z"]' }); - expect(transformed[3]).toMatchObject({ + expect(sqliteValue[3]).toMatchObject({ _id: 5n, undefined: '[null]' }); - expect(transformed[4]).toMatchObject({ + expect(sqliteValue[4]).toMatchObject({ _id: 6n, int4: '[-1]', int8: '[-9007199254740993]', @@ -227,7 +237,7 @@ describe('mongo data types', () => { decimal: '["-3.14"]' }); - expect(transformed[5]).toMatchObject({ + expect(sqliteValue[5]).toMatchObject({ _id: 10n, objectId: '["66e834cc91d805df11fa0ecb"]', timestamp: '[1958505087099]', @@ -522,13 +532,45 @@ bucket_definitions: errors: [] }); }); + + test('date format', async () => { + const { db, client } = await connectMongoData(); + const collection = db.collection('test_data'); + try { + await setupTable(db); + await collection.insertOne({ + fraction: new Date('2023-03-06 15:47:01.123+02'), + noFraction: new Date('2023-03-06 15:47:01+02') + }); + + const rawResults = await db + .collection('test_data') + .find({}, { sort: { _id: 1 } }) + .toArray(); + const [row] = [...ChangeStream.getQueryData(rawResults)]; + + const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); + expect(oldFormat).toMatchObject({ + fraction: '2023-03-06 13:47:01.123Z', + noFraction: '2023-03-06 13:47:01.000Z' + }); + + const newFormat = applyRowContext(row, new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS)); + expect(newFormat).toMatchObject({ + fraction: '2023-03-06T13:47:01.123Z', + noFraction: '2023-03-06T13:47:01.000Z' + }); + } finally { + await client.close(); + } + }); }); /** * Return all the inserts from the first transaction in the replication stream. 
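 * Rows are returned as raw {@link SqliteInputRow}s; callers apply {@link applyRowContext} with an explicit
 * {@link CompatibilityContext} before asserting on the rendered values.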
*/ async function getReplicationTx(replicationStream: mongo.ChangeStream, count: number) { - let transformed: SqliteRow[] = []; + let transformed: SqliteInputRow[] = []; for await (const doc of replicationStream) { // Specifically filter out map_input / map_output collections if (!(doc as any)?.ns?.coll?.startsWith('test_data')) { diff --git a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts index 19f1cde60..ef0b7642c 100644 --- a/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts +++ b/modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts @@ -102,7 +102,10 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI { */ return fields.map((c) => { const value = row[c.name]; - const sqlValue = sync_rules.toSyncRulesValue(value); + const sqlValue = sync_rules.applyValueContext( + sync_rules.toSyncRulesValue(value), + sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY + ); if (typeof sqlValue == 'bigint') { return Number(value); } else if (value instanceof Date) { diff --git a/modules/module-mysql/src/common/mysql-to-sqlite.ts b/modules/module-mysql/src/common/mysql-to-sqlite.ts index 46a4b0581..e60e928c4 100644 --- a/modules/module-mysql/src/common/mysql-to-sqlite.ts +++ b/modules/module-mysql/src/common/mysql-to-sqlite.ts @@ -103,7 +103,10 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu }; } -export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow { +export function toSQLiteRow( + row: Record<string, any>, + columns: Map<string, ColumnDescriptor> +): sync_rules.SqliteInputRow { let result: sync_rules.DatabaseInputRow = {}; for (let key in row) { // We are very much expecting the column to be there diff --git a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts index 97c5db93f..31d63342c 100644 --- a/modules/module-mysql/test/src/mysql-to-sqlite.test.ts +++ b/modules/module-mysql/test/src/mysql-to-sqlite.test.ts @@ -1,4 +1,4 @@ -import { SqliteRow } from '@powersync/service-sync-rules'; +import { SqliteInputRow, SqliteRow } from '@powersync/service-sync-rules'; import { afterAll, describe, expect, test } from 'vitest'; import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; import { eventIsWriteMutation, eventIsXid } from '@module/replication/zongji/zongji-utils.js'; @@ -298,7 +298,7 @@ INSERT INTO test_data ( }); }); -async function getDatabaseRows(connection: MySQLConnectionManager, tableName: string): Promise<SqliteRow[]> { +async function getDatabaseRows(connection: MySQLConnectionManager, tableName: string): Promise<SqliteInputRow[]> { const [results, fields] = await connection.query(`SELECT * FROM ${tableName}`); const columns = toColumnDescriptors(fields); return results.map((row) => common.toSQLiteRow(row, columns)); @@ -307,15 +307,15 @@ async function getDatabaseRows(connection: MySQLConnectionManager, tableName: st /** * Return all the inserts from the first transaction in the binlog stream. 
*/ -async function getReplicatedRows(expectedTransactionsCount?: number): Promise<SqliteRow[]> { - let transformed: SqliteRow[] = []; +async function getReplicatedRows(expectedTransactionsCount?: number): Promise<SqliteInputRow[]> { + let transformed: SqliteInputRow[] = []; const zongji = new ZongJi({ host: TEST_CONNECTION_OPTIONS.hostname, user: TEST_CONNECTION_OPTIONS.username, password: TEST_CONNECTION_OPTIONS.password }); - const completionPromise = new Promise<SqliteRow[]>((resolve, reject) => { + const completionPromise = new Promise<SqliteInputRow[]>((resolve, reject) => { zongji.on('binlog', (evt: BinLogEvent) => { try { if (eventIsWriteMutation(evt)) { diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index fc9324686..e35f2e4f2 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -105,7 +105,10 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI { columns: result.columns.map((c) => c.name), rows: result.rows.map((row) => { return row.map((value) => { - const sqlValue = sync_rules.toSyncRulesValue(value); + const sqlValue = sync_rules.applyValueContext( + sync_rules.toSyncRulesValue(value), + sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY + ); if (typeof sqlValue == 'bigint') { return Number(value); } else if (sync_rules.isJsonValue(sqlValue)) { diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 230906ec2..ad088f296 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -20,7 +20,15 @@ import { storage } from '@powersync/service-core'; import * as pgwire from '@powersync/service-jpgwire'; -import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules'; +import { + applyValueContext, + CompatibilityContext, + DatabaseInputRow, + SqliteInputRow, + SqlSyncRules, + TablePattern, + toSyncRulesRow +} from '@powersync/service-sync-rules'; import * as pg_utils from '../utils/pgwire_utils.js'; import { PgManager } from './PgManager.js'; @@ -500,7 +508,7 @@ WHERE oid = $1::regclass`, await sendKeepAlive(db); } - static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> { + static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> { for (let row of results) { yield toSyncRulesRow(row); } @@ -885,7 +893,8 @@ WHERE oid = $1::regclass`, // The key should always be present in the "after" record. return; } - key[name] = value; + // We just need a consistent representation of the primary key, and don't care about fixed quirks. + key[name] = applyValueContext(value, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); } resnapshot.push({ table: record.sourceTable, diff --git a/modules/module-postgres/src/utils/pgwire_utils.ts b/modules/module-postgres/src/utils/pgwire_utils.ts index 3e3a76721..40c60b216 100644 --- a/modules/module-postgres/src/utils/pgwire_utils.ts +++ b/modules/module-postgres/src/utils/pgwire_utils.ts @@ -1,13 +1,13 @@ // Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218 import * as pgwire from '@powersync/service-jpgwire'; -import { DatabaseInputRow, SqliteRow, toSyncRulesRow } from '@powersync/service-sync-rules'; +import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules'; /** * pgwire message -> SQLite row. 
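 * Timestamp and time columns arrive as {@link DateTimeValue} / {@link TimeValue} wrappers from our jpgwire wrapper,
 * which is why this returns a {@link SqliteInputRow} rather than a plain {@link SqliteRow}.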
* @param message */ -export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.PgoutputUpdate): SqliteRow { +export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.PgoutputUpdate): SqliteInputRow { const rawData = (message as any).afterRaw; const record = decodeTuple(message.relation, rawData); @@ -18,7 +18,9 @@ export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.Pgo * pgwire message -> SQLite row. * @param message */ -export function constructBeforeRecord(message: pgwire.PgoutputDelete | pgwire.PgoutputUpdate): SqliteRow | undefined { +export function constructBeforeRecord( + message: pgwire.PgoutputDelete | pgwire.PgoutputUpdate +): SqliteInputRow | undefined { const rawData = (message as any).beforeRaw; if (rawData == null) { return undefined; diff --git a/modules/module-postgres/test/src/pg_test.test.ts b/modules/module-postgres/test/src/pg_test.test.ts index 866adb3de..116d95cbc 100644 --- a/modules/module-postgres/test/src/pg_test.test.ts +++ b/modules/module-postgres/test/src/pg_test.test.ts @@ -1,6 +1,13 @@ import { constructAfterRecord } from '@module/utils/pgwire_utils.js'; import * as pgwire from '@powersync/service-jpgwire'; -import { SqliteRow } from '@powersync/service-sync-rules'; +import { + applyRowContext, + CompatibilityContext, + SqliteInputRow, + DateTimeValue, + TimeValue, + CompatibilityEdition +} from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js'; import { WalStream } from '@module/replication/WalStream.js'; @@ -158,9 +165,9 @@ VALUES(10, ARRAY['null']::TEXT[]); expect(transformed[2]).toMatchObject({ id: 3n, date: '2023-03-06', - time: '15:47:00', - timestamp: '2023-03-06 15:47:00', - timestamptz: '2023-03-06 13:47:00Z' + time: new TimeValue('15:47:00'), + timestamp: new DateTimeValue('2023-03-06T15:47:00.000000', '2023-03-06 15:47:00'), + timestamptz: new DateTimeValue('2023-03-06T13:47:00.000000Z', '2023-03-06 13:47:00Z') }); expect(transformed[3]).toMatchObject({ @@ -175,26 +182,26 @@ VALUES(10, ARRAY['null']::TEXT[]); expect(transformed[4]).toMatchObject({ id: 5n, date: '0000-01-01', - time: '00:00:00', - timestamp: '0000-01-01 00:00:00', - timestamptz: '0000-01-01 00:00:00Z' + time: new TimeValue('00:00:00'), + timestamp: new DateTimeValue('0000-01-01T00:00:00'), + timestamptz: new DateTimeValue('0000-01-01T00:00:00Z') }); expect(transformed[5]).toMatchObject({ id: 6n, - timestamp: '1970-01-01 00:00:00', - timestamptz: '1970-01-01 00:00:00Z' + timestamp: new DateTimeValue('1970-01-01T00:00:00.000000', '1970-01-01 00:00:00'), + timestamptz: new DateTimeValue('1970-01-01T00:00:00.000000Z', '1970-01-01 00:00:00Z') }); expect(transformed[6]).toMatchObject({ id: 7n, - timestamp: '9999-12-31 23:59:59', - timestamptz: '9999-12-31 23:59:59Z' + timestamp: new DateTimeValue('9999-12-31T23:59:59'), + timestamptz: new DateTimeValue('9999-12-31T23:59:59Z') }); expect(transformed[7]).toMatchObject({ id: 8n, - timestamptz: '0022-02-03 09:13:14Z' + timestamptz: new DateTimeValue('0022-02-03T09:13:14.000000Z', '0022-02-03 09:13:14Z') }); expect(transformed[8]).toMatchObject({ @@ -235,8 +242,8 @@ VALUES(10, ARRAY['null']::TEXT[]); id: 3n, date: `["2023-03-06"]`, time: `["15:47:00"]`, - timestamp: `["2023-03-06 15:47:00"]`, - timestamptz: `["2023-03-06 13:47:00Z","2023-03-06 13:47:00.12345Z"]` + timestamp: '["2023-03-06 15:47:00"]', + timestamptz: '["2023-03-06 13:47:00Z","2023-03-06 
13:47:00.12345Z"]' }); expect(transformed[3]).toMatchObject({ @@ -339,7 +346,7 @@ VALUES(10, ARRAY['null']::TEXT[]); const transformed = [ ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`))) - ]; + ].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)); checkResultArrays(transformed); } finally { @@ -415,7 +422,7 @@ VALUES(10, ARRAY['null']::TEXT[]); const transformed = await getReplicationTx(replicationStream); await pg.end(); - checkResultArrays(transformed); + checkResultArrays(transformed.map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY))); } finally { await db.end(); } @@ -430,13 +437,46 @@ VALUES(10, ARRAY['null']::TEXT[]); // const schema = await api.getConnectionsSchema(db); // expect(schema).toMatchSnapshot(); }); + + test('date formats', async () => { + const db = await connectPgWire(); + try { + await setupTable(db); + + await db.query(` +INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12', '2023-03-06 15:47:12.4', '2023-03-06 15:47+02'); +`); + + const [row] = [ + ...WalStream.getQueryData( + pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`)) + ) + ]; + + const oldFormat = applyRowContext(row, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); + expect(oldFormat).toMatchObject({ + time: '17:42:01.12', + timestamp: '2023-03-06 15:47:12.4', + timestamptz: '2023-03-06 13:47:00Z' + }); + + const newFormat = applyRowContext(row, new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS)); + expect(newFormat).toMatchObject({ + time: '17:42:01.120000', + timestamp: '2023-03-06T15:47:12.400000', + timestamptz: '2023-03-06T13:47:00.000000Z' + }); + } finally { + await db.end(); + } + }); }); /** * Return all the inserts from the first transaction in the replication stream. 
*/ async function getReplicationTx(replicationStream: pgwire.ReplicationStream) { - let transformed: SqliteRow[] = []; + let transformed: SqliteInputRow[] = []; for await (const batch of replicationStream.pgoutputDecode()) { for (const msg of batch.messages) { if (msg.tag == 'insert') { diff --git a/packages/jpgwire/package.json b/packages/jpgwire/package.json index 0619dc9d9..762333c68 100644 --- a/packages/jpgwire/package.json +++ b/packages/jpgwire/package.json @@ -19,6 +19,7 @@ }, "dependencies": { "@powersync/service-jsonbig": "workspace:^", + "@powersync/service-sync-rules": "workspace:^", "date-fns": "^4.1.0", "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87" } diff --git a/packages/jpgwire/src/pgwire_types.ts b/packages/jpgwire/src/pgwire_types.ts index dbb924595..d71a14d34 100644 --- a/packages/jpgwire/src/pgwire_types.ts +++ b/packages/jpgwire/src/pgwire_types.ts @@ -1,6 +1,7 @@ // Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218 import { JsonContainer } from '@powersync/service-jsonbig'; +import { TimeValue, type DatabaseInputValue } from '@powersync/service-sync-rules'; import { dateToSqlite, lsnMakeComparable, timestampToSqlite, timestamptzToSqlite } from './util.js'; export enum PgTypeOid { @@ -18,6 +19,7 @@ export enum PgTypeOid { DATE = 1082, TIMESTAMP = 1114, TIMESTAMPTZ = 1184, + TIME = 1083, JSON = 114, JSONB = 3802, PG_LSN = 3220 @@ -105,7 +107,7 @@ export class PgType { return ELEM_OID_TO_ARRAY.get(typeOid); } - static decode(text: string, typeOid: number) { + static decode(text: string, typeOid: number): DatabaseInputValue { switch (typeOid) { // add line here when register new type case PgTypeOid.TEXT: @@ -130,6 +132,8 @@ export class PgType { return timestampToSqlite(text); case PgTypeOid.TIMESTAMPTZ: return timestamptzToSqlite(text); + case PgTypeOid.TIME: + return TimeValue.parse(text); case PgTypeOid.JSON: case PgTypeOid.JSONB: // Don't parse the contents diff --git a/packages/jpgwire/src/util.ts b/packages/jpgwire/src/util.ts index 625cc5a60..e11844b2b 100644 --- a/packages/jpgwire/src/util.ts +++ b/packages/jpgwire/src/util.ts @@ -5,6 +5,7 @@ import { DEFAULT_CERTS } from './certs.js'; import * as pgwire from './pgwire.js'; import { PgType } from './pgwire_types.js'; import { ConnectOptions } from './socket_adapter.js'; +import { DatabaseInputValue, DateTimeValue } from '@powersync/service-sync-rules'; // TODO this is duplicated, but maybe that is ok export interface NormalizedConnectionConfig { @@ -132,7 +133,7 @@ export async function connectPgWire( return connection; } -function _recvDataRow(this: any, _message: any, row: Uint8Array[], batch: any) { +function _recvDataRow(this: any, _message: any, row: (Uint8Array | DatabaseInputValue)[], batch: any) { for (let i = 0; i < this._rowColumns.length; i++) { const valbuf = row[i]; if (valbuf == null) { @@ -221,6 +222,8 @@ export function lsnMakeComparable(text: string) { return h.padStart(8, '0') + '/' + l.padStart(8, '0'); } +const timeRegex = /^([\d\-]+) ([\d:]+)(\.\d+)?([+-][\d:]+)?$/; + /** * Convert a postgres timestamptz to a SQLite-compatible/normalized timestamp. * @@ -232,17 +235,17 @@ export function lsnMakeComparable(text: string) { * * We have specific exceptions for -infinity and infinity. 
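 * For example (from the tests): the timestamptz `2023-03-06 15:47+02` becomes `2023-03-06 13:47:00Z` in the legacy
 * format, and `2023-03-06T13:47:00.000000Z` once `timestamps_iso8601` is enabled.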
*/ -export function timestamptzToSqlite(source?: string) { +export function timestamptzToSqlite(source?: string): DateTimeValue | null { if (source == null) { return null; } // Make compatible with SQLite - const match = /^([\d\-]+) ([\d:]+)(\.\d+)?([+-][\d:]+)$/.exec(source); + const match = timeRegex.exec(source); if (match == null) { if (source == 'infinity') { - return '9999-12-31 23:59:59Z'; + return new DateTimeValue('9999-12-31T23:59:59Z'); } else if (source == '-infinity') { - return '0000-01-01 00:00:00Z'; + return new DateTimeValue('0000-01-01T00:00:00Z'); } else { return null; } @@ -256,9 +259,16 @@ export function timestamptzToSqlite(source?: string) { if (isNaN(parsed.getTime())) { return null; } - const text = parsed.toISOString().replace('T', ' ').replace('.000', '').replace('Z', ''); - return `${text}${precision ?? ''}Z`; + const baseValue = parsed.toISOString().replace('.000', '').replace('Z', ''); + + // In the new format, we always use ISO 8601. Since Postgres drops zeroes from the fractional seconds, we also pad + // that back to the highest theoretical precision (microseconds). This ensures that sorting returned values as text + // returns them in order of the time value they represent. + // + // In the old format, we keep the sub-second precision only if it's not `.000`. + const missingPrecision = precision?.padEnd(7, '0') ?? '.000000'; + return new DateTimeValue(`${baseValue}${missingPrecision}Z`, `${baseValue.replace('T', ' ')}${precision ?? ''}Z`); } /** @@ -268,17 +278,26 @@ export function timestamptzToSqlite(source?: string) { * * https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-SPECIAL-VALUES */ -export function timestampToSqlite(source?: string) { +export function timestampToSqlite(source?: string): DateTimeValue | null { if (source == null) { return null; } - if (source == 'infinity') { - return '9999-12-31 23:59:59'; - } else if (source == '-infinity') { - return '0000-01-01 00:00:00'; - } else { - return source; + + const match = timeRegex.exec(source); + if (match == null) { + if (source == 'infinity') { + return new DateTimeValue('9999-12-31T23:59:59'); + } else if (source == '-infinity') { + return new DateTimeValue('0000-01-01T00:00:00'); + } else { + return null; + } } + + const [_, date, time, precision, __] = match as any; + const missingPrecision = precision?.padEnd(7, '0') ?? '.000000'; + + return new DateTimeValue(`${date}T${time}${missingPrecision}`, source); } /** * For date, we keep it mostly as-is. 
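 * e.g. `2023-03-06` is passed through unchanged; only `infinity` and `-infinity` get the same clamping treatment as
 * timestamps (note the `0000-01-01` date in the test rows above).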
diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index 62db7dd43..614f28406 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -1,5 +1,11 @@ import { ObserverClient } from '@powersync/lib-services-framework'; -import { EvaluatedParameters, EvaluatedRow, SqliteRow, ToastableSqliteRow } from '@powersync/service-sync-rules'; +import { + EvaluatedParameters, + EvaluatedRow, + SqliteInputRow, + SqliteRow, + ToastableSqliteRow +} from '@powersync/service-sync-rules'; import { BSON } from 'bson'; import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { SourceTable, TableSnapshotStatus } from './SourceTable.js'; @@ -132,7 +138,7 @@ export interface SaveInsert { sourceTable: SourceTable; before?: undefined; beforeReplicaId?: undefined; - after: SqliteRow; + after: SqliteInputRow; afterReplicaId: ReplicaId; } @@ -143,7 +149,7 @@ export interface SaveUpdate { /** * This is only present when the id has changed, and will only contain replica identity columns. */ - before?: SqliteRow; + before?: SqliteInputRow; beforeReplicaId?: ReplicaId; /** @@ -158,7 +164,7 @@ export interface SaveUpdate { export interface SaveDelete { tag: SaveOperationTag.DELETE; sourceTable: SourceTable; - before?: SqliteRow; + before?: SqliteInputRow; beforeReplicaId: ReplicaId; after?: undefined; afterReplicaId?: undefined; diff --git a/packages/service-core/src/storage/ReplicationEventPayload.ts b/packages/service-core/src/storage/ReplicationEventPayload.ts index bd9ee01f8..d86ea50ef 100644 --- a/packages/service-core/src/storage/ReplicationEventPayload.ts +++ b/packages/service-core/src/storage/ReplicationEventPayload.ts @@ -4,8 +4,8 @@ import { BucketStorageBatch, SaveOp } from './BucketStorageBatch.js'; export type EventData = { op: SaveOp; - before?: sync_rules.SqliteRow; - after?: sync_rules.SqliteRow; + before?: sync_rules.SqliteInputRow; + after?: sync_rules.SqliteInputRow; }; export type ReplicationEventPayload = { diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index abe6316df..a7275fb84 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -148,7 +148,10 @@ export function hasToastedValues(row: sync_rules.ToastableSqliteRow) { * * If we don't store data, we assume we always have a complete row. 
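 * A row is incomplete when it still contains toasted (`undefined`) values, e.g. when postgres omits large unchanged
 * columns from an update record.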
*/ -export function isCompleteRow(storeData: boolean, row: sync_rules.ToastableSqliteRow): row is sync_rules.SqliteRow { +export function isCompleteRow( + storeData: boolean, + row: sync_rules.ToastableSqliteRow +): row is sync_rules.SqliteInputRow { if (!storeData) { // Assume the row is complete - no need to check return true; diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts index 717c017d2..ff35cbcb9 100644 --- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts +++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts @@ -615,6 +615,9 @@ streams: stream: auto_subscribe: ${source} query: SELECT * FROM assets WHERE id IN ifnull(subscription.parameter('ids'), '["default"]'); + +config: + edition: 2 `; } diff --git a/packages/sync-rules/src/BucketSource.ts b/packages/sync-rules/src/BucketSource.ts index 6131d78d7..deab23373 100644 --- a/packages/sync-rules/src/BucketSource.ts +++ b/packages/sync-rules/src/BucketSource.ts @@ -3,7 +3,13 @@ import { ColumnDefinition } from './ExpressionType.js'; import { SourceTableInterface } from './SourceTableInterface.js'; import { GetQuerierOptions } from './SqlSyncRules.js'; import { TablePattern } from './TablePattern.js'; -import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js'; +import { + EvaluatedParametersResult, + EvaluateRowOptions, + EvaluationResult, + SourceSchema, + SqliteInputRow +} from './types.js'; /** * An interface declaring @@ -28,7 +34,7 @@ export interface BucketSource { * The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage * system to find buckets. */ - evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[]; + evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteInputRow): EvaluatedParametersResult[]; /** * Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed diff --git a/packages/sync-rules/src/ExpressionType.ts b/packages/sync-rules/src/ExpressionType.ts index ad46c4408..3e625c717 100644 --- a/packages/sync-rules/src/ExpressionType.ts +++ b/packages/sync-rules/src/ExpressionType.ts @@ -6,6 +6,8 @@ export const TYPE_REAL = 8; export type SqliteType = 'null' | 'blob' | 'text' | 'integer' | 'real' | 'numeric'; +export type SqliteValueType = 'null' | 'blob' | 'text' | 'integer' | 'real'; + export interface ColumnDefinition { name: string; type: ExpressionType; diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts index bc3e5987a..beee77a80 100644 --- a/packages/sync-rules/src/SqlBucketDescriptor.ts +++ b/packages/sync-rules/src/SqlBucketDescriptor.ts @@ -11,6 +11,7 @@ import { StaticSqlParameterQuery } from './StaticSqlParameterQuery.js'; import { TablePattern } from './TablePattern.js'; import { TableValuedFunctionSqlParameterQuery } from './TableValuedFunctionSqlParameterQuery.js'; import { SqlRuleError } from './errors.js'; +import { CompatibilityContext } from './compatibility.js'; import { EvaluatedParametersResult, EvaluateRowOptions, @@ -20,6 +21,7 @@ import { SourceSchema, SqliteRow } from './types.js'; +import { applyRowContext } from './utils.js'; export interface QueryParseResult { /** @@ -34,7 +36,10 @@ export class SqlBucketDescriptor implements BucketSource { name: string; bucketParameters?: string[]; - 
constructor(name: string) { + constructor( + name: string, + private readonly compatibility: CompatibilityContext + ) { this.name = name; } @@ -99,7 +104,7 @@ export class SqlBucketDescriptor implements BucketSource { continue; } - results.push(...query.evaluateRow(options.sourceTable, options.record)); + results.push(...query.evaluateRow(options.sourceTable, applyRowContext(options.record, this.compatibility))); } return results; } diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts index 2aea4a34a..bca6330ca 100644 --- a/packages/sync-rules/src/SqlSyncRules.ts +++ b/packages/sync-rules/src/SqlSyncRules.ts @@ -20,14 +20,14 @@ import { QueryParseOptions, RequestParameters, SourceSchema, + SqliteInputRow, SqliteJsonRow, - SqliteRow, StreamParseOptions, SyncRules } from './types.js'; import { BucketSource } from './BucketSource.js'; -import { SyncStream } from './streams/stream.js'; import { syncStreamFromSql } from './streams/from_sql.js'; +import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js'; const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES'); @@ -138,6 +138,27 @@ export class SqlSyncRules implements SyncRules { return rules; } + const declaredOptions = parsed.get('config') as YAMLMap | null; + let compatibility = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY; + if (declaredOptions != null) { + const edition = (declaredOptions.get('edition') ?? CompatibilityEdition.LEGACY) as CompatibilityEdition; + const options = new Map(); + + for (const entry of declaredOptions.items) { + const { + key: { value: key }, + value: { value } + } = entry as { key: Scalar; value: Scalar }; + + const option = CompatibilityOption.byName[key]; + if (option) { + options.set(option, value); + } + } + + compatibility = new CompatibilityContext(edition, options); + } + // Bucket definitions using explicit parameter and data queries. const bucketMap = parsed.get('bucket_definitions') as YAMLMap; const streamMap = parsed.get('streams') as YAMLMap | null; @@ -180,12 +201,13 @@ export class SqlSyncRules implements SyncRules { const queryOptions: QueryParseOptions = { ...options, accept_potentially_dangerous_queries, - priority: parseOptionPriority + priority: parseOptionPriority, + compatibility }; const parameters = value.get('parameters', true) as unknown; const dataQueries = value.get('data', true) as unknown; - const descriptor = new SqlBucketDescriptor(key); + const descriptor = new SqlBucketDescriptor(key, compatibility); if (parameters instanceof Scalar) { rules.withScalar(parameters, (q) => { @@ -227,7 +249,8 @@ export class SqlSyncRules implements SyncRules { ...options, accept_potentially_dangerous_queries, priority: rules.parsePriority(value), - auto_subscribe: value.get('auto_subscribe', true)?.value == true + auto_subscribe: value.get('auto_subscribe', true)?.value == true, + compatibility }; const data = value.get('query', true) as unknown; @@ -261,7 +284,7 @@ export class SqlSyncRules implements SyncRules { continue; } - const eventDescriptor = new SqlEventDescriptor(key.toString()); + const eventDescriptor = new SqlEventDescriptor(key.toString(), compatibility); for (let item of payloads.items) { if (!isScalar(item)) { rules.errors.push(new YamlError(new Error(`Payload queries for events must be scalar.`))); @@ -375,7 +398,7 @@ export class SqlSyncRules implements SyncRules { /** * Throws errors. 
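 * Use {@link evaluateParameterRowWithErrors} when errors should be collected instead of thrown.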
*/ - evaluateParameterRow(table: SourceTableInterface, row: SqliteRow): EvaluatedParameters[] { + evaluateParameterRow(table: SourceTableInterface, row: SqliteInputRow): EvaluatedParameters[] { const { results, errors } = this.evaluateParameterRowWithErrors(table, row); if (errors.length > 0) { throw new Error(errors[0].error); } @@ -385,7 +408,7 @@ evaluateParameterRowWithErrors( table: SourceTableInterface, - row: SqliteRow + row: SqliteInputRow ): { results: EvaluatedParameters[]; errors: EvaluationError[] } { let rawResults: EvaluatedParametersResult[] = []; for (let source of this.bucketSources) { diff --git a/packages/sync-rules/src/compatibility.ts b/packages/sync-rules/src/compatibility.ts new file mode 100644 index 000000000..89459f05b --- /dev/null +++ b/packages/sync-rules/src/compatibility.ts @@ -0,0 +1,57 @@ +export enum CompatibilityEdition { + LEGACY = 1, + SYNC_STREAMS = 2 +} + +/** + * A historical issue of the PowerSync service that can only be changed in a backwards-incompatible manner. + * + * To avoid breaking existing users, fixes for these quirks are opt-in: Users either have to enable them through the + * `config` block when defining sync rules, or use a new feature such as sync streams where these issues are fixed by + * default. + */ +export class CompatibilityOption { + private constructor( + readonly name: string, + readonly description: string, + readonly fixedIn: CompatibilityEdition + ) {} + + static timestampsIso8601 = new CompatibilityOption( + 'timestamps_iso8601', + 'Consistently renders timestamps with an ISO 8601-compatible format (previous versions used a space instead of a T to separate date and time).', + CompatibilityEdition.SYNC_STREAMS + ); + + static byName: Record<string, CompatibilityOption> = Object.freeze({ + timestamps_iso8601: this.timestampsIso8601 + }); +} + +export class CompatibilityContext { + /** + * The general compatibility level we're operating under. + * + * This is {@link CompatibilityEdition.LEGACY} by default, but can be changed when defining sync rules to allow newer + * features. + */ + readonly edition: CompatibilityEdition; + + /** + * Overrides that customize individual compatibility options, deviating from the defaults at the given {@link edition}. + */ + readonly overrides: Map<CompatibilityOption, boolean>; + + constructor(edition: CompatibilityEdition, overrides?: Map<CompatibilityOption, boolean>) { + this.edition = edition; + this.overrides = overrides ?? new Map(); + } + + isEnabled(option: CompatibilityOption) { + return this.overrides.get(option) ?? option.fixedIn <= this.edition; + } + + /** + * A {@link CompatibilityContext} in which no fixes are applied. 
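+ * This corresponds to sync rules that don't declare a `config` block. To opt in to individual fixes, a sync rules
+ * file can declare, for example:
+ *
+ * ```yaml
+ * config:
+ *   edition: 1
+ *   timestamps_iso8601: true
+ * ```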
+ */ + static FULL_BACKWARDS_COMPATIBILITY: CompatibilityContext = new CompatibilityContext(CompatibilityEdition.LEGACY); +} diff --git a/packages/sync-rules/src/events/SqlEventDescriptor.ts b/packages/sync-rules/src/events/SqlEventDescriptor.ts index ff09ab256..827c92d66 100644 --- a/packages/sync-rules/src/events/SqlEventDescriptor.ts +++ b/packages/sync-rules/src/events/SqlEventDescriptor.ts @@ -1,9 +1,11 @@ import { SqlRuleError } from '../errors.js'; +import { CompatibilityContext } from '../compatibility.js'; import { SourceTableInterface } from '../SourceTableInterface.js'; import { QueryParseResult } from '../SqlBucketDescriptor.js'; import { SyncRulesOptions } from '../SqlSyncRules.js'; import { TablePattern } from '../TablePattern.js'; import { EvaluateRowOptions } from '../types.js'; +import { applyRowContext } from '../utils.js'; import { EvaluatedEventRowWithErrors, SqlEventSourceQuery } from './SqlEventSourceQuery.js'; /** @@ -13,7 +15,10 @@ export class SqlEventDescriptor { name: string; sourceQueries: SqlEventSourceQuery[] = []; - constructor(name: string) { + constructor( + name: string, + private readonly compatibility: CompatibilityContext + ) { this.name = name; } @@ -46,7 +51,10 @@ export class SqlEventDescriptor { }; } - return matchingQuery.evaluateRowWithErrors(options.sourceTable, options.record); + return matchingQuery.evaluateRowWithErrors( + options.sourceTable, + applyRowContext(options.record, this.compatibility) + ); } getSourceTables(): Set { diff --git a/packages/sync-rules/src/index.ts b/packages/sync-rules/src/index.ts index 18734ff1e..f3cf5af05 100644 --- a/packages/sync-rules/src/index.ts +++ b/packages/sync-rules/src/index.ts @@ -1,6 +1,7 @@ export * from './BucketDescription.js'; export * from './BucketParameterQuerier.js'; export * from './BucketSource.js'; +export * from './compatibility.js'; export * from './errors.js'; export * from './events/SqlEventDescriptor.js'; export * from './events/SqlEventSourceQuery.js'; @@ -21,4 +22,6 @@ export { SyncStream } from './streams/stream.js'; export { syncStreamFromSql } from './streams/from_sql.js'; export * from './TablePattern.js'; export * from './types.js'; +export * from './types/custom_sqlite_value.js'; +export * from './types/time.js'; export * from './utils.js'; diff --git a/packages/sync-rules/src/json_schema.ts b/packages/sync-rules/src/json_schema.ts index 982a82462..930ea7123 100644 --- a/packages/sync-rules/src/json_schema.ts +++ b/packages/sync-rules/src/json_schema.ts @@ -1,4 +1,5 @@ import ajvModule from 'ajv'; +import { CompatibilityEdition, CompatibilityOption } from './compatibility.js'; // Hack to make this work both in NodeJS and a browser const Ajv = ajvModule.default ?? 
ajvModule; const ajv = new Ajv({ allErrors: true, verbose: true }); @@ -107,6 +108,30 @@ export const syncRulesSchema: ajvModule.Schema = { } } } + }, + config: { + type: 'object', + description: 'Config declaring the compatibility level used to parse these definitions.', + properties: { + edition: { + type: 'integer', + default: CompatibilityEdition.LEGACY, + minimum: CompatibilityEdition.LEGACY, + exclusiveMaximum: CompatibilityEdition.SYNC_STREAMS + 1 + }, + ...Object.fromEntries( + Object.entries(CompatibilityOption.byName).map((e) => { + return [ + e[0], + { + type: 'boolean', + description: `Enabled by default starting from edition ${e[1].fixedIn}: ${e[1].description}` + } + ]; + }) + ) + }, + additionalProperties: false } }, anyOf: [{ required: ['bucket_definitions'] }, { required: ['streams'] }], diff --git a/packages/sync-rules/src/sql_functions.ts b/packages/sync-rules/src/sql_functions.ts index 962e31116..34bef6e0f 100644 --- a/packages/sync-rules/src/sql_functions.ts +++ b/packages/sync-rules/src/sql_functions.ts @@ -1,13 +1,14 @@ import { JSONBig } from '@powersync/service-jsonbig'; import { SQLITE_FALSE, SQLITE_TRUE, sqliteBool, sqliteNot } from './sql_support.js'; -import { SqliteRow, SqliteValue } from './types.js'; +import { SqliteInputValue, SqliteValue } from './types.js'; import { jsonValueToSqlite } from './utils.js'; // Declares @syncpoint/wkx module // This allows for consumers of this lib to resolve types correctly /// import wkx from '@syncpoint/wkx'; -import { ExpressionType, SqliteType, TYPE_INTEGER } from './ExpressionType.js'; +import { ExpressionType, SqliteType, SqliteValueType, TYPE_INTEGER } from './ExpressionType.js'; import * as uuid from 'uuid'; +import { CustomSqliteValue } from './types/custom_sqlite_value.js'; export const BASIC_OPERATORS = new Set([ '=', @@ -635,7 +636,7 @@ export function cast(value: SqliteValue, to: string) { } } -export function sqliteTypeOf(arg: SqliteValue) { +export function sqliteTypeOf(arg: SqliteInputValue): SqliteValueType { if (arg == null) { return 'null'; } else if (typeof arg == 'string') { @@ -646,6 +647,8 @@ export function sqliteTypeOf(arg: SqliteValue) { return 'real'; } else if (arg instanceof Uint8Array) { return 'blob'; + } else if (arg instanceof CustomSqliteValue) { + return arg.sqliteType; } else { // Should not happen throw new Error(`Unknown type: ${arg}`); diff --git a/packages/sync-rules/src/streams/from_sql.ts b/packages/sync-rules/src/streams/from_sql.ts index bf9717f40..e8494a8a4 100644 --- a/packages/sync-rules/src/streams/from_sql.ts +++ b/packages/sync-rules/src/streams/from_sql.ts @@ -41,6 +41,7 @@ import { Statement } from 'pgsql-ast-parser'; import { STREAM_FUNCTIONS } from './functions.js'; +import { CompatibilityContext, CompatibilityEdition } from '../compatibility.js'; export function syncStreamFromSql( descriptorName: string, @@ -66,6 +67,13 @@ class SyncStreamCompiler { } compile(): SyncStream { + if (this.options.compatibility.edition < CompatibilityEdition.SYNC_STREAMS) { + throw new SqlRuleError( + 'Sync streams require edition 2 or later. 
Try adding a `config: {edition: 2}` block to the end of the file.', this.sql ); } const [stmt, ...illegalRest] = parse(this.sql, { locationTracking: true }); // TODO: Share more of this code with SqlDataQuery @@ -91,7 +99,8 @@ const stream = new SyncStream( this.descriptorName, - new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)) + new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)), + this.options.compatibility ); stream.subscribedToByDefault = this.options.auto_subscribe ?? false; if (filter.isValid(tools)) { diff --git a/packages/sync-rules/src/streams/parameter.ts b/packages/sync-rules/src/streams/parameter.ts index 73dc9f086..873216385 100644 --- a/packages/sync-rules/src/streams/parameter.ts +++ b/packages/sync-rules/src/streams/parameter.ts @@ -7,7 +7,8 @@ import { RequestParameters, SqliteJsonValue, SqliteRow, - SqliteValue + SqliteValue, + TableRow } from '../types.js'; /** @@ -31,7 +32,7 @@ export interface BucketParameter { * When a user connects, {@link StaticLookup.fromRequest} would return the user ID from the token. A matching bucket would * then contain the oplog data for assets with the matching `owner` column. */ - filterRow(options: EvaluateRowOptions): SqliteJsonValue[]; + filterRow(row: TableRow): SqliteJsonValue[]; } export interface SubqueryEvaluator { diff --git a/packages/sync-rules/src/streams/stream.ts b/packages/sync-rules/src/streams/stream.ts index 7d1cfdb05..359fe63db 100644 --- a/packages/sync-rules/src/streams/stream.ts +++ b/packages/sync-rules/src/streams/stream.ts @@ -3,6 +3,7 @@ import { BucketInclusionReason, BucketPriority, DEFAULT_BUCKET_PRIORITY } from ' import { BucketParameterQuerier, PendingQueriers } from '../BucketParameterQuerier.js'; import { BucketSource, BucketSourceType, ResultSetDescription } from '../BucketSource.js'; import { ColumnDefinition } from '../ExpressionType.js'; +import { CompatibilityContext } from '../compatibility.js'; import { SourceTableInterface } from '../SourceTableInterface.js'; import { GetQuerierOptions, RequestedStream } from '../SqlSyncRules.js'; import { TablePattern } from '../TablePattern.js'; @@ -12,8 +13,10 @@ import { EvaluationResult, RequestParameters, SourceSchema, - SqliteRow + SqliteRow, + TableRow } from '../types.js'; +import { applyRowContext } from '../utils.js'; import { StreamVariant } from './variant.js'; export class SyncStream implements BucketSource { @@ -23,7 +26,11 @@ variants: StreamVariant[]; data: BaseSqlDataQuery; - constructor(name: string, data: BaseSqlDataQuery) { + constructor( + name: string, + data: BaseSqlDataQuery, + private readonly compatibility: CompatibilityContext + ) { this.name = name; this.subscribedToByDefault = false; this.priority = DEFAULT_BUCKET_PRIORITY; @@ -165,13 +172,19 @@ } const stream = this; + const mappedRow = applyRowContext(options.record, this.compatibility); + const row: TableRow = { + sourceTable: options.sourceTable, + record: mappedRow + }; + return this.data.evaluateRowWithOptions({ table: options.sourceTable, - row: options.record, + row: mappedRow, bucketIds() { const bucketIds: string[] = []; for (const variant of stream.variants) { - bucketIds.push(...variant.bucketIdsForRow(stream.name, options)); + bucketIds.push(...variant.bucketIdsForRow(stream.name, row)); } return bucketIds; diff --git 
a/packages/sync-rules/src/streams/variant.ts b/packages/sync-rules/src/streams/variant.ts index b217de358..6b509aa1e 100644 --- a/packages/sync-rules/src/streams/variant.ts +++ b/packages/sync-rules/src/streams/variant.ts @@ -6,7 +6,8 @@ import { EvaluateRowOptions, RequestParameters, SqliteJsonValue, - SqliteRow + SqliteRow, + TableRow } from '../types.js'; import { isJsonValue, JSONBucketNameSerialize, normalizeParameterValue } from '../utils.js'; import { BucketParameter, SubqueryEvaluator } from './parameter.js'; @@ -45,7 +46,7 @@ export class StreamVariant { * * This is introduced for streams like `SELECT * FROM assets WHERE LENGTH(assets.name < 10)`. */ - additionalRowFilters: ((options: EvaluateRowOptions) => boolean)[]; + additionalRowFilters: ((row: TableRow) => boolean)[]; /** * Additional filters that are evaluated against the request of the stream subscription. @@ -66,7 +67,7 @@ export class StreamVariant { /** * Given a row in the table this stream selects from, returns all ids of buckets to which that row belongs to. */ - bucketIdsForRow(streamName: string, options: EvaluateRowOptions): string[] { + bucketIdsForRow(streamName: string, options: TableRow): string[] { return this.instantiationsForRow(options).map((values) => this.buildBucketId(streamName, values)); } @@ -75,7 +76,7 @@ export class StreamVariant { * * The inner arrays will have a length equal to the amount of parameters in this variant. */ - instantiationsForRow(options: EvaluateRowOptions): SqliteJsonValue[][] { + instantiationsForRow(options: TableRow): SqliteJsonValue[][] { for (const additional of this.additionalRowFilters) { if (!additional(options)) { return []; diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts index 430ee2b9c..fc50df7b6 100644 --- a/packages/sync-rules/src/types.ts +++ b/packages/sync-rules/src/types.ts @@ -6,6 +6,8 @@ import { TablePattern } from './TablePattern.js'; import { toSyncRulesParameters } from './utils.js'; import { BucketPriority } from './BucketDescription.js'; import { ParameterLookup } from './BucketParameterQuerier.js'; +import { CustomSqliteValue } from './types/custom_sqlite_value.js'; +import { CompatibilityContext } from './compatibility.js'; export interface SyncRules { evaluateRow(options: EvaluateRowOptions): EvaluationResult[]; @@ -16,6 +18,7 @@ export interface SyncRules { export interface QueryParseOptions extends SyncRulesOptions { accept_potentially_dangerous_queries?: boolean; priority?: BucketPriority; + compatibility: CompatibilityContext; } export interface StreamParseOptions extends QueryParseOptions { @@ -144,12 +147,14 @@ export class RequestParameters implements ParameterValueSet { user_id: tokenPayload.sub }; - this.tokenParameters = toSyncRulesParameters(tokenParameters); + // Client and token parameters don't contain DateTime values or other custom types, so we don't need to consider + // compatibility. 
+ this.tokenParameters = toSyncRulesParameters(tokenParameters, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); this.userId = tokenPayload.sub; this.rawTokenPayload = JSONBig.stringify(tokenPayload); this.rawUserParameters = JSONBig.stringify(clientParameters); - this.userParameters = toSyncRulesParameters(clientParameters!); + this.userParameters = toSyncRulesParameters(clientParameters!, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY); this.streamParameters = null; this.rawStreamParameters = null; } @@ -186,6 +191,12 @@ export type SqliteJsonValue = number | string | bigint | null; */ export type SqliteValue = number | string | null | bigint | Uint8Array; +/** + * A value that is either supported by SQLite natively, or one that can be lowered into a SQLite value given additional + * context. + */ +export type SqliteInputValue = SqliteValue | CustomSqliteValue; + /** * A set of values that are both SQLite and JSON-compatible. * @@ -197,7 +208,9 @@ export type SqliteJsonRow = { [column: string]: SqliteJsonValue }; * SQLite-compatible row (NULL, TEXT, INTEGER, REAL, BLOB). * JSON is represented as TEXT. */ -export type SqliteRow = { [column: string]: SqliteValue }; +export type SqliteRow<T = SqliteValue> = { [column: string]: T }; + +export type SqliteInputRow = SqliteRow<SqliteInputValue>; /** * SQLite-compatible row (NULL, TEXT, INTEGER, REAL, BLOB). @@ -205,7 +218,7 @@ export type SqliteRow = { [column: string]: SqliteValue }; * * Toasted values are `undefined`. */ -export type ToastableSqliteRow = { [column: string]: SqliteValue | undefined }; +export type ToastableSqliteRow = SqliteRow<SqliteInputValue | undefined>; /** * A value as received from the database. @@ -215,6 +228,7 @@ export type DatabaseInputValue = | boolean | DatabaseInputValue[] | JsonContainer + | CustomSqliteValue | { [key: string]: DatabaseInputValue }; /** @@ -269,9 +283,14 @@ export interface InputParameter { parametersToLookupValue(parameters: ParameterValueSet): SqliteValue; } -export interface EvaluateRowOptions { +export interface EvaluateRowOptions extends TableRow<SqliteInputRow> {} + +/** + * A row associated with the table it's coming from. + */ +export interface TableRow<R = SqliteRow> { sourceTable: SourceTableInterface; - record: SqliteRow; + record: R; } /** diff --git a/packages/sync-rules/src/types/custom_sqlite_value.ts b/packages/sync-rules/src/types/custom_sqlite_value.ts new file mode 100644 index 000000000..d26af5390 --- /dev/null +++ b/packages/sync-rules/src/types/custom_sqlite_value.ts @@ -0,0 +1,65 @@ +import { JSONBig } from '@powersync/service-jsonbig'; +import { CompatibilityContext } from '../compatibility.js'; +import { SqliteValue, EvaluatedRow, SqliteInputValue, DatabaseInputValue } from '../types.js'; +import { SqliteValueType } from '../ExpressionType.js'; + +/** + * A value that decays into a {@link SqliteValue} in a context-specific way. + * + * This is used to conditionally render some values in different formats depending on compatibility options. For + * instance, old versions of the sync service used to [encode timestamp values incorrectly](https://github.com/powersync-ja/powersync-service/issues/286). + * To fix this without breaking backwards-compatibility, we now represent timestamp values as a {@link CustomSqliteValue} + * subtype where `toSqliteValue` returns the old or the new format depending on options. + * + * Instances of {@link CustomSqliteValue} are always temporary structures that aren't persisted. 
diff --git a/packages/sync-rules/src/types/custom_sqlite_value.ts b/packages/sync-rules/src/types/custom_sqlite_value.ts
new file mode 100644
index 000000000..d26af5390
--- /dev/null
+++ b/packages/sync-rules/src/types/custom_sqlite_value.ts
@@ -0,0 +1,65 @@
+import { JSONBig } from '@powersync/service-jsonbig';
+import { CompatibilityContext } from '../compatibility.js';
+import { SqliteValue, EvaluatedRow, SqliteInputValue, DatabaseInputValue } from '../types.js';
+import { SqliteValueType } from '../ExpressionType.js';
+
+/**
+ * A value that decays into a {@link SqliteValue} in a context-specific way.
+ *
+ * This is used to conditionally render some values in different formats depending on compatibility options. For
+ * instance, old versions of the sync service used to [encode timestamp values incorrectly](https://github.com/powersync-ja/powersync-service/issues/286).
+ * To fix this without breaking backwards-compatibility, we now represent timestamp values as a {@link CustomSqliteValue}
+ * subtype where `toSqliteValue` returns the old or the new format depending on options.
+ *
+ * Instances of {@link CustomSqliteValue} are always temporary structures that aren't persisted. They are created by
+ * the replicator implementations; the sync rules implementation invokes {@link toSqliteValue} to ensure that an
+ * {@link EvaluatedRow} only consists of proper SQLite values.
+ */
+export abstract class CustomSqliteValue {
+  /**
+   * Renders this custom value into a {@link SqliteValue}.
+   *
+   * @param context The current compatibility options.
+   */
+  abstract toSqliteValue(context: CompatibilityContext): SqliteValue;
+
+  abstract get sqliteType(): SqliteValueType;
+}
+
+export class CustomArray extends CustomSqliteValue {
+  constructor(
+    private readonly elements: any[],
+    private readonly map: (element: any, context: CompatibilityContext) => any
+  ) {
+    super();
+  }
+
+  get sqliteType(): SqliteValueType {
+    return 'text';
+  }
+
+  toSqliteValue(context: CompatibilityContext): SqliteValue {
+    return JSONBig.stringify(this.elements.map((element) => this.map(element, context)));
+  }
+}
+
+export class CustomObject extends CustomSqliteValue {
+  constructor(
+    private readonly source: Record<string, any>,
+    private readonly map: (element: any, context: CompatibilityContext) => any
+  ) {
+    super();
+  }
+
+  get sqliteType(): SqliteValueType {
+    return 'text';
+  }
+
+  toSqliteValue(context: CompatibilityContext): SqliteValue {
+    let record: Record<string, any> = {};
+    for (let key of Object.keys(this.source)) {
+      record[key] = this.map(this.source[key], context);
+    }
+    return JSONBig.stringify(record);
+  }
+}
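A short sketch of the deferred rendering in action, using only exports this diff already relies on elsewhere (`CustomArray`, `DateTimeValue`, `applyValueContext`); the mapper passed here is illustrative, not the one `toSyncRulesValue` installs:

```ts
import { applyValueContext, CompatibilityContext, CustomArray, DateTimeValue } from '@powersync/service-sync-rules';

// Serialization is deferred: the array remembers its elements and a mapper...
const array = new CustomArray([new DateTimeValue('2025-08-19T09:21:00Z')], (element, context) =>
  element instanceof DateTimeValue ? element.toSqliteValue(context) : element
);

// ...and only renders to a JSON string once the compatibility options are known.
console.log(applyValueContext(array, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
// => '["2025-08-19 09:21:00Z"]'
```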
diff --git a/packages/sync-rules/src/types/time.ts b/packages/sync-rules/src/types/time.ts
new file mode 100644
index 000000000..e4a1188fa
--- /dev/null
+++ b/packages/sync-rules/src/types/time.ts
@@ -0,0 +1,75 @@
+import { SqliteValueType } from '../ExpressionType.js';
+import { CompatibilityContext, CompatibilityOption } from '../compatibility.js';
+import { SqliteValue } from '../types.js';
+import { CustomSqliteValue } from './custom_sqlite_value.js';
+
+/**
+ * In old versions of the sync service, timestamp values were formatted with a space between the date and time
+ * components.
+ *
+ * This is not ISO 8601 compatible, but changing it would break existing users. So the fixed format is opt-in and
+ * disabled by default until a major upgrade.
+ */
+export class DateTimeValue extends CustomSqliteValue {
+  // YYYY-MM-DDThh:mm:ss.sss / YYYY-MM-DDThh:mm:ss.sssZ
+  constructor(
+    readonly iso8601Representation: string,
+    private readonly fixedLegacyRepresentation: string | undefined = undefined
+  ) {
+    super();
+  }
+
+  // YYYY-MM-DD hh:mm:ss.sss / YYYY-MM-DD hh:mm:ss.sssZ
+  public get legacyRepresentation(): string {
+    return this.fixedLegacyRepresentation ?? this.iso8601Representation.replace('T', ' ');
+  }
+
+  get sqliteType(): SqliteValueType {
+    return 'text';
+  }
+
+  toSqliteValue(context: CompatibilityContext): SqliteValue {
+    return context.isEnabled(CompatibilityOption.timestampsIso8601)
+      ? this.iso8601Representation
+      : this.legacyRepresentation;
+  }
+}
+
+/**
+ * In old versions of the sync service, time values didn't consistently include a sub-second component.
+ *
+ * A value like `12:13:14.156789` would be represented as-is, but `12:13:14.000000` would be synced as `12:13:14`.
+ * This is undesirable because sorting the resulting values alphabetically no longer matches their chronological
+ * order.
+ */
+export class TimeValue extends CustomSqliteValue {
+  constructor(
+    readonly timeSeconds: string,
+    readonly fraction: string | undefined = undefined
+  ) {
+    super();
+  }
+
+  static parse(value: string): TimeValue | null {
+    const match = /^([\d:]+)(\.\d+)?$/.exec(value);
+    if (match == null) {
+      return null;
+    }
+
+    const [_, timeSeconds, fraction] = match;
+    return new TimeValue(timeSeconds, fraction);
+  }
+
+  toSqliteValue(context: CompatibilityContext): SqliteValue {
+    if (context.isEnabled(CompatibilityOption.timestampsIso8601)) {
+      const fraction = this.fraction?.padEnd(7, '0') ?? '.000000';
+      return `${this.timeSeconds}${fraction}`;
+    } else {
+      return `${this.timeSeconds}${this.fraction ?? ''}`;
+    }
+  }
+
+  get sqliteType(): SqliteValueType {
+    return 'text';
+  }
+}
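Grounded in the unit tests added later in this diff, the two value types render like this under the legacy context versus an edition-2 context:

```ts
import { CompatibilityContext, CompatibilityEdition, DateTimeValue, TimeValue } from '@powersync/service-sync-rules';

const legacy = CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY;
const edition2 = new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS);

const timestamp = new DateTimeValue('2025-08-19T09:21:00Z');
console.log(timestamp.toSqliteValue(legacy)); // '2025-08-19 09:21:00Z'
console.log(timestamp.toSqliteValue(edition2)); // '2025-08-19T09:21:00Z'

const time = TimeValue.parse('12:13:14');
console.log(time?.toSqliteValue(legacy)); // '12:13:14'
console.log(time?.toSqliteValue(edition2)); // '12:13:14.000000'
```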
diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts
index dd1cfb558..e58f33b04 100644
--- a/packages/sync-rules/src/utils.ts
+++ b/packages/sync-rules/src/utils.ts
@@ -1,8 +1,19 @@
 import { JSONBig, JsonContainer, Replacer, stringifyRaw } from '@powersync/service-jsonbig';
 import { SelectFromStatement, Statement } from 'pgsql-ast-parser';
 import { SQLITE_FALSE, SQLITE_TRUE } from './sql_support.js';
-import { DatabaseInputRow, SqliteJsonRow, SqliteJsonValue, SqliteRow, SqliteValue } from './types.js';
+import {
+  DatabaseInputRow,
+  DatabaseInputValue,
+  SqliteInputValue,
+  SqliteInputRow,
+  SqliteJsonRow,
+  SqliteJsonValue,
+  SqliteRow,
+  SqliteValue
+} from './types.js';
 import { SyncRuleProcessingError as SyncRulesProcessingError } from './errors.js';
+import { CustomArray, CustomObject, CustomSqliteValue } from './types/custom_sqlite_value.js';
+import { CompatibilityContext } from './compatibility.js';
 
 export function isSelectStatement(q: Statement): q is SelectFromStatement {
   return q.type == 'select';
@@ -58,7 +69,7 @@ export function isJsonValue(value: SqliteValue): value is SqliteJsonValue {
   return value == null || typeof value == 'string' || typeof value == 'number' || typeof value == 'bigint';
 }
 
-function filterJsonData(data: any, depth = 0): any {
+function filterJsonData(data: any, context: CompatibilityContext, depth = 0): any {
   if (depth > DEPTH_LIMIT) {
     // This is primarily to prevent infinite recursion
     // TODO: Proper error class
@@ -77,18 +88,20 @@
   } else if (typeof data == 'bigint') {
     return data;
   } else if (Array.isArray(data)) {
-    return data.map((element) => filterJsonData(element, depth + 1));
+    return data.map((element) => filterJsonData(element, context, depth + 1));
   } else if (ArrayBuffer.isView(data)) {
     return undefined;
   } else if (data instanceof JsonContainer) {
     // Can be stringified directly when using our JSONBig implementation
-    return data;
+    return data as any;
+  } else if (data instanceof CustomSqliteValue) {
+    return data.toSqliteValue(context);
   } else if (typeof data == 'object') {
     let record: Record<string, any> = {};
     for (let key of Object.keys(data)) {
-      record[key] = filterJsonData(data[key], depth + 1);
+      record[key] = filterJsonData(data[key], context, depth + 1);
     }
-    return record;
+    return record as any;
   } else {
     return undefined;
   }
@@ -97,8 +110,8 @@
 /**
  * Map database row to SqliteRow for use in sync rules.
  */
-export function toSyncRulesRow(row: DatabaseInputRow): SqliteRow {
-  let record: SqliteRow = {};
+export function toSyncRulesRow(row: DatabaseInputRow): SqliteInputRow {
+  let record: SqliteInputRow = {};
   for (let key of Object.keys(row)) {
     record[key] = toSyncRulesValue(row[key], false, true);
   }
@@ -110,10 +123,10 @@
  *
  * @param parameters Generic JSON input
  */
-export function toSyncRulesParameters(parameters: Record<string, any>): SqliteJsonRow {
+export function toSyncRulesParameters(parameters: Record<string, any>, context: CompatibilityContext): SqliteJsonRow {
   let record: SqliteJsonRow = {};
   for (let key of Object.keys(parameters)) {
-    record[key] = toSyncRulesValue(parameters[key], true, false) as SqliteJsonValue;
+    record[key] = applyValueContext(toSyncRulesValue(parameters[key], true, false), context) as SqliteJsonValue;
   }
   return record;
 }
@@ -123,7 +136,11 @@
  *
  * Any object or array is converted to JSON TEXT.
  */
-export function toSyncRulesValue(data: any, autoBigNum?: boolean, keepUndefined?: boolean): SqliteValue {
+export function toSyncRulesValue(
+  data: DatabaseInputValue,
+  autoBigNum?: boolean,
+  keepUndefined?: boolean
+): SqliteInputValue {
   if (data == null) {
     // null or undefined
     if (keepUndefined) {
@@ -143,23 +160,34 @@
   } else if (typeof data == 'boolean') {
     return data ? SQLITE_TRUE : SQLITE_FALSE;
   } else if (Array.isArray(data)) {
-    // We may be able to avoid some parse + stringify cycles here for JsonSqliteContainer.
-    return JSONBig.stringify(data.map((element) => filterJsonData(element)));
-  } else if (data instanceof Uint8Array) {
+    return new CustomArray(data, filterJsonData);
+  } else if (data instanceof Uint8Array || data instanceof CustomSqliteValue) {
     return data;
   } else if (data instanceof JsonContainer) {
     return data.toString();
   } else if (typeof data == 'object') {
-    let record: Record<string, any> = {};
-    for (let key of Object.keys(data)) {
-      record[key] = filterJsonData(data[key]);
-    }
-    return JSONBig.stringify(record);
+    return new CustomObject(data, filterJsonData);
   } else {
     return null;
   }
 }
 
+export function applyValueContext(value: SqliteInputValue, context: CompatibilityContext): SqliteValue {
+  if (value instanceof CustomSqliteValue) {
+    return value.toSqliteValue(context);
+  } else {
+    return value;
+  }
+}
+
+export function applyRowContext(value: SqliteInputRow, context: CompatibilityContext): SqliteRow {
+  let record: SqliteRow = {};
+  for (let key of Object.keys(value)) {
+    record[key] = applyValueContext(value[key], context);
+  }
+  return record;
+}
+
 /**
  * Only use this for serializing bucket names. Bucket names should never be parsed except perhaps for debug purposes.
  *
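The practical effect, mirroring the `utils.test.ts` added below (the `at` key is arbitrary): `toSyncRulesValue` now returns a lazy `CustomObject`/`CustomArray` instead of a JSON string, and `applyValueContext` performs the serialization once the compatibility options are known:

```ts
import { applyValueContext, CompatibilityContext, DateTimeValue, toSyncRulesValue } from '@powersync/service-sync-rules';

// Not yet a string: nested custom values are kept until a context is available.
const value = toSyncRulesValue({ at: new DateTimeValue('2025-08-19T00:00:00') });

console.log(applyValueContext(value, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY));
// => '{"at":"2025-08-19 00:00:00"}'
```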
diff --git a/packages/sync-rules/test/src/compatibility.test.ts b/packages/sync-rules/test/src/compatibility.test.ts
new file mode 100644
index 000000000..0d15360c2
--- /dev/null
+++ b/packages/sync-rules/test/src/compatibility.test.ts
@@ -0,0 +1,174 @@
+import { describe, expect, test } from 'vitest';
+import { CustomSqliteValue, SqlSyncRules, DateTimeValue, toSyncRulesValue } from '../../src/index.js';
+
+import { ASSETS, PARSE_OPTIONS } from './util.js';
+
+describe('compatibility options', () => {
+  describe('timestamps', () => {
+    const value = new DateTimeValue('2025-08-19T09:21:00Z');
+
+    test('uses old format by default', () => {
+      const rules = SqlSyncRules.fromYaml(
+        `
+bucket_definitions:
+  mybucket:
+    data:
+      - SELECT id, description FROM assets
+      `,
+        PARSE_OPTIONS
+      );
+
+      expect(
+        rules.evaluateRow({
+          sourceTable: ASSETS,
+          record: {
+            id: 'id',
+            description: value
+          }
+        })
+      ).toStrictEqual([
+        { bucket: 'mybucket[]', data: { description: '2025-08-19 09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
+      ]);
+    });
+
+    test('can opt-in to new format', () => {
+      const rules = SqlSyncRules.fromYaml(
+        `
+bucket_definitions:
+  mybucket:
+    data:
+      - SELECT id, description FROM assets
+
+config:
+  timestamps_iso8601: true
+      `,
+        PARSE_OPTIONS
+      );
+
+      expect(
+        rules.evaluateRow({
+          sourceTable: ASSETS,
+          record: {
+            id: 'id',
+            description: value
+          }
+        })
+      ).toStrictEqual([
+        { bucket: 'mybucket[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
+      ]);
+    });
+
+    test('streams use new format by default', () => {
+      const rules = SqlSyncRules.fromYaml(
+        `
+streams:
+  stream:
+    query: SELECT id, description FROM assets
+
+config:
+  edition: 2
+      `,
+        PARSE_OPTIONS
+      );
+
+      expect(
+        rules.evaluateRow({
+          sourceTable: ASSETS,
+          record: {
+            id: 'id',
+            description: value
+          }
+        })
+      ).toStrictEqual([
+        { bucket: 'stream|0[]', data: { description: '2025-08-19T09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
+      ]);
+    });
+
+    test('streams can disable new format', () => {
+      const rules = SqlSyncRules.fromYaml(
+        `
+streams:
+  stream:
+    query: SELECT id, description FROM assets
+
+config:
+  edition: 2
+  timestamps_iso8601: false
+      `,
+        PARSE_OPTIONS
+      );
+
+      expect(
+        rules.evaluateRow({
+          sourceTable: ASSETS,
+          record: {
+            id: 'id',
+            description: value
+          }
+        })
+      ).toStrictEqual([
+        { bucket: 'stream|0[]', data: { description: '2025-08-19 09:21:00Z', id: 'id' }, id: 'id', table: 'assets' }
+      ]);
+    });
+  });
+
+  test('rejects unknown option', () => {
+    expect(() => {
+      SqlSyncRules.fromYaml(
+        `
+bucket_definitions:
+  mybucket:
+    data:
+      - SELECT id, description FROM assets
+
+config:
+  unknown_option: true
+      `,
+        PARSE_OPTIONS
+      );
+    }).toThrow(/must NOT have additional properties/);
+  });
+
+  test('arrays', () => {
+    const data = toSyncRulesValue(['static value', new DateTimeValue('2025-08-19T09:21:00Z')]);
+
+    for (const withFixedQuirk of [false, true]) {
+      let syncRules = `
+bucket_definitions:
+  mybucket:
+    data:
+      - SELECT id, description FROM assets
+      `;
+
+      if (withFixedQuirk) {
+        syncRules += `
+config:
+  edition: 2
+        `;
+      }
+
+      const rules = SqlSyncRules.fromYaml(syncRules, PARSE_OPTIONS);
+      expect(
+        rules.evaluateRow({
+          sourceTable: ASSETS,
+          record: {
+            id: 'id',
+            description: data
+          }
+        })
+      ).toStrictEqual([
+        {
+          bucket: 'mybucket[]',
+          data: {
+            description: withFixedQuirk
+              ? '["static value","2025-08-19T09:21:00Z"]'
+              : '["static value","2025-08-19 09:21:00Z"]',
+            id: 'id'
+          },
+          id: 'id',
+          table: 'assets'
+        }
+      ]);
+    }
+  });
+});
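For reference, the tests above exercise the same `config:` block a deployment would use to opt in; a minimal sketch reusing the `PARSE_OPTIONS` helper from `util.ts` below (bucket and table names are illustrative):

```ts
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { PARSE_OPTIONS } from './util.js';

// Opting in to ISO 8601 timestamps without moving to edition 2:
const rules = SqlSyncRules.fromYaml(
  `
bucket_definitions:
  global:
    data:
      - SELECT id, description FROM assets

config:
  timestamps_iso8601: true
`,
  PARSE_OPTIONS
);
```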
diff --git a/packages/sync-rules/test/src/streams.test.ts b/packages/sync-rules/test/src/streams.test.ts
index ab70ff42d..9352f16cb 100644
--- a/packages/sync-rules/test/src/streams.test.ts
+++ b/packages/sync-rules/test/src/streams.test.ts
@@ -2,6 +2,8 @@ import { describe, expect, test } from 'vitest';
 
 import {
   BucketParameterQuerier,
+  CompatibilityContext,
+  CompatibilityEdition,
   DEFAULT_TAG,
   GetBucketParameterQuerierResult,
   mergeBucketParameterQueriers,
@@ -11,12 +13,22 @@ import {
   SqliteJsonRow,
   SqliteRow,
   StaticSchema,
+  StreamParseOptions,
   SyncStream,
   syncStreamFromSql
 } from '../../src/index.js';
 import { normalizeQuerierOptions, PARSE_OPTIONS, TestSourceTable } from './util.js';
 
 describe('streams', () => {
+  test('refuses edition: 1', () => {
+    expect(() =>
+      syncStreamFromSql('stream', 'SELECT * FROM comments', {
+        compatibility: CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY,
+        defaultSchema: 'public'
+      })
+    ).throws('Sync streams require edition 2 or later');
+  });
+
   test('without filter', () => {
     const desc = parseStream('SELECT * FROM comments');
 
@@ -610,7 +622,11 @@ const schema = new StaticSchema([
   }
 ]);
 
-const options = { schema: schema, ...PARSE_OPTIONS };
+const options: StreamParseOptions = {
+  schema: schema,
+  ...PARSE_OPTIONS,
+  compatibility: new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS)
+};
 
 function evaluateBucketIds(stream: SyncStream, sourceTable: SourceTableInterface, record: SqliteRow) {
   return stream.evaluateRow({ sourceTable, record }).map((r) => {
diff --git a/packages/sync-rules/test/src/util.ts b/packages/sync-rules/test/src/util.ts
index 42fe8ac2a..d5f2f6a7f 100644
--- a/packages/sync-rules/test/src/util.ts
+++ b/packages/sync-rules/test/src/util.ts
@@ -1,4 +1,5 @@
 import {
+  CompatibilityContext,
   DEFAULT_TAG,
   GetQuerierOptions,
   RequestedStream,
@@ -16,7 +17,8 @@ export class TestSourceTable implements SourceTableInterface {
 }
 
 export const PARSE_OPTIONS = {
-  defaultSchema: 'test_schema'
+  defaultSchema: 'test_schema',
+  compatibility: CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
 };
 
 export const ASSETS = new TestSourceTable('assets');
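A usage note on the helper: suites that need different compatibility settings spread `PARSE_OPTIONS` instead of mutating the shared default, as `streams.test.ts` does above; sketched:

```ts
import { CompatibilityContext, CompatibilityEdition } from '@powersync/service-sync-rules';
import { PARSE_OPTIONS } from './util.js';

// Edition 2 enables the ISO 8601 timestamp format by default:
const STREAM_OPTIONS = {
  ...PARSE_OPTIONS,
  compatibility: new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS)
};
```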
diff --git a/packages/sync-rules/test/src/utils.test.ts b/packages/sync-rules/test/src/utils.test.ts
new file mode 100644
index 000000000..648a5e7ef
--- /dev/null
+++ b/packages/sync-rules/test/src/utils.test.ts
@@ -0,0 +1,43 @@
+import {
+  applyValueContext,
+  CompatibilityContext,
+  DateTimeValue,
+  toSyncRulesValue,
+  TimeValue,
+  CompatibilityEdition
+} from '../../src/index.js';
+import { describe, expect, test } from 'vitest';
+
+describe('toSyncRulesValue', () => {
+  test('custom value', () => {
+    expect(
+      applyValueContext(
+        toSyncRulesValue([1n, 'two', [new DateTimeValue('2025-08-19T00:00:00')]]),
+        CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
+      )
+    ).toStrictEqual('[1,"two",["2025-08-19 00:00:00"]]');
+
+    expect(
+      applyValueContext(
+        toSyncRulesValue({ foo: { bar: new DateTimeValue('2025-08-19T00:00:00') } }),
+        CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
+      )
+    ).toStrictEqual('{"foo":{"bar":"2025-08-19 00:00:00"}}');
+  });
+
+  test('time value', () => {
+    expect(
+      TimeValue.parse('12:13:14')?.toSqliteValue(new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS))
+    ).toStrictEqual('12:13:14.000000');
+    expect(
+      TimeValue.parse('12:13:14')?.toSqliteValue(new CompatibilityContext(CompatibilityEdition.LEGACY))
+    ).toStrictEqual('12:13:14');
+
+    expect(
+      TimeValue.parse('12:13:14.15')?.toSqliteValue(new CompatibilityContext(CompatibilityEdition.SYNC_STREAMS))
+    ).toStrictEqual('12:13:14.150000');
+    expect(
+      TimeValue.parse('12:13:14.15')?.toSqliteValue(new CompatibilityContext(CompatibilityEdition.LEGACY))
+    ).toStrictEqual('12:13:14.15');
+  });
+});
diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml
index aef6017fa..fdf514281 100644
--- a/pnpm-lock.yaml
+++ b/pnpm-lock.yaml
@@ -113,6 +113,9 @@ importers:
       '@powersync/service-errors':
         specifier: workspace:*
         version: link:../../packages/service-errors
+      '@powersync/service-sync-rules':
+        specifier: workspace:*
+        version: link:../../packages/sync-rules
       ajv:
         specifier: ^8.12.0
         version: 8.16.0
@@ -416,6 +419,9 @@ importers:
       '@powersync/service-jsonbig':
         specifier: workspace:^
         version: link:../jsonbig
+      '@powersync/service-sync-rules':
+        specifier: workspace:^
+        version: link:../sync-rules
       date-fns:
         specifier: ^4.1.0
         version: 4.1.0