
Commit 9d654b9: Simplify

1 parent 32eedcb commit 9d654b9

7 files changed: +161, -186 lines


modules/module-postgres/src/replication/PgManager.ts
Lines changed: 1 addition & 1 deletion

@@ -2,7 +2,7 @@ import * as pgwire from '@powersync/service-jpgwire';
 import semver from 'semver';
 import { NormalizedPostgresConnectionConfig } from '../types/types.js';
 import { getApplicationName } from '../utils/application-name.js';
-import { PostgresTypeCache } from '../types/custom.js';
+import { PostgresTypeCache } from '../types/cache.js';

 /**
  * Shorter timeout for snapshot connections than for replication connections.

modules/module-postgres/src/replication/PgRelation.ts
Lines changed: 9 additions & 0 deletions

@@ -30,3 +30,12 @@ export function getPgOutputRelation(source: PgoutputRelation): storage.SourceEntityDescriptor {
     replicaIdColumns: getReplicaIdColumns(source)
   } satisfies storage.SourceEntityDescriptor;
 }
+
+export function referencedColumnTypeIds(source: PgoutputRelation): number[] {
+  const oids = new Set<number>();
+  for (const column of source.columns) {
+    oids.add(column.typeOid);
+  }
+
+  return [...oids];
+}

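The new referencedColumnTypeIds helper collects the distinct column type OIDs of a replicated relation, which WalStream below passes on to the type cache. A small usage sketch; the pared-down relation shape and the OID values are assumptions for illustration only:

// Illustrative sketch: a minimal relation shape with just the fields the helper reads.
interface ColumnLike { typeOid: number }
interface RelationLike { columns: ColumnLike[] }

function referencedColumnTypeIdsSketch(source: RelationLike): number[] {
  const oids = new Set<number>();
  for (const column of source.columns) {
    oids.add(column.typeOid);
  }
  return [...oids];
}

// Hypothetical relation: two int4 columns (OID 23) and one jsonb column (OID 3802).
const relation: RelationLike = { columns: [{ typeOid: 23 }, { typeOid: 23 }, { typeOid: 3802 }] };
console.log(referencedColumnTypeIdsSketch(relation)); // [23, 3802]
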
modules/module-postgres/src/replication/WalStream.ts
Lines changed: 45 additions & 25 deletions

@@ -31,7 +31,7 @@ import {
 } from '@powersync/service-sync-rules';

 import { PgManager } from './PgManager.js';
-import { getPgOutputRelation, getRelId } from './PgRelation.js';
+import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js';
 import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js';
 import { ReplicationMetric } from '@powersync/service-types';
 import {
@@ -188,28 +188,30 @@ export class WalStream {

     let tableRows: any[];
     const prefix = tablePattern.isWildcard ? tablePattern.tablePrefix : undefined;
-    if (tablePattern.isWildcard) {
-      const result = await db.query({
-        statement: `SELECT c.oid AS relid, c.relname AS table_name
+
+    {
+      let query = `
+        SELECT
+          c.oid AS relid,
+          c.relname AS table_name,
+          (SELECT
+            json_agg(DISTINCT a.atttypid)
+          FROM pg_attribute a
+          WHERE a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = c.oid)
+          AS column_types
         FROM pg_class c
         JOIN pg_namespace n ON n.oid = c.relnamespace
         WHERE n.nspname = $1
-          AND c.relkind = 'r'
-          AND c.relname LIKE $2`,
-        params: [
-          { type: 'varchar', value: schema },
-          { type: 'varchar', value: tablePattern.tablePattern }
-        ]
-      });
-      tableRows = pgwire.pgwireRows(result);
-    } else {
+          AND c.relkind = 'r'`;
+
+      if (tablePattern.isWildcard) {
+        query += ' AND c.relname LIKE $2';
+      } else {
+        query += ' AND c.relname = $2';
+      }
+
       const result = await db.query({
-        statement: `SELECT c.oid AS relid, c.relname AS table_name
-        FROM pg_class c
-        JOIN pg_namespace n ON n.oid = c.relnamespace
-        WHERE n.nspname = $1
-          AND c.relkind = 'r'
-          AND c.relname = $2`,
+        statement: query,
         params: [
          { type: 'varchar', value: schema },
          { type: 'varchar', value: tablePattern.tablePattern }
@@ -218,6 +220,7 @@ export class WalStream {

       tableRows = pgwire.pgwireRows(result);
     }
+
     let result: storage.SourceTable[] = [];

     for (let row of tableRows) {
@@ -257,16 +260,18 @@

       const cresult = await getReplicationIdentityColumns(db, relid);

-      const table = await this.handleRelation(
+      const columnTypes = (JSON.parse(row.column_types) as string[]).map((e) => Number(e));
+      const table = await this.handleRelation({
         batch,
-        {
+        descriptor: {
           name,
           schema,
           objectId: relid,
           replicaIdColumns: cresult.replicationColumns
         } as SourceEntityDescriptor,
-        false
-      );
+        snapshot: false,
+        referencedTypeIds: columnTypes
+      });

       result.push(table);
     }
@@ -672,7 +677,14 @@ WHERE oid = $1::regclass`,
     }
   }

-  async handleRelation(batch: storage.BucketStorageBatch, descriptor: SourceEntityDescriptor, snapshot: boolean) {
+  async handleRelation(options: {
+    batch: storage.BucketStorageBatch;
+    descriptor: SourceEntityDescriptor;
+    snapshot: boolean;
+    referencedTypeIds: number[];
+  }) {
+    const { batch, descriptor, snapshot, referencedTypeIds } = options;
+
     if (!descriptor.objectId && typeof descriptor.objectId != 'number') {
       throw new ReplicationAssertionError(`objectId expected, got ${typeof descriptor.objectId}`);
     }
@@ -698,6 +710,9 @@ WHERE oid = $1::regclass`,
     // Truncate this table, in case a previous snapshot was interrupted.
     await batch.truncate([result.table]);

+    // Ensure we have a description for custom types referenced in the table.
+    await this.connections.types.fetchTypes(referencedTypeIds);
+
     // Start the snapshot inside a transaction.
     // We use a dedicated connection for this.
     const db = await this.connections.snapshotConnection();
@@ -939,7 +954,12 @@ WHERE oid = $1::regclass`,

     for (const msg of messages) {
       if (msg.tag == 'relation') {
-        await this.handleRelation(batch, getPgOutputRelation(msg), true);
+        await this.handleRelation({
+          batch,
+          descriptor: getPgOutputRelation(msg),
+          snapshot: true,
+          referencedTypeIds: referencedColumnTypeIds(msg)
+        });
       } else if (msg.tag == 'begin') {
         // This may span multiple transactions in the same chunk, or even across chunks.
         skipKeepalive = true;

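The reworked catalog query returns one row per table, with column_types carrying a JSON array of the distinct pg_attribute type OIDs (json_agg(DISTINCT a.atttypid)). A minimal sketch of the parsing step shown above, using a made-up row:

// Hypothetical row shape returned by the catalog query; the OID values are examples only
// (23 = int4, 1043 = varchar, 3802 = jsonb).
const row = { relid: 16385, table_name: 'todos', column_types: '[23, 1043, 3802]' };

// Mirrors the conversion in the WalStream change above: JSON text from json_agg -> number[].
const columnTypes = (JSON.parse(row.column_types) as (string | number)[]).map((e) => Number(e));
console.log(columnTypes); // [23, 1043, 3802]
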
modules/module-postgres/src/types/custom.ts renamed to modules/module-postgres/src/types/cache.ts
Lines changed: 1 addition & 1 deletion

@@ -10,7 +10,7 @@ export class PostgresTypeCache {
   }

   public async fetchTypes(oids: number[]) {
-    let pending = oids.filter((id) => !(id in Object.values(pgwire.PgTypeOid)));
+    let pending = oids.filter((id) => !this.registry.knows(id));
     // For details on columns, see https://www.postgresql.org/docs/current/catalog-pg-type.html
     const statement = `
     SELECT oid, t.typtype,

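A side note on the replaced filter (an observation, not necessarily the motivation for the change): in JavaScript, id in Object.values(pgwire.PgTypeOid) checks whether id is an index of the values array, not whether it is one of the values, so delegating the check to the registry is both simpler and a more direct membership test. A two-line illustration of the operator semantics:

const values = [10, 20, 30];
console.log(2 in values);  // true  - 2 is a valid index, even though 2 is not an element
console.log(20 in values); // false - 20 is not an index; values.includes(20) tests membership
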
modules/module-postgres/src/types/registry.ts
Lines changed: 26 additions & 93 deletions

@@ -9,6 +9,7 @@ import {
   toSyncRulesValue
 } from '@powersync/service-sync-rules';
 import * as pgwire from '@powersync/service-jpgwire';
+import { JsonContainer } from '@powersync/service-jsonbig';

 interface BaseType {
   sqliteType: () => SqliteValueType;
@@ -167,102 +168,34 @@ export class CustomTypeRegistry {
         return pgwire.PgType.decode(raw, oid);
       case 'domain':
         return this.decodeWithCustomTypes(raw, resolved.innerId);
-    }
-
-    type StructureState = (ArrayType & { parsed: any[] }) | (CompositeType & { parsed: [string, any][] });
-    const stateStack: StructureState[] = [];
-    let pendingNestedStructure: ArrayType | CompositeType | null = resolved;
-
-    const pushParsedValue = (value: any) => {
-      const top = stateStack[stateStack.length - 1];
-      if (top.type == 'array') {
-        top.parsed.push(value);
-      } else {
-        const nextMember = top.members[top.parsed.length];
-        if (nextMember) {
-          top.parsed.push([nextMember.name, value]);
-        }
-      }
-    };
-
-    const resolveCurrentStructureTypeId = () => {
-      const top = stateStack[stateStack.length - 1];
-      if (top.type == 'array') {
-        return top.innerId;
-      } else {
-        const nextMember = top.members[top.parsed.length];
-        if (nextMember) {
-          return nextMember.typeId;
-        } else {
-          return -1;
-        }
-      }
-    };
-
-    let result: any;
-    pgwire.decodeSequence({
-      source: raw,
-      delimiters: this.delimitersFor(resolved),
-      listener: {
-        onStructureStart: () => {
-          stateStack.push({
-            ...pendingNestedStructure!,
-            parsed: []
-          });
-          pendingNestedStructure = null;
-        },
-        onValue: (raw) => {
-          pushParsedValue(raw == null ? null : this.decodeWithCustomTypes(raw, resolveCurrentStructureTypeId()));
-        },
-        onStructureEnd: () => {
-          const top = stateStack.pop()!;
-          // For arrays, pop the parsed array. For compounds, create an object from the key-value entries.
-          const parsedValue = top.type == 'array' ? top.parsed : Object.fromEntries(top.parsed);
-
-          if (stateStack.length == 0) {
-            // We have exited the outermost structure, parsedValue is the result.
-            result = parsedValue;
-          } else {
-            // Add the result of parsing a nested structure to the current outer structure.
-            pushParsedValue(parsedValue);
-          }
-        },
-        maybeParseSubStructure: (firstChar: number) => {
-          const top = stateStack[stateStack.length - 1];
-          if (top.type == 'array' && firstChar == pgwire.CHAR_CODE_LEFT_BRACE) {
-            // Postgres arrays are natively multidimensional - so if we're in an array, we can always parse sub-arrays
-            // of the same type.
-            pendingNestedStructure = top;
-            return this.delimitersFor(top);
+      case 'array':
+        return pgwire.decodeArray({
+          source: raw,
+          decodeElement: (source) => this.decodeWithCustomTypes(source, resolved.innerId),
+          delimiterCharCode: resolved.separatorCharCode
+        });
+      case 'composite': {
+        const parsed: [string, any][] = [];
+
+        pgwire.decodeSequence({
+          source: raw,
+          delimiters: pgwire.COMPOSITE_DELIMITERS,
+          listener: {
+            onValue: (raw) => {
+              const nextMember = resolved.members[parsed.length];
+              if (nextMember) {
+                const value = raw == null ? null : this.decodeWithCustomTypes(raw, nextMember.typeId);
+                parsed.push([nextMember.name, value]);
+              }
+            },
+            // These are only used for nested arrays
+            onStructureStart: () => {},
+            onStructureEnd: () => {}
           }
+        });

-          // If we're in a compound type, nested compound values or arrays are encoded as strings.
-          return null;
-        }
+        return Object.fromEntries(parsed);
       }
-    });
-
-    return result;
-  }
-
-  private resolveStructure(type: MaybeKnownType): [ArrayType | CompositeType, pgwire.Delimiters] | null {
-    switch (type.type) {
-      case 'builtin':
-      case 'unknown':
-        return null;
-      case 'domain':
-        return this.resolveStructure(this.lookupType(type.innerId));
-      case 'array':
-      case 'composite':
-        return [type, this.delimitersFor(type)];
-    }
-  }
-
-  private delimitersFor(type: ArrayType | CompositeType): pgwire.Delimiters {
-    if (type.type == 'array') {
-      return pgwire.arrayDelimiters(type.separatorCharCode);
-    } else {
-      return pgwire.COMPOSITE_DELIMITERS;
     }
   }

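For orientation: Postgres sends a composite value such as (1,widget) in text form, and the new 'composite' branch pairs each parsed field with the registered member at the same position, then builds a plain object with Object.fromEntries. A standalone sketch of that member-by-position assembly; the member list and raw field values below are hypothetical, and the actual delimiter parsing is done by pgwire.decodeSequence:

// Illustrative only: mirrors the member-by-position assembly in the new 'composite' branch.
const members = [
  { name: 'id', decode: (raw: string) => Number(raw) },
  { name: 'name', decode: (raw: string) => raw }
];
const rawFields: (string | null)[] = ['1', 'widget']; // fields of the composite text value (1,widget)
const parsed: [string, unknown][] = [];
rawFields.forEach((raw, index) => {
  const member = members[index];
  if (member) {
    parsed.push([member.name, raw == null ? null : member.decode(raw)]);
  }
});
console.log(Object.fromEntries(parsed)); // { id: 1, name: 'widget' }
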
packages/jpgwire/src/pgwire_types.ts
Lines changed: 4 additions & 36 deletions

@@ -1,13 +1,14 @@
 // Adapted from https://github.com/kagis/pgwire/blob/0dc927f9f8990a903f238737326e53ba1c8d094f/mod.js#L2218

 import { JsonContainer } from '@powersync/service-jsonbig';
-import { TimeValue, type DatabaseInputValue } from '@powersync/service-sync-rules';
+import { CustomSqliteValue, TimeValue, type DatabaseInputValue } from '@powersync/service-sync-rules';
 import { dateToSqlite, lsnMakeComparable, timestampToSqlite, timestamptzToSqlite } from './util.js';
 import {
   arrayDelimiters,
   CHAR_CODE_COMMA,
   CHAR_CODE_LEFT_BRACE,
   CHAR_CODE_RIGHT_BRACE,
+  decodeArray,
   decodeSequence,
   Delimiters,
   SequenceListener
@@ -164,43 +165,10 @@ export class PgType {

   static _decodeArray(text: string, elemTypeOid: number): DatabaseInputValue[] {
     text = text.replace(/^\[.+=/, ''); // skip dimensions
-
-    let results: DatabaseInputValue[];
-    const stack: DatabaseInputValue[][] = [];
-    const delimiters = arrayDelimiters();
-
-    const listener: SequenceListener = {
-      maybeParseSubStructure: function (firstChar: number): Delimiters | null {
-        return firstChar == CHAR_CODE_LEFT_BRACE ? delimiters : null;
-      },
-      onStructureStart: () => {
-        // We're parsing a new array
-        stack.push([]);
-      },
-      onValue: function (value: string | null): void {
-        // Atomic (non-array) value, add to current array.
-        stack[stack.length - 1].push(value && PgType.decode(value, elemTypeOid));
-      },
-      onStructureEnd: () => {
-        // We're done parsing an array.
-        const subarray = stack.pop()!;
-        if (stack.length == 0) {
-          // We are done with the outermost array, set results.
-          results = subarray;
-        } else {
-          // We were busy parsing a nested array, continue outer array.
-          stack[stack.length - 1].push(subarray);
-        }
-      }
-    };
-
-    decodeSequence({
+    return decodeArray({
       source: text,
-      listener,
-      delimiters
+      decodeElement: (raw) => PgType.decode(raw, elemTypeOid)
     });
-
-    return results!;
   }

   static _decodeBytea(text: string): Uint8Array {

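Both call sites in this commit pass decodeArray a source string and a decodeElement callback (registry.ts additionally passes delimiterCharCode). A rough usage sketch against a text-encoded int4[] value; the usage is inferred from those call sites and is not an authoritative API description:

import * as pgwire from '@powersync/service-jpgwire';

// Hypothetical input: the Postgres text encoding of ARRAY[1, 2, 3].
const source = '{1,2,3}';

// decodeElement receives each element's raw text and returns the decoded value.
const decoded = pgwire.decodeArray({
  source,
  decodeElement: (raw: string) => Number(raw)
});
console.log(decoded); // expected: [1, 2, 3]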