Skip to content

Commit 5da2232

Browse files
committed
Merge remote-tracking branch 'origin/main' into compact-checksums
2 parents c821420 + d5c4691 commit 5da2232

File tree

12 files changed

+125
-56
lines changed

12 files changed

+125
-56
lines changed

modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
2-
import { SqlEventDescriptor, SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';
2+
import { SqlEventDescriptor, SqliteRow, SqliteValue, SqlSyncRules } from '@powersync/service-sync-rules';
33
import * as bson from 'bson';
44

55
import {
@@ -319,7 +319,8 @@ export class MongoBucketBatch
319319
const record = operation.record;
320320
const beforeId = operation.beforeId;
321321
const afterId = operation.afterId;
322-
let after = record.after;
322+
let sourceAfter = record.after;
323+
let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
323324
const sourceTable = record.sourceTable;
324325

325326
let existing_buckets: CurrentBucket[] = [];
@@ -367,7 +368,7 @@ export class MongoBucketBatch
367368
existing_lookups = result.lookups;
368369
if (this.storeCurrentData) {
369370
const data = deserializeBson((result.data as mongo.Binary).buffer) as SqliteRow;
370-
after = storage.mergeToast(after!, data);
371+
after = storage.mergeToast<SqliteValue>(after!, data);
371372
}
372373
}
373374
} else if (record.tag == SaveOperationTag.DELETE) {

modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -687,7 +687,8 @@ export class PostgresBucketBatch
687687
// We store bytea columns for source keys
688688
const beforeId = operation.beforeId;
689689
const afterId = operation.afterId;
690-
let after = record.after;
690+
let sourceAfter = record.after;
691+
let after = sourceAfter && this.sync_rules.applyRowContext(sourceAfter);
691692
const sourceTable = record.sourceTable;
692693

693694
let existingBuckets: CurrentBucket[] = [];

packages/service-core-tests/src/tests/register-data-storage-tests.ts

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import {
33
getUuidReplicaIdentityBson,
44
InternalOpId,
55
OplogEntry,
6+
SaveOptions,
67
storage
78
} from '@powersync/service-core';
8-
import { ParameterLookup, RequestParameters } from '@powersync/service-sync-rules';
9+
import { DateTimeValue, ParameterLookup, RequestParameters } from '@powersync/service-sync-rules';
910
import { expect, test, describe, beforeEach } from 'vitest';
1011
import * as test_utils from '../test-utils/test-utils-index.js';
1112
import { SqlBucketDescriptor } from '@powersync/service-sync-rules/src/SqlBucketDescriptor.js';
@@ -1995,4 +1996,70 @@ bucket_definitions:
19951996
// we expect 0n and 1n, or 1n and 2n.
19961997
expect(checkpoint2).toBeGreaterThan(checkpoint1);
19971998
});
1999+
2000+
test('data with custom types', async () => {
2001+
await using factory = await generateStorageFactory();
2002+
const testValue = {
2003+
sourceTable: TEST_TABLE,
2004+
tag: storage.SaveOperationTag.INSERT,
2005+
after: {
2006+
id: 't1',
2007+
description: new DateTimeValue('2025-08-28T11:30:00')
2008+
},
2009+
afterReplicaId: test_utils.rid('t1')
2010+
} satisfies SaveOptions;
2011+
2012+
{
2013+
// First, deploy old sync rules and row with date time value
2014+
const syncRules = await factory.updateSyncRules({
2015+
content: `
2016+
bucket_definitions:
2017+
global:
2018+
data:
2019+
- SELECT id, description FROM test
2020+
`
2021+
});
2022+
const bucketStorage = factory.getInstance(syncRules);
2023+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
2024+
await batch.save(testValue);
2025+
await batch.commit('1/1');
2026+
});
2027+
2028+
const { checkpoint } = await bucketStorage.getCheckpoint();
2029+
const batch = await test_utils.fromAsync(
2030+
bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))
2031+
);
2032+
expect(batch[0].chunkData.data).toMatchObject([
2033+
{
2034+
data: '{"id":"t1","description":"2025-08-28 11:30:00"}'
2035+
}
2036+
]);
2037+
}
2038+
2039+
const syncRules = await factory.updateSyncRules({
2040+
content: `
2041+
bucket_definitions:
2042+
global:
2043+
data:
2044+
- SELECT id, description FROM test
2045+
2046+
config:
2047+
edition: 2
2048+
`
2049+
});
2050+
const bucketStorage = factory.getInstance(syncRules);
2051+
await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
2052+
await batch.save(testValue);
2053+
await batch.commit('1/2');
2054+
});
2055+
const { checkpoint } = await bucketStorage.getCheckpoint();
2056+
const batch = await test_utils.fromAsync(
2057+
bucketStorage.getBucketDataBatch(checkpoint, new Map([['2#global[]', 0n]]))
2058+
);
2059+
expect(batch[0].chunkData.data).toMatchObject([
2060+
{
2061+
data: '{"id":"t1","description":"2025-08-28T11:30:00"}'
2062+
}
2063+
]);
2064+
});
19982065
}

packages/service-core/src/storage/BucketStorage.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ export enum SyncRuleState {
3939
export const DEFAULT_DOCUMENT_BATCH_LIMIT = 1000;
4040
export const DEFAULT_DOCUMENT_CHUNK_LIMIT_BYTES = 1 * 1024 * 1024;
4141

42-
export function mergeToast(record: ToastableSqliteRow, persisted: ToastableSqliteRow): ToastableSqliteRow {
43-
const newRecord: ToastableSqliteRow = {};
42+
export function mergeToast<V>(record: ToastableSqliteRow<V>, persisted: ToastableSqliteRow<V>): ToastableSqliteRow<V> {
43+
const newRecord: ToastableSqliteRow<V> = {};
4444
for (let key in record) {
4545
if (typeof record[key] == 'undefined') {
4646
newRecord[key] = persisted[key];

packages/sync-rules/src/SqlBucketDescriptor.ts

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,7 @@ export class SqlBucketDescriptor implements BucketSource {
3737
name: string;
3838
bucketParameters?: string[];
3939

40-
constructor(
41-
name: string,
42-
private readonly compatibility: CompatibilityContext
43-
) {
40+
constructor(name: string) {
4441
this.name = name;
4542
}
4643

@@ -61,11 +58,11 @@ export class SqlBucketDescriptor implements BucketSource {
6158

6259
parameterIdSequence = new IdSequence();
6360

64-
addDataQuery(sql: string, options: SyncRulesOptions): QueryParseResult {
61+
addDataQuery(sql: string, options: SyncRulesOptions, compatibility: CompatibilityContext): QueryParseResult {
6562
if (this.bucketParameters == null) {
6663
throw new Error('Bucket parameters must be defined');
6764
}
68-
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParameters, sql, options, this.compatibility);
65+
const dataRows = SqlDataQuery.fromSql(this.name, this.bucketParameters, sql, options, compatibility);
6966

7067
this.dataQueries.push(dataRows);
7168

@@ -105,13 +102,7 @@ export class SqlBucketDescriptor implements BucketSource {
105102
continue;
106103
}
107104

108-
results.push(
109-
...query.evaluateRow(
110-
options.sourceTable,
111-
applyRowContext(options.record, this.compatibility),
112-
options.bucketIdTransformer
113-
)
114-
);
105+
results.push(...query.evaluateRow(options.sourceTable, options.record, options.bucketIdTransformer));
115106
}
116107
return results;
117108
}

packages/sync-rules/src/SqlSyncRules.ts

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,13 +22,17 @@ import {
2222
RequestParameters,
2323
SourceSchema,
2424
SqliteInputRow,
25+
SqliteInputValue,
2526
SqliteJsonRow,
27+
SqliteRow,
28+
SqliteValue,
2629
StreamParseOptions,
2730
SyncRules
2831
} from './types.js';
2932
import { BucketSource } from './BucketSource.js';
3033
import { syncStreamFromSql } from './streams/from_sql.js';
3134
import { CompatibilityContext, CompatibilityEdition, CompatibilityOption } from './compatibility.js';
35+
import { applyRowContext } from './utils.js';
3236

3337
const ACCEPT_POTENTIALLY_DANGEROUS_QUERIES = Symbol('ACCEPT_POTENTIALLY_DANGEROUS_QUERIES');
3438

@@ -217,7 +221,7 @@ export class SqlSyncRules implements SyncRules {
217221
const parameters = value.get('parameters', true) as unknown;
218222
const dataQueries = value.get('data', true) as unknown;
219223

220-
const descriptor = new SqlBucketDescriptor(key, compatibility);
224+
const descriptor = new SqlBucketDescriptor(key);
221225

222226
if (parameters instanceof Scalar) {
223227
rules.withScalar(parameters, (q) => {
@@ -239,7 +243,7 @@ export class SqlSyncRules implements SyncRules {
239243
}
240244
for (let query of dataQueries.items) {
241245
rules.withScalar(query, (q) => {
242-
return descriptor.addDataQuery(q, queryOptions);
246+
return descriptor.addDataQuery(q, queryOptions, compatibility);
243247
});
244248
}
245249
rules.bucketSources.push(descriptor);
@@ -382,6 +386,12 @@ export class SqlSyncRules implements SyncRules {
382386
this.content = content;
383387
}
384388

389+
applyRowContext<MaybeToast extends undefined = never>(
390+
source: SqliteRow<SqliteInputValue | MaybeToast>
391+
): SqliteRow<SqliteValue | MaybeToast> {
392+
return applyRowContext(source, this.compatibility);
393+
}
394+
385395
/**
386396
* Throws errors.
387397
*/

packages/sync-rules/src/events/SqlEventDescriptor.ts

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,7 @@ export class SqlEventDescriptor {
5151
};
5252
}
5353

54-
return matchingQuery.evaluateRowWithErrors(
55-
options.sourceTable,
56-
applyRowContext(options.record, this.compatibility)
57-
);
54+
return matchingQuery.evaluateRowWithErrors(options.sourceTable, options.record);
5855
}
5956

6057
getSourceTables(): Set<TablePattern> {

packages/sync-rules/src/streams/from_sql.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -100,8 +100,7 @@ class SyncStreamCompiler {
100100

101101
const stream = new SyncStream(
102102
this.descriptorName,
103-
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable)),
104-
this.options.compatibility
103+
new BaseSqlDataQuery(this.compileDataQuery(tools, query, alias, sourceTable))
105104
);
106105
stream.subscribedToByDefault = this.options.auto_subscribe ?? false;
107106
if (filter.isValid(tools)) {

packages/sync-rules/src/streams/stream.ts

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,7 @@ export class SyncStream implements BucketSource {
2727
variants: StreamVariant[];
2828
data: BaseSqlDataQuery;
2929

30-
constructor(
31-
name: string,
32-
data: BaseSqlDataQuery,
33-
private readonly compatibility: CompatibilityContext
34-
) {
30+
constructor(name: string, data: BaseSqlDataQuery) {
3531
this.name = name;
3632
this.subscribedToByDefault = false;
3733
this.priority = DEFAULT_BUCKET_PRIORITY;
@@ -172,15 +168,14 @@ export class SyncStream implements BucketSource {
172168
}
173169

174170
const stream = this;
175-
const mappedRow = applyRowContext(options.record, this.compatibility);
176171
const row: TableRow = {
177172
sourceTable: options.sourceTable,
178-
record: mappedRow
173+
record: options.record
179174
};
180175

181176
return this.data.evaluateRowWithOptions({
182177
table: options.sourceTable,
183-
row: applyRowContext(options.record, this.compatibility),
178+
row: options.record,
184179
bucketIds() {
185180
const bucketIds: string[] = [];
186181
for (const variant of stream.variants) {

packages/sync-rules/src/types.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ export type SqliteInputRow = SqliteRow<SqliteInputValue>;
218218
*
219219
* Toasted values are `undefined`.
220220
*/
221-
export type ToastableSqliteRow = SqliteRow<SqliteInputValue | undefined>;
221+
export type ToastableSqliteRow<V = SqliteInputValue> = SqliteRow<V | undefined>;
222222

223223
/**
224224
* A value as received from the database.
@@ -297,7 +297,7 @@ export interface InputParameter {
297297
*/
298298
export type BucketIdTransformer = (regularId: string) => string;
299299

300-
export interface EvaluateRowOptions extends TableRow<SqliteInputRow> {
300+
export interface EvaluateRowOptions extends TableRow {
301301
bucketIdTransformer: BucketIdTransformer;
302302
}
303303

0 commit comments

Comments
 (0)