Skip to content

Commit fe4cdb3

Browse files
committed
Fix mongodb as well
1 parent d5ddee0 commit fe4cdb3

File tree

8 files changed

+48
-25
lines changed

8 files changed

+48
-25
lines changed

modules/module-mongodb/src/replication/ChangeStream.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
SourceTable,
1818
storage
1919
} from '@powersync/service-core';
20-
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
20+
import { DatabaseInputRow, SqliteInputRow, SqliteRow, SqlSyncRules, TablePattern } from '@powersync/service-sync-rules';
2121
import { ReplicationMetric } from '@powersync/service-types';
2222
import { MongoLSN } from '../common/MongoLSN.js';
2323
import { PostImagesOption } from '../types/types.js';
@@ -439,7 +439,7 @@ export class ChangeStream {
439439
return { $match: { ns: { $in: $inFilters } }, multipleDatabases };
440440
}
441441

442-
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> {
442+
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
443443
for (let row of results) {
444444
yield constructAfterRecord(row);
445445
}

modules/module-mongodb/src/replication/MongoRelation.ts

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
22
import { storage } from '@powersync/service-core';
33
import { JSONBig, JsonContainer } from '@powersync/service-jsonbig';
4-
import { SqliteRow, SqliteValue } from '@powersync/service-sync-rules';
4+
import {
5+
CustomSqliteType,
6+
SqliteInputRow,
7+
SqliteInputValue,
8+
SqliteRow,
9+
SqliteValue,
10+
TimeValue
11+
} from '@powersync/service-sync-rules';
512

613
import { ErrorCode, ServiceError } from '@powersync/lib-services-framework';
714
import { MongoLSN } from '../common/MongoLSN.js';
@@ -27,15 +34,15 @@ export function getCacheIdentifier(source: storage.SourceEntityDescriptor | stor
2734
return `${source.schema}.${source.name}`;
2835
}
2936

30-
export function constructAfterRecord(document: mongo.Document): SqliteRow {
31-
let record: SqliteRow = {};
37+
export function constructAfterRecord(document: mongo.Document): SqliteInputRow {
38+
let record: SqliteInputRow = {};
3239
for (let key of Object.keys(document)) {
3340
record[key] = toMongoSyncRulesValue(document[key]);
3441
}
3542
return record;
3643
}
3744

38-
export function toMongoSyncRulesValue(data: any): SqliteValue {
45+
export function toMongoSyncRulesValue(data: any): SqliteInputValue {
3946
const autoBigNum = true;
4047
if (data === null) {
4148
return null;
@@ -60,7 +67,8 @@ export function toMongoSyncRulesValue(data: any): SqliteValue {
6067
} else if (data instanceof mongo.UUID) {
6168
return data.toHexString();
6269
} else if (data instanceof Date) {
63-
return data.toISOString().replace('T', ' ');
70+
const isoString = data.toISOString();
71+
return new TimeValue(isoString.replace('T', ' '), isoString);
6472
} else if (data instanceof mongo.Binary) {
6573
return new Uint8Array(data.buffer);
6674
} else if (data instanceof mongo.Long) {
@@ -72,8 +80,7 @@ export function toMongoSyncRulesValue(data: any): SqliteValue {
7280
} else if (data instanceof RegExp) {
7381
return JSON.stringify({ pattern: data.source, options: data.flags });
7482
} else if (Array.isArray(data)) {
75-
// We may be able to avoid some parse + stringify cycles here for JsonSqliteContainer.
76-
return JSONBig.stringify(data.map((element) => filterJsonData(element)));
83+
return CustomSqliteType.wrapArray(data.map((element) => filterJsonData(element)));
7784
} else if (data instanceof Uint8Array) {
7885
return data;
7986
} else if (data instanceof JsonContainer) {
@@ -117,7 +124,8 @@ function filterJsonData(data: any, depth = 0): any {
117124
} else if (typeof data == 'bigint') {
118125
return data;
119126
} else if (data instanceof Date) {
120-
return data.toISOString().replace('T', ' ');
127+
const isoString = data.toISOString();
128+
return new TimeValue(isoString.replace('T', ' '), isoString);
121129
} else if (data instanceof mongo.ObjectId) {
122130
return data.toHexString();
123131
} else if (data instanceof mongo.UUID) {
@@ -133,7 +141,7 @@ function filterJsonData(data: any, depth = 0): any {
133141
} else if (data instanceof RegExp) {
134142
return { pattern: data.source, options: data.flags };
135143
} else if (Array.isArray(data)) {
136-
return data.map((element) => filterJsonData(element, depth + 1));
144+
return CustomSqliteType.wrapArray(data.map((element) => filterJsonData(element, depth + 1)));
137145
} else if (ArrayBuffer.isView(data)) {
138146
return undefined;
139147
} else if (data instanceof JsonContainer) {

modules/module-mongodb/test/src/mongo_test.test.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { mongo } from '@powersync/lib-service-mongodb';
2-
import { SqliteRow, SqlSyncRules } from '@powersync/service-sync-rules';
2+
import { CustomSqliteType, SqliteInputRow, SqliteRow, SqlSyncRules, TimeValue } from '@powersync/service-sync-rules';
33
import { describe, expect, test } from 'vitest';
44

55
import { MongoRouteAPIAdapter } from '@module/api/MongoRouteAPIAdapter.js';
@@ -159,7 +159,7 @@ describe('mongo data types', () => {
159159

160160
expect(transformed[2]).toMatchObject({
161161
_id: 3n,
162-
date: '2023-03-06 13:47:00.000Z'
162+
date: new TimeValue('2023-03-06 13:47:00.000Z', '2023-03-06T13:47:00.000Z')
163163
});
164164

165165
expect(transformed[3]).toMatchObject({
@@ -211,7 +211,7 @@ describe('mongo data types', () => {
211211

212212
expect(transformed[2]).toMatchObject({
213213
_id: 3n,
214-
date: '["2023-03-06 13:47:00.000Z"]'
214+
date: CustomSqliteType.wrapArray([new TimeValue('2023-03-06 13:47:00.000Z', '2023-03-06T13:47:00.000Z')])
215215
});
216216

217217
expect(transformed[3]).toMatchObject({
@@ -528,7 +528,7 @@ bucket_definitions:
528528
* Return all the inserts from the first transaction in the replication stream.
529529
*/
530530
async function getReplicationTx(replicationStream: mongo.ChangeStream, count: number) {
531-
let transformed: SqliteRow[] = [];
531+
let transformed: SqliteInputRow[] = [];
532532
for await (const doc of replicationStream) {
533533
// Specifically filter out map_input / map_output collections
534534
if (!(doc as any)?.ns?.coll?.startsWith('test_data')) {

packages/sync-rules/src/ExpressionType.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ export const TYPE_REAL = 8;
66

77
export type SqliteType = 'null' | 'blob' | 'text' | 'integer' | 'real' | 'numeric';
88

9+
export type SqliteValueType = 'null' | 'blob' | 'text' | 'integer' | 'real';
10+
911
export interface ColumnDefinition {
1012
name: string;
1113
type: ExpressionType;

packages/sync-rules/src/sql_functions.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
import { JSONBig } from '@powersync/service-jsonbig';
22
import { SQLITE_FALSE, SQLITE_TRUE, sqliteBool, sqliteNot } from './sql_support.js';
3-
import { SqliteRow, SqliteValue } from './types.js';
3+
import { SqliteInputValue, SqliteValue } from './types.js';
44
import { jsonValueToSqlite } from './utils.js';
55
// Declares @syncpoint/wkx module
66
// This allows for consumers of this lib to resolve types correctly
77
/// <reference types="./wkx.d.ts" />
88
import wkx from '@syncpoint/wkx';
9-
import { ExpressionType, SqliteType, TYPE_INTEGER } from './ExpressionType.js';
9+
import { ExpressionType, SqliteType, SqliteValueType, TYPE_INTEGER } from './ExpressionType.js';
1010
import * as uuid from 'uuid';
11+
import { CustomSqliteType } from './types/custom_sqlite_type.js';
1112

1213
export const BASIC_OPERATORS = new Set<string>([
1314
'=',
@@ -635,7 +636,7 @@ export function cast(value: SqliteValue, to: string) {
635636
}
636637
}
637638

638-
export function sqliteTypeOf(arg: SqliteValue) {
639+
export function sqliteTypeOf(arg: SqliteInputValue): SqliteValueType {
639640
if (arg == null) {
640641
return 'null';
641642
} else if (typeof arg == 'string') {
@@ -646,6 +647,8 @@ export function sqliteTypeOf(arg: SqliteValue) {
646647
return 'real';
647648
} else if (arg instanceof Uint8Array) {
648649
return 'blob';
650+
} else if (arg instanceof CustomSqliteType) {
651+
return arg.sqliteType;
649652
} else {
650653
// Should not happen
651654
throw new Error(`Unknown type: ${arg}`);

packages/sync-rules/src/types/custom_sqlite_type.ts

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { JSONBig } from '@powersync/service-jsonbig';
22
import { CompatibilityContext } from '../quirks.js';
33
import { SqliteValue, EvaluatedRow, SqliteInputValue, DatabaseInputValue } from '../types.js';
4-
import { filterJsonData } from '../utils.js';
4+
import { SqliteValueType } from '../ExpressionType.js';
55

66
/**
77
* A value that decays into a {@link SqliteValue} in a context-specific way.
@@ -23,14 +23,16 @@ export abstract class CustomSqliteType {
2323
*/
2424
abstract toSqliteValue(context: CompatibilityContext): SqliteValue;
2525

26+
abstract get sqliteType(): SqliteValueType;
27+
2628
static wrapArray(elements: DatabaseInputValue[]): SqliteInputValue {
2729
const hasCustomValue = elements.some((v) => v instanceof CustomSqliteType);
2830
if (hasCustomValue) {
2931
// We need access to the compatibility context before encoding contents as JSON.
3032
return new CustomArray(elements);
3133
} else {
3234
// We can encode the array statically.
33-
return JSONBig.stringify(elements.map((element) => filterJsonData(element)));
35+
return JSONBig.stringify(elements);
3436
}
3537
}
3638
}
@@ -40,11 +42,14 @@ class CustomArray extends CustomSqliteType {
4042
super();
4143
}
4244

45+
get sqliteType(): SqliteValueType {
46+
return 'text';
47+
}
48+
4349
toSqliteValue(context: CompatibilityContext): SqliteValue {
4450
return JSONBig.stringify(
4551
this.elements.map((element) => {
46-
const mapped = element instanceof CustomSqliteType ? element.toSqliteValue(context) : element;
47-
return filterJsonData(mapped);
52+
return element instanceof CustomSqliteType ? element.toSqliteValue(context) : element;
4853
})
4954
);
5055
}

packages/sync-rules/src/types/time.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { SqliteValueType } from '../ExpressionType.js';
12
import { CompatibilityContext, Quirk } from '../quirks.js';
23
import { CustomSqliteType } from './custom_sqlite_type.js';
34

@@ -20,6 +21,10 @@ export class TimeValue extends CustomSqliteType {
2021
this.iso8601Representation = iso8601Representation;
2122
}
2223

24+
get sqliteType(): SqliteValueType {
25+
return 'text';
26+
}
27+
2328
toSqliteValue(context: CompatibilityContext) {
2429
return context.isFixed(Quirk.nonIso8601Timestampts) ? this.iso8601Representation : this.legacyRepresentation;
2530
}

packages/sync-rules/src/utils.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ export function isJsonValue(value: SqliteValue): value is SqliteJsonValue {
6969
return value == null || typeof value == 'string' || typeof value == 'number' || typeof value == 'bigint';
7070
}
7171

72-
export function filterJsonData(data: any, depth = 0): any {
72+
function filterJsonData(data: any, depth = 0): any {
7373
if (depth > DEPTH_LIMIT) {
7474
// This is primarily to prevent infinite recursion
7575
// TODO: Proper error class
@@ -88,7 +88,7 @@ export function filterJsonData(data: any, depth = 0): any {
8888
} else if (typeof data == 'bigint') {
8989
return data;
9090
} else if (Array.isArray(data)) {
91-
return data.map((element) => filterJsonData(element, depth + 1));
91+
return CustomSqliteType.wrapArray(data.map((element) => filterJsonData(element, depth + 1)));
9292
} else if (ArrayBuffer.isView(data)) {
9393
return undefined;
9494
} else if (data instanceof JsonContainer) {
@@ -158,7 +158,7 @@ export function toSyncRulesValue(
158158
} else if (typeof data == 'boolean') {
159159
return data ? SQLITE_TRUE : SQLITE_FALSE;
160160
} else if (Array.isArray(data)) {
161-
return CustomSqliteType.wrapArray(data);
161+
return CustomSqliteType.wrapArray(data.map(filterJsonData));
162162
} else if (data instanceof Uint8Array || data instanceof CustomSqliteType) {
163163
return data;
164164
} else if (data instanceof JsonContainer) {

0 commit comments

Comments
 (0)