Skip to content

Commit d5ddee0

Browse files
committed
Use iso8601 timestamps as an opt-in
1 parent ee3c3f2 commit d5ddee0

File tree

25 files changed

+460
-62
lines changed

25 files changed

+460
-62
lines changed

modules/module-mysql/src/api/MySQLRouteAPIAdapter.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,10 @@ export class MySQLRouteAPIAdapter implements api.RouteAPI {
102102
*/
103103
return fields.map((c) => {
104104
const value = row[c.name];
105-
const sqlValue = sync_rules.toSyncRulesValue(value);
105+
const sqlValue = sync_rules.applyValueContext(
106+
sync_rules.toSyncRulesValue(value),
107+
sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
108+
);
106109
if (typeof sqlValue == 'bigint') {
107110
return Number(value);
108111
} else if (value instanceof Date) {

modules/module-mysql/src/common/mysql-to-sqlite.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,10 @@ export function toColumnDescriptorFromDefinition(column: ColumnDefinition): Colu
103103
};
104104
}
105105

106-
export function toSQLiteRow(row: Record<string, any>, columns: Map<string, ColumnDescriptor>): sync_rules.SqliteRow {
106+
export function toSQLiteRow(
107+
row: Record<string, any>,
108+
columns: Map<string, ColumnDescriptor>
109+
): sync_rules.SqliteInputRow {
107110
let result: sync_rules.DatabaseInputRow = {};
108111
for (let key in row) {
109112
// We are very much expecting the column to be there

modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,10 @@ export class PostgresRouteAPIAdapter implements api.RouteAPI {
105105
columns: result.columns.map((c) => c.name),
106106
rows: result.rows.map((row) => {
107107
return row.map((value) => {
108-
const sqlValue = sync_rules.toSyncRulesValue(value);
108+
const sqlValue = sync_rules.applyValueContext(
109+
sync_rules.toSyncRulesValue(value),
110+
sync_rules.CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY
111+
);
109112
if (typeof sqlValue == 'bigint') {
110113
return Number(value);
111114
} else if (sync_rules.isJsonValue(sqlValue)) {

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,15 @@ import {
2020
storage
2121
} from '@powersync/service-core';
2222
import * as pgwire from '@powersync/service-jpgwire';
23-
import { DatabaseInputRow, SqliteRow, SqlSyncRules, TablePattern, toSyncRulesRow } from '@powersync/service-sync-rules';
23+
import {
24+
applyValueContext,
25+
CompatibilityContext,
26+
DatabaseInputRow,
27+
SqliteInputRow,
28+
SqlSyncRules,
29+
TablePattern,
30+
toSyncRulesRow
31+
} from '@powersync/service-sync-rules';
2432
import * as pg_utils from '../utils/pgwire_utils.js';
2533

2634
import { PgManager } from './PgManager.js';
@@ -500,7 +508,7 @@ WHERE oid = $1::regclass`,
500508
await sendKeepAlive(db);
501509
}
502510

503-
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteRow> {
511+
static *getQueryData(results: Iterable<DatabaseInputRow>): Generator<SqliteInputRow> {
504512
for (let row of results) {
505513
yield toSyncRulesRow(row);
506514
}
@@ -885,7 +893,8 @@ WHERE oid = $1::regclass`,
885893
// The key should always be present in the "after" record.
886894
return;
887895
}
888-
key[name] = value;
896+
// We just need a consistent representation of the primary key, and don't care about fixed quirks.
897+
key[name] = applyValueContext(value, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY);
889898
}
890899
resnapshot.push({
891900
table: record.sourceTable,

modules/module-postgres/src/utils/pgwire_utils.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,13 @@
11
// Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218
22

33
import * as pgwire from '@powersync/service-jpgwire';
4-
import { DatabaseInputRow, SqliteRow, toSyncRulesRow } from '@powersync/service-sync-rules';
4+
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
55

66
/**
77
* pgwire message -> SQLite row.
88
* @param message
99
*/
10-
export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.PgoutputUpdate): SqliteRow {
10+
export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.PgoutputUpdate): SqliteInputRow {
1111
const rawData = (message as any).afterRaw;
1212

1313
const record = decodeTuple(message.relation, rawData);
@@ -18,7 +18,9 @@ export function constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.Pgo
1818
* pgwire message -> SQLite row.
1919
* @param message
2020
*/
21-
export function constructBeforeRecord(message: pgwire.PgoutputDelete | pgwire.PgoutputUpdate): SqliteRow | undefined {
21+
export function constructBeforeRecord(
22+
message: pgwire.PgoutputDelete | pgwire.PgoutputUpdate
23+
): SqliteInputRow | undefined {
2224
const rawData = (message as any).beforeRaw;
2325
if (rawData == null) {
2426
return undefined;

modules/module-postgres/test/src/pg_test.test.ts

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import { constructAfterRecord } from '@module/utils/pgwire_utils.js';
22
import * as pgwire from '@powersync/service-jpgwire';
3-
import { SqliteRow } from '@powersync/service-sync-rules';
3+
import { CustomSqliteType, SqliteInputRow, SqliteRow, TimeValue } from '@powersync/service-sync-rules';
44
import { describe, expect, test } from 'vitest';
55
import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js';
66
import { WalStream } from '@module/replication/WalStream.js';
@@ -159,8 +159,8 @@ VALUES(10, ARRAY['null']::TEXT[]);
159159
id: 3n,
160160
date: '2023-03-06',
161161
time: '15:47:00',
162-
timestamp: '2023-03-06 15:47:00',
163-
timestamptz: '2023-03-06 13:47:00Z'
162+
timestamp: new TimeValue('2023-03-06 15:47:00', '2023-03-06T15:47:00'),
163+
timestamptz: new TimeValue('2023-03-06 13:47:00Z', '2023-03-06T13:47:00Z')
164164
});
165165

166166
expect(transformed[3]).toMatchObject({
@@ -176,25 +176,25 @@ VALUES(10, ARRAY['null']::TEXT[]);
176176
id: 5n,
177177
date: '0000-01-01',
178178
time: '00:00:00',
179-
timestamp: '0000-01-01 00:00:00',
180-
timestamptz: '0000-01-01 00:00:00Z'
179+
timestamp: new TimeValue('0000-01-01 00:00:00', '0000-01-01T00:00:00'),
180+
timestamptz: new TimeValue('0000-01-01 00:00:00Z', '0000-01-01T00:00:00Z')
181181
});
182182

183183
expect(transformed[5]).toMatchObject({
184184
id: 6n,
185-
timestamp: '1970-01-01 00:00:00',
186-
timestamptz: '1970-01-01 00:00:00Z'
185+
timestamp: new TimeValue('1970-01-01 00:00:00', '1970-01-01T00:00:00'),
186+
timestamptz: new TimeValue('1970-01-01 00:00:00Z', '1970-01-01T00:00:00Z')
187187
});
188188

189189
expect(transformed[6]).toMatchObject({
190190
id: 7n,
191-
timestamp: '9999-12-31 23:59:59',
192-
timestamptz: '9999-12-31 23:59:59Z'
191+
timestamp: new TimeValue('9999-12-31 23:59:59', '9999-12-31T23:59:59'),
192+
timestamptz: new TimeValue('9999-12-31 23:59:59Z', '9999-12-31T23:59:59Z')
193193
});
194194

195195
expect(transformed[7]).toMatchObject({
196196
id: 8n,
197-
timestamptz: '0022-02-03 09:13:14Z'
197+
timestamptz: new TimeValue('0022-02-03 09:13:14Z', '0022-02-03T09:13:14Z')
198198
});
199199

200200
expect(transformed[8]).toMatchObject({
@@ -235,8 +235,11 @@ VALUES(10, ARRAY['null']::TEXT[]);
235235
id: 3n,
236236
date: `["2023-03-06"]`,
237237
time: `["15:47:00"]`,
238-
timestamp: `["2023-03-06 15:47:00"]`,
239-
timestamptz: `["2023-03-06 13:47:00Z","2023-03-06 13:47:00.12345Z"]`
238+
timestamp: CustomSqliteType.wrapArray([new TimeValue('2023-03-06 15:47:00', '2023-03-06T15:47:00')]),
239+
timestamptz: CustomSqliteType.wrapArray([
240+
new TimeValue('2023-03-06 13:47:00Z', '2023-03-06T13:47:00Z'),
241+
new TimeValue('2023-03-06 13:47:00.12345Z', '2023-03-06T13:47:00.12345Z')
242+
])
240243
});
241244

242245
expect(transformed[3]).toMatchObject({
@@ -436,7 +439,7 @@ VALUES(10, ARRAY['null']::TEXT[]);
436439
* Return all the inserts from the first transaction in the replication stream.
437440
*/
438441
async function getReplicationTx(replicationStream: pgwire.ReplicationStream) {
439-
let transformed: SqliteRow[] = [];
442+
let transformed: SqliteInputRow[] = [];
440443
for await (const batch of replicationStream.pgoutputDecode()) {
441444
for (const msg of batch.messages) {
442445
if (msg.tag == 'insert') {

packages/jpgwire/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
},
2020
"dependencies": {
2121
"@powersync/service-jsonbig": "workspace:^",
22+
"@powersync/service-sync-rules": "workspace:^",
2223
"date-fns": "^4.1.0",
2324
"pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87"
2425
}

packages/jpgwire/src/pgwire_types.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
// Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218
22

33
import { JsonContainer } from '@powersync/service-jsonbig';
4+
import { type DatabaseInputValue } from '@powersync/service-sync-rules';
45
import { dateToSqlite, lsnMakeComparable, timestampToSqlite, timestamptzToSqlite } from './util.js';
56

67
export enum PgTypeOid {
@@ -105,7 +106,7 @@ export class PgType {
105106
return ELEM_OID_TO_ARRAY.get(typeOid);
106107
}
107108

108-
static decode(text: string, typeOid: number) {
109+
static decode(text: string, typeOid: number): DatabaseInputValue {
109110
switch (typeOid) {
110111
// add line here when register new type
111112
case PgTypeOid.TEXT:
@@ -178,7 +179,14 @@ export class PgType {
178179
const decoded = this.decode(unescaped, elemTypeOid);
179180
stack[0].push(decoded);
180181
}
181-
result = ch == 0x7d /*}*/ && stack.shift();
182+
183+
if (ch == 0x7d /*}*/) {
184+
const entry = stack.shift();
185+
result = entry;
186+
} else {
187+
result = false;
188+
}
189+
182190
elStart = i + 1; // TODO dry
183191
}
184192
}

packages/jpgwire/src/util.ts

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import { DEFAULT_CERTS } from './certs.js';
55
import * as pgwire from './pgwire.js';
66
import { PgType } from './pgwire_types.js';
77
import { ConnectOptions } from './socket_adapter.js';
8+
import { DatabaseInputValue, TimeValue } from '@powersync/service-sync-rules';
89

910
// TODO this is duplicated, but maybe that is ok
1011
export interface NormalizedConnectionConfig {
@@ -132,7 +133,7 @@ export async function connectPgWire(
132133
return connection;
133134
}
134135

135-
function _recvDataRow(this: any, _message: any, row: Uint8Array[], batch: any) {
136+
function _recvDataRow(this: any, _message: any, row: (Uint8Array | DatabaseInputValue)[], batch: any) {
136137
for (let i = 0; i < this._rowColumns.length; i++) {
137138
const valbuf = row[i];
138139
if (valbuf == null) {
@@ -232,17 +233,17 @@ export function lsnMakeComparable(text: string) {
232233
*
233234
* We have specific exceptions for -infinity and infinity.
234235
*/
235-
export function timestamptzToSqlite(source?: string) {
236+
export function timestamptzToSqlite(source?: string): TimeValue | null {
236237
if (source == null) {
237238
return null;
238239
}
239240
// Make compatible with SQLite
240241
const match = /^([\d\-]+) ([\d:]+)(\.\d+)?([+-][\d:]+)$/.exec(source);
241242
if (match == null) {
242243
if (source == 'infinity') {
243-
return '9999-12-31 23:59:59Z';
244+
return new TimeValue('9999-12-31 23:59:59Z', '9999-12-31T23:59:59Z');
244245
} else if (source == '-infinity') {
245-
return '0000-01-01 00:00:00Z';
246+
return new TimeValue('0000-01-01 00:00:00Z', '0000-01-01T00:00:00Z');
246247
} else {
247248
return null;
248249
}
@@ -256,9 +257,11 @@ export function timestamptzToSqlite(source?: string) {
256257
if (isNaN(parsed.getTime())) {
257258
return null;
258259
}
259-
const text = parsed.toISOString().replace('T', ' ').replace('.000', '').replace('Z', '');
260260

261-
return `${text}${precision ?? ''}Z`;
261+
const baseValue = parsed.toISOString().replace('.000', '').replace('Z', '');
262+
const baseText = `${baseValue}${precision ?? ''}Z`;
263+
264+
return new TimeValue(baseText.replace('T', ' '), baseText);
262265
}
263266

264267
/**
@@ -268,16 +271,16 @@ export function timestamptzToSqlite(source?: string) {
268271
*
269272
* https://www.postgresql.org/docs/current/datatype-datetime.html#DATATYPE-DATETIME-SPECIAL-VALUES
270273
*/
271-
export function timestampToSqlite(source?: string) {
274+
export function timestampToSqlite(source?: string): TimeValue | null {
272275
if (source == null) {
273276
return null;
274277
}
275278
if (source == 'infinity') {
276-
return '9999-12-31 23:59:59';
279+
return new TimeValue('9999-12-31 23:59:59', '9999-12-31T23:59:59');
277280
} else if (source == '-infinity') {
278-
return '0000-01-01 00:00:00';
281+
return new TimeValue('0000-01-01 00:00:00', '0000-01-01T00:00:00');
279282
} else {
280-
return source;
283+
return new TimeValue(source, source.replace(' ', 'T'));
281284
}
282285
}
283286
/**

packages/service-core/src/storage/BucketStorageBatch.ts

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
import { ObserverClient } from '@powersync/lib-services-framework';
2-
import { EvaluatedParameters, EvaluatedRow, SqliteRow, ToastableSqliteRow } from '@powersync/service-sync-rules';
2+
import {
3+
EvaluatedParameters,
4+
EvaluatedRow,
5+
SqliteInputRow,
6+
SqliteRow,
7+
ToastableSqliteRow
8+
} from '@powersync/service-sync-rules';
39
import { BSON } from 'bson';
410
import { ReplicationEventPayload } from './ReplicationEventPayload.js';
511
import { SourceTable, TableSnapshotStatus } from './SourceTable.js';
@@ -132,7 +138,7 @@ export interface SaveInsert {
132138
sourceTable: SourceTable;
133139
before?: undefined;
134140
beforeReplicaId?: undefined;
135-
after: SqliteRow;
141+
after: SqliteInputRow;
136142
afterReplicaId: ReplicaId;
137143
}
138144

@@ -143,7 +149,7 @@ export interface SaveUpdate {
143149
/**
144150
* This is only present when the id has changed, and will only contain replica identity columns.
145151
*/
146-
before?: SqliteRow;
152+
before?: SqliteInputRow;
147153
beforeReplicaId?: ReplicaId;
148154

149155
/**
@@ -158,7 +164,7 @@ export interface SaveUpdate {
158164
export interface SaveDelete {
159165
tag: SaveOperationTag.DELETE;
160166
sourceTable: SourceTable;
161-
before?: SqliteRow;
167+
before?: SqliteInputRow;
162168
beforeReplicaId: ReplicaId;
163169
after?: undefined;
164170
afterReplicaId?: undefined;

0 commit comments

Comments
 (0)