diff --git a/.changeset/dry-pots-leave.md b/.changeset/dry-pots-leave.md
new file mode 100644
index 000000000..fd62e6091
--- /dev/null
+++ b/.changeset/dry-pots-leave.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-module-mongodb': patch
+'@powersync/service-core': patch
+'@powersync/service-module-mysql': patch
+'@powersync/service-sync-rules': patch
+---
+
+Correctly handle custom types in primary keys.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
index 472565106..003f50134
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts
@@ -319,8 +319,7 @@ export class MongoBucketBatch
     const record = operation.record;
     const beforeId = operation.beforeId;
     const afterId = operation.afterId;
-    let sourceAfter = record.after;
-    let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+    let after = record.after;
     const sourceTable = record.sourceTable;

     let existing_buckets: CurrentBucket[] = [];
diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts
index a529c7336..9ccb1fc61
--- a/modules/module-mongodb/src/replication/ChangeStream.ts
+++ b/modules/module-mongodb/src/replication/ChangeStream.ts
@@ -481,7 +481,7 @@ export class ChangeStream {
       // Pre-fetch next batch, so that we can read and write concurrently
       nextChunkPromise = query.nextChunk();
       for (let document of docBatch) {
-        const record = constructAfterRecord(document);
+        const record = this.constructAfterRecord(document);

         // This auto-flushes when the batch reaches its size limit
         await batch.save({
@@ -619,6 +619,11 @@ export class ChangeStream {
     return result.table;
   }

+  private constructAfterRecord(document: mongo.Document): SqliteRow {
+    const inputRow = constructAfterRecord(document);
+    return this.sync_rules.applyRowContext(inputRow);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     table: storage.SourceTable,
@@ -631,7 +636,7 @@ export class ChangeStream {
     this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
     if (change.operationType == 'insert') {
-      const baseRecord = constructAfterRecord(change.fullDocument);
+      const baseRecord = this.constructAfterRecord(change.fullDocument);
       return await batch.save({
         tag: SaveOperationTag.INSERT,
         sourceTable: table,
@@ -650,7 +655,7 @@ export class ChangeStream {
         beforeReplicaId: change.documentKey._id
       });
     }
-    const after = constructAfterRecord(change.fullDocument!);
+    const after = this.constructAfterRecord(change.fullDocument!);
     return await batch.save({
       tag: SaveOperationTag.UPDATE,
       sourceTable: table,
diff --git a/modules/module-mongodb/src/replication/MongoRelation.ts b/modules/module-mongodb/src/replication/MongoRelation.ts
index 20e78a413..d1c6a0ed3
--- a/modules/module-mongodb/src/replication/MongoRelation.ts
+++ b/modules/module-mongodb/src/replication/MongoRelation.ts
@@ -1,16 +1,13 @@
 import { mongo } from '@powersync/lib-service-mongodb';
 import { storage } from '@powersync/service-core';
-import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
+import { JsonContainer } from '@powersync/service-jsonbig';
 import {
   CompatibilityContext,
   CustomArray,
   CustomObject,
   CustomSqliteValue,
-  DatabaseInputValue,
   SqliteInputRow,
   SqliteInputValue,
-  SqliteRow,
-  SqliteValue,
   DateTimeValue
 } from '@powersync/service-sync-rules';
diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts
index c0d4e91e3..ef1dc057b
--- a/modules/module-mysql/src/replication/BinLogStream.ts
+++ b/modules/module-mysql/src/replication/BinLogStream.ts
@@ -329,7 +329,7 @@ export class BinLogStream {
       throw new ReplicationAssertionError(`No 'fields' event emitted`);
     }

-    const record = common.toSQLiteRow(row, columns!);
+    const record = this.toSQLiteRow(row, columns!);
     await batch.save({
       tag: storage.SaveOperationTag.INSERT,
       sourceTable: table,
@@ -596,6 +596,11 @@
     return null;
   }

+  private toSQLiteRow(row: Record, columns: Map): sync_rules.SqliteRow {
+    const inputRecord = common.toSQLiteRow(row, columns);
+    return this.syncRules.applyRowContext(inputRecord);
+  }
+
   private async writeChange(
     batch: storage.BucketStorageBatch,
     payload: WriteChangePayload
@@ -603,7 +608,7 @@
     switch (payload.type) {
       case storage.SaveOperationTag.INSERT:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const record = common.toSQLiteRow(payload.row, payload.columns);
+        const record = this.toSQLiteRow(payload.row, payload.columns);
         return await batch.save({
           tag: storage.SaveOperationTag.INSERT,
           sourceTable: payload.sourceTable,
@@ -617,9 +622,9 @@
         // The previous row may be null if the replica id columns are unchanged.
         // It's fine to treat that the same as an insert.
         const beforeUpdated = payload.previous_row
-          ? common.toSQLiteRow(payload.previous_row, payload.columns)
+          ? this.toSQLiteRow(payload.previous_row, payload.columns)
           : undefined;
-        const after = common.toSQLiteRow(payload.row, payload.columns);
+        const after = this.toSQLiteRow(payload.row, payload.columns);

         return await batch.save({
           tag: storage.SaveOperationTag.UPDATE,
@@ -634,7 +639,7 @@
       case storage.SaveOperationTag.DELETE:
         this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-        const beforeDeleted = common.toSQLiteRow(payload.row, payload.columns);
+        const beforeDeleted = this.toSQLiteRow(payload.row, payload.columns);

         return await batch.save({
           tag: storage.SaveOperationTag.DELETE,
diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
index a1c39608d..62e7f118e
--- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
+++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts
@@ -687,8 +687,7 @@ export class PostgresBucketBatch
     // We store bytea colums for source keys
     const beforeId = operation.beforeId;
     const afterId = operation.afterId;
-    let sourceAfter = record.after;
-    let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
+    let after = record.after;
     const sourceTable = record.sourceTable;

     let existingBuckets: CurrentBucket[] = [];
diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index ee34b6d0f..227f00a63
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -25,8 +25,11 @@
 import {
   CompatibilityContext,
   DatabaseInputRow,
   SqliteInputRow,
+  SqliteInputValue,
+  SqliteRow,
   SqlSyncRules,
   TablePattern,
+  ToastableSqliteRow,
   toSyncRulesRow
 } from '@powersync/service-sync-rules';
@@ -635,7 +638,8 @@ WHERE oid = $1::regclass`,
         hasRemainingData = true;
       }

-      for (const record of WalStream.getQueryData(rows)) {
+      for (const inputRecord of WalStream.getQueryData(rows)) {
+        const record = this.syncRulesRecord(inputRecord);
         // This auto-flushes when the batch reaches its size limit
         await batch.save({
           tag: storage.SaveOperationTag.INSERT,
@@ -787,6 +791,20 @@ WHERE oid = $1::regclass`,
     return table;
   }

+  private syncRulesRecord(row: SqliteInputRow): SqliteRow;
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined;
+
+  private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined {
+    if (row == null) {
+      return undefined;
+    }
+    return this.sync_rules.applyRowContext(row);
+  }
+
+  private toastableSyncRulesRecord(row: ToastableSqliteRow): ToastableSqliteRow {
+    return this.sync_rules.applyRowContext(row);
+  }
+
   async writeChange(
     batch: storage.BucketStorageBatch,
     msg: pgwire.PgoutputMessage
@@ -803,7 +821,7 @@ WHERE oid = $1::regclass`,
     if (msg.tag == 'insert') {
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const baseRecord = this.connections.types.constructAfterRecord(msg);
+      const baseRecord = this.syncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.INSERT,
         sourceTable: table,
@@ -816,8 +834,8 @@ WHERE oid = $1::regclass`,
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
       // "before" may be null if the replica id columns are unchanged
       // It's fine to treat that the same as an insert.
-      const before = this.connections.types.constructBeforeRecord(msg);
-      const after = this.connections.types.constructAfterRecord(msg);
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg));
+      const after = this.toastableSyncRulesRecord(this.connections.types.constructAfterRecord(msg));
       return await batch.save({
         tag: storage.SaveOperationTag.UPDATE,
         sourceTable: table,
@@ -828,7 +846,7 @@ WHERE oid = $1::regclass`,
     } else if (msg.tag == 'delete') {
       this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
-      const before = this.connections.types.constructBeforeRecord(msg)!;
+      const before = this.syncRulesRecord(this.connections.types.constructBeforeRecord(msg)!);

       return await batch.save({
         tag: storage.SaveOperationTag.DELETE,
diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts
index 80e545773..f888f3333
--- a/modules/module-postgres/test/src/wal_stream.test.ts
+++ b/modules/module-postgres/test/src/wal_stream.test.ts
@@ -325,6 +325,42 @@ bucket_definitions:
     }
   });

+  test('old date format', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);
+
+    let data = await context.getBucketData('global[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10 13:17:14Z' })]);
+  });
+
+  test('new date format', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+    await context.updateSyncRules(`
+streams:
+  stream:
+    query: SELECT id, * FROM "test_data"
+
+config:
+  edition: 2
+`);
+    const { pool } = context;
+
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id text primary key, description timestamptz);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', '2025-09-10 15:17:14+02')`);
+
+    const data = await context.getBucketData('1#stream|0[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '2025-09-10T13:17:14.000000Z' })]);
+  });
+
   test('custom types', async () => {
     await using context = await WalStreamTestContext.open(factory);
@@ -348,4 +384,69 @@ config:
     const data = await context.getBucketData('1#stream|0[]');
     expect(data).toMatchObject([putOp('test_data', { id: 't1', description: '{"foo":1,"bar":2}' })]);
   });
+
+  test('custom types in primary key', async () => {
+    await using context = await WalStreamTestContext.open(factory);
+
+    await context.updateSyncRules(`
+streams:
+  stream:
+    query: SELECT id, * FROM "test_data"
+
+config:
+  edition: 2
+`);
+
+    const { pool } = context;
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE DOMAIN test_id AS TEXT;`);
+    await pool.query(`CREATE TABLE test_data(id test_id primary key);`);
+
+    await context.initializeReplication();
+    await pool.query(`INSERT INTO test_data(id) VALUES ('t1')`);
+
+    const data = await context.getBucketData('1#stream|0[]');
+    expect(data).toMatchObject([putOp('test_data', { id: 't1' })]);
+  });
+
+  test('replica identity handling', async () => {
+    // This specifically tests a case of timestamps being used as part of the replica identity.
+    // There was a regression in versions 1.15.0-1.15.5, which this tests for.
+    await using context = await WalStreamTestContext.open(factory);
+    const { pool } = context;
+    await context.updateSyncRules(BASIC_SYNC_RULES);
+
+    await pool.query(`DROP TABLE IF EXISTS test_data`);
+    await pool.query(`CREATE TABLE test_data(id uuid primary key, description text, ts timestamptz)`);
+    await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`);
+
+    const test_id = `a9798b07-84de-4297-9a8e-aafb4dd0282f`;
+
+    await pool.query(
+      `INSERT INTO test_data(id, description, ts) VALUES('${test_id}', 'test1', '2025-01-01T00:00:00Z') returning id as test_id`
+    );
+
+    await context.replicateSnapshot();
+    await context.startStreaming();
+
+    await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);
+
+    const data = await context.getBucketData('global[]');
+    // For replica identity full, each change changes the id, making it a REMOVE+PUT
+    expect(data).toMatchObject([
+      // Initial insert
+      putOp('test_data', { id: test_id, description: 'test1' }),
+      // Update
+      removeOp('test_data', test_id),
+      putOp('test_data', { id: test_id, description: 'test2' })
+    ]);
+
+    // subkey contains `${table id}/${replica identity}`.
+    // table id changes from run to run, but replica identity should always stay constant.
+    // This should not change if we make changes to the implementation
+    // (unless specifically opting in to new behavior)
+    expect(data[0].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
+    expect(data[1].subkey).toContain('/c7b3f1a3-ec4d-5d44-b295-c7f2a32bb056');
+    expect(data[2].subkey).toContain('/984d457a-69f0-559a-a2f9-a511c28b968d');
+  });
 }
diff --git a/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts
index 21b957098..f9729f21d
--- a/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts
+++ b/packages/service-core-tests/src/tests/register-data-storage-data-tests.ts
@@ -1,11 +1,4 @@
-import {
-  BucketDataBatchOptions,
-  getUuidReplicaIdentityBson,
-  OplogEntry,
-  SaveOptions,
-  storage
-} from '@powersync/service-core';
-import { DateTimeValue } from '@powersync/service-sync-rules';
+import { BucketDataBatchOptions, getUuidReplicaIdentityBson, OplogEntry, storage } from '@powersync/service-core';
 import { describe, expect, test } from 'vitest';
 import * as test_utils from '../test-utils/test-utils-index.js';
 import { TEST_TABLE } from './util.js';
@@ -1137,72 +1130,6 @@ bucket_definitions:
     expect(checkpoint2).toBeGreaterThan(checkpoint1);
   });

-  test('data with custom types', async () => {
-    await using factory = await generateStorageFactory();
-    const testValue = {
-      sourceTable: TEST_TABLE,
-      tag: storage.SaveOperationTag.INSERT,
-      after: {
-        id: 't1',
-        description: new DateTimeValue('2025-08-28T11:30:00')
-      },
-      afterReplicaId: test_utils.rid('t1')
-    } satisfies SaveOptions;
-
-    {
-      // First, deploy old sync rules and row with date time value
-      const syncRules = await factory.updateSyncRules({
-        content: `
-        bucket_definitions:
-          global:
-            data:
-              - SELECT id, description FROM test
-      `
-      });
-      const bucketStorage = factory.getInstance(syncRules);
-      await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
-        await batch.save(testValue);
-        await batch.commit('1/1');
-      });
-
-      const { checkpoint } = await bucketStorage.getCheckpoint();
-      const batch = await test_utils.fromAsync(
-        bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
-      );
-      expect(batch[0].chunkData.data).toMatchObject([
-        {
-          data: '{"id":"t1","description":"2025-08-28 11:30:00"}'
-        }
-      ]);
-    }
-
-    const syncRules = await factory.updateSyncRules({
-      content: `
-      bucket_definitions:
-        global:
-          data:
-            - SELECT id, description FROM test
-
-      config:
-        edition: 2
-      `
-    });
-    const bucketStorage = factory.getInstance(syncRules);
-    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
-      await batch.save(testValue);
-      await batch.commit('1/2');
-    });
-    const { checkpoint } = await bucketStorage.getCheckpoint();
-    const batch = await test_utils.fromAsync(
-      bucketStorage.getBucketDataBatch(checkpoint, new Map([['2#global[]', 0n]]))
-    );
-    expect(batch[0].chunkData.data).toMatchObject([
-      {
-        data: '{"id":"t1","description":"2025-08-28T11:30:00"}'
-      }
-    ]);
-  });
-
   test('unchanged checksums', async () => {
     await using factory = await generateStorageFactory();
     const syncRules = await factory.updateSyncRules({
diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts
index 614f28406..62db7dd43 100644
--- a/packages/service-core/src/storage/BucketStorageBatch.ts
+++ b/packages/service-core/src/storage/BucketStorageBatch.ts
@@ -1,11 +1,5 @@
 import { ObserverClient } from '@powersync/lib-services-framework';
-import {
-  EvaluatedParameters,
-  EvaluatedRow,
-  SqliteInputRow,
-  SqliteRow,
-  ToastableSqliteRow
-} from '@powersync/service-sync-rules';
+import { EvaluatedParameters, EvaluatedRow, SqliteRow, ToastableSqliteRow } from '@powersync/service-sync-rules';
 import { BSON } from 'bson';
 import { ReplicationEventPayload } from './ReplicationEventPayload.js';
 import { SourceTable, TableSnapshotStatus } from './SourceTable.js';
@@ -138,7 +132,7 @@ export interface SaveInsert {
   sourceTable: SourceTable;
   before?: undefined;
   beforeReplicaId?: undefined;
-  after: SqliteInputRow;
+  after: SqliteRow;
   afterReplicaId: ReplicaId;
 }
@@ -149,7 +143,7 @@ export interface SaveUpdate {
   /**
    * This is only present when the id has changed, and will only contain replica identity columns.
    */
-  before?: SqliteInputRow;
+  before?: SqliteRow;
   beforeReplicaId?: ReplicaId;

   /**
    *
@@ -164,7 +158,7 @@
 export interface SaveDelete {
   tag: SaveOperationTag.DELETE;
   sourceTable: SourceTable;
-  before?: SqliteInputRow;
+  before?: SqliteRow;
   beforeReplicaId: ReplicaId;
   after?: undefined;
   afterReplicaId?: undefined;
diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts
index 411f9b49d..70a7fff3b
--- a/packages/service-core/src/util/utils.ts
+++ b/packages/service-core/src/util/utils.ts
@@ -182,7 +182,7 @@ export function uuidForRowBson(row: sync_rules.SqliteRow): bson.UUID {
   return new bson.UUID(uuid.v5(repr, ID_NAMESPACE, buffer));
 }

-export function hasToastedValues(row: sync_rules.ToastableSqliteRow) {
+export function hasToastedValues(row: sync_rules.ToastableSqliteRow) {
   for (let key in row) {
     if (typeof row[key] == 'undefined') {
       return true;
@@ -196,10 +196,10 @@ export function hasToastedValues(row: sync_rules.ToastableSqliteRow) {
  *
  * If we don't store data, we assume we always have a complete row.
  */
-export function isCompleteRow(
+export function isCompleteRow(
   storeData: boolean,
-  row: sync_rules.ToastableSqliteRow
-): row is sync_rules.SqliteInputRow {
+  row: sync_rules.ToastableSqliteRow
+): row is sync_rules.SqliteRow {
   if (!storeData) {
     // Assume the row is complete - no need to check
     return true;
diff --git a/packages/sync-rules/src/BucketSource.ts b/packages/sync-rules/src/BucketSource.ts
index deab23373..6131d78d7
--- a/packages/sync-rules/src/BucketSource.ts
+++ b/packages/sync-rules/src/BucketSource.ts
@@ -3,13 +3,7 @@ import { ColumnDefinition } from './ExpressionType.js';
 import { SourceTableInterface } from './SourceTableInterface.js';
 import { GetQuerierOptions } from './SqlSyncRules.js';
 import { TablePattern } from './TablePattern.js';
-import {
-  EvaluatedParametersResult,
-  EvaluateRowOptions,
-  EvaluationResult,
-  SourceSchema,
-  SqliteInputRow
-} from './types.js';
+import { EvaluatedParametersResult, EvaluateRowOptions, EvaluationResult, SourceSchema, SqliteRow } from './types.js';

 /**
  * An interface declaring
@@ -34,7 +28,7 @@ export interface BucketSource {
   * The returned {@link ParameterLookup} can be referenced by {@link pushBucketParameterQueriers} to allow the storage
   * system to find buckets.
   */
-  evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteInputRow): EvaluatedParametersResult[];
+  evaluateParameterRow(sourceTable: SourceTableInterface, row: SqliteRow): EvaluatedParametersResult[];

  /**
   * Given a row as it appears in a table that affects sync data, return buckets, logical table names and transformed
diff --git a/packages/sync-rules/src/SqlBucketDescriptor.ts b/packages/sync-rules/src/SqlBucketDescriptor.ts
index 6e0a622df..353dbb21c
--- a/packages/sync-rules/src/SqlBucketDescriptor.ts
+++ b/packages/sync-rules/src/SqlBucketDescriptor.ts
@@ -22,7 +22,6 @@ import {
   SourceSchema,
   SqliteRow
 } from './types.js';
-import { applyRowContext } from './utils.js';

 export interface QueryParseResult {
   /**
diff --git a/packages/sync-rules/src/SqlSyncRules.ts b/packages/sync-rules/src/SqlSyncRules.ts
index 836b602d7..6c741d0bf
--- a/packages/sync-rules/src/SqlSyncRules.ts
+++ b/packages/sync-rules/src/SqlSyncRules.ts
@@ -21,7 +21,6 @@ import {
   QueryParseOptions,
   RequestParameters,
   SourceSchema,
-  SqliteInputRow,
   SqliteInputValue,
   SqliteJsonRow,
   SqliteRow,
@@ -426,7 +425,7 @@ export class SqlSyncRules implements SyncRules {
   /**
    * Throws errors.
    */
-  evaluateParameterRow(table: SourceTableInterface, row: SqliteInputRow): EvaluatedParameters[] {
+  evaluateParameterRow(table: SourceTableInterface, row: SqliteRow): EvaluatedParameters[] {
     const { results, errors } = this.evaluateParameterRowWithErrors(table, row);
     if (errors.length > 0) {
       throw new Error(errors[0].error);
@@ -436,7 +435,7 @@
   evaluateParameterRowWithErrors(
     table: SourceTableInterface,
-    row: SqliteInputRow
+    row: SqliteRow
   ): { results: EvaluatedParameters[]; errors: EvaluationError[] } {
     let rawResults: EvaluatedParametersResult[] = [];
     for (let source of this.bucketSources) {
diff --git a/packages/sync-rules/src/events/SqlEventDescriptor.ts b/packages/sync-rules/src/events/SqlEventDescriptor.ts
index 205676a43..8ea79f67e
--- a/packages/sync-rules/src/events/SqlEventDescriptor.ts
+++ b/packages/sync-rules/src/events/SqlEventDescriptor.ts
@@ -5,7 +5,6 @@ import { QueryParseResult } from '../SqlBucketDescriptor.js';
 import { SyncRulesOptions } from '../SqlSyncRules.js';
 import { TablePattern } from '../TablePattern.js';
 import { EvaluateRowOptions } from '../types.js';
-import { applyRowContext } from '../utils.js';
 import { EvaluatedEventRowWithErrors, SqlEventSourceQuery } from './SqlEventSourceQuery.js';

 /**
diff --git a/packages/sync-rules/src/types.ts b/packages/sync-rules/src/types.ts
index e89cabf5d..ce9341123
--- a/packages/sync-rules/src/types.ts
+++ b/packages/sync-rules/src/types.ts
@@ -218,7 +218,7 @@ export type SqliteInputRow = SqliteRow;
  *
  * Toasted values are `undefined`.
  */
-export type ToastableSqliteRow = SqliteRow;
+export type ToastableSqliteRow = SqliteRow;

 /**
  * A value as received from the database.
diff --git a/packages/sync-rules/src/utils.ts b/packages/sync-rules/src/utils.ts
index baf52eadf..06d933ea2
--- a/packages/sync-rules/src/utils.ts
+++ b/packages/sync-rules/src/utils.ts
@@ -197,15 +197,22 @@ export function applyRowContext(
   value: SqliteRow,
   context: CompatibilityContext
 ): SqliteRow {
-  let record: SqliteRow = {};
+  let replacedCustomValues: SqliteRow = {};
+  let didReplaceValue = false;
+
   for (let [key, rawValue] of Object.entries(value)) {
-    if (rawValue === undefined) {
-      record[key] = undefined as MaybeToast;
-    } else {
-      record[key] = applyValueContext(rawValue, context);
+    if (rawValue instanceof CustomSqliteValue) {
+      replacedCustomValues[key] = rawValue.toSqliteValue(context);
+      didReplaceValue = true;
     }
   }
-  return record;
+
+  if (didReplaceValue) {
+    return Object.assign({ ...value }, replacedCustomValues);
+  } else {
+    // The cast is safe - no values in the original row are CustomSqliteValues.
+    return value as SqliteRow;
+  }
 }

 /**