Skip to content

Commit c53f99d

Browse files
committed
Parse custom postgres types
1 parent bb1bd27 commit c53f99d

File tree

16 files changed

+1061
-96
lines changed

16 files changed

+1061
-96
lines changed
Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1 @@
11
export * from './module/PostgresModule.js';
2-
3-
export * as pg_utils from './utils/pgwire_utils.js';

modules/module-postgres/src/replication/PgManager.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import * as pgwire from '@powersync/service-jpgwire';
22
import semver from 'semver';
33
import { NormalizedPostgresConnectionConfig } from '../types/types.js';
44
import { getApplicationName } from '../utils/application-name.js';
5+
import { PostgresTypeCache } from '../types/custom.js';
56

67
/**
78
* Shorter timeout for snapshot connections than for replication connections.
@@ -14,6 +15,8 @@ export class PgManager {
1415
*/
1516
public readonly pool: pgwire.PgClient;
1617

18+
public readonly types: PostgresTypeCache;
19+
1720
private connectionPromises: Promise<pgwire.PgConnection>[] = [];
1821

1922
constructor(
@@ -22,6 +25,7 @@ export class PgManager {
2225
) {
2326
// The pool is lazy - no connections are opened until a query is performed.
2427
this.pool = pgwire.connectPgWirePool(this.options, poolOptions);
28+
this.types = new PostgresTypeCache(this.pool);
2529
}
2630

2731
public get connectionTag() {

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import {
2929
TablePattern,
3030
toSyncRulesRow
3131
} from '@powersync/service-sync-rules';
32-
import * as pg_utils from '../utils/pgwire_utils.js';
3332

3433
import { PgManager } from './PgManager.js';
3534
import { getPgOutputRelation, getRelId } from './PgRelation.js';
@@ -779,7 +778,7 @@ WHERE oid = $1::regclass`,
779778

780779
if (msg.tag == 'insert') {
781780
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
782-
const baseRecord = pg_utils.constructAfterRecord(msg);
781+
const baseRecord = this.connections.types.constructAfterRecord(msg);
783782
return await batch.save({
784783
tag: storage.SaveOperationTag.INSERT,
785784
sourceTable: table,
@@ -792,8 +791,8 @@ WHERE oid = $1::regclass`,
792791
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
793792
// "before" may be null if the replica id columns are unchanged
794793
// It's fine to treat that the same as an insert.
795-
const before = pg_utils.constructBeforeRecord(msg);
796-
const after = pg_utils.constructAfterRecord(msg);
794+
const before = this.connections.types.constructBeforeRecord(msg);
795+
const after = this.connections.types.constructAfterRecord(msg);
797796
return await batch.save({
798797
tag: storage.SaveOperationTag.UPDATE,
799798
sourceTable: table,
@@ -804,7 +803,7 @@ WHERE oid = $1::regclass`,
804803
});
805804
} else if (msg.tag == 'delete') {
806805
this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(1);
807-
const before = pg_utils.constructBeforeRecord(msg)!;
806+
const before = this.connections.types.constructBeforeRecord(msg)!;
808807

809808
return await batch.save({
810809
tag: storage.SaveOperationTag.DELETE,
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
import { DatabaseInputRow, SqliteInputRow, toSyncRulesRow } from '@powersync/service-sync-rules';
2+
import * as pgwire from '@powersync/service-jpgwire';
3+
import { CustomTypeRegistry } from './registry.js';
4+
5+
export class PostgresTypeCache {
6+
readonly registry: CustomTypeRegistry;
7+
8+
constructor(private readonly pool: pgwire.PgClient) {
9+
this.registry = new CustomTypeRegistry();
10+
}
11+
12+
public async fetchTypes(oids: number[]) {
13+
let pending = oids.filter((id) => !(id in Object.values(pgwire.PgTypeOid)));
14+
// For details on columns, see https://www.postgresql.org/docs/current/catalog-pg-type.html
15+
const statement = `
16+
SELECT oid, pg_type.typtype,
17+
CASE pg_type.typtype
18+
WHEN 'd' THEN json_build_object('type', pg_type.typbasetype)
19+
WHEN 'c' THEN json_build_object(
20+
'elements',
21+
(SELECT json_agg(json_build_object('name', a.attname, 'type', a.atttypid))
22+
FROM pg_attribute a
23+
WHERE a.attrelid = pg_type.typrelid)
24+
)
25+
ELSE NULL
26+
END AS desc
27+
FROM pg_type
28+
WHERE pg_type.oid = ANY($1)
29+
`;
30+
31+
while (pending.length != 0) {
32+
const query = await this.pool.query({ statement, params: [{ type: 1016, value: pending }] });
33+
const stillPending: number[] = [];
34+
35+
const requireType = (oid: number) => {
36+
if (!this.registry.knows(oid) && !pending.includes(oid) && !stillPending.includes(oid)) {
37+
stillPending.push(oid);
38+
}
39+
};
40+
41+
for (const row of pgwire.pgwireRows(query)) {
42+
const oid = Number(row.oid);
43+
const desc = JSON.parse(row.desc);
44+
45+
switch (row.typtype) {
46+
case 'd':
47+
// For domain values like CREATE DOMAIN api.rating_value AS FLOAT CHECK (VALUE BETWEEN 0 AND 5), we sync
48+
// the inner type (pg_type.typbasetype).
49+
const inner = Number(desc.type);
50+
this.registry.setDomainType(oid, inner);
51+
requireType(inner);
52+
break;
53+
case 'c':
54+
// For composite types, we sync the JSON representation.
55+
const elements: { name: string; typeId: number }[] = [];
56+
for (const { name, type } of desc.elements) {
57+
const typeId = Number(type);
58+
elements.push({ name, typeId });
59+
requireType(typeId);
60+
}
61+
62+
this.registry.set(oid, {
63+
type: 'composite',
64+
members: elements,
65+
sqliteType: () => 'text' // Since it's JSON
66+
});
67+
break;
68+
}
69+
}
70+
71+
pending = stillPending;
72+
}
73+
}
74+
75+
/**
76+
* Used for testing - fetches all custom types referenced by any column in the schema.
77+
*/
78+
public async fetchTypesForSchema(schema: string = 'public') {
79+
const sql = `
80+
SELECT DISTINCT a.atttypid AS type_oid
81+
FROM pg_attribute a
82+
JOIN pg_class c ON c.oid = a.attrelid
83+
JOIN pg_namespace cn ON cn.oid = c.relnamespace
84+
JOIN pg_type t ON t.oid = a.atttypid
85+
JOIN pg_namespace tn ON tn.oid = t.typnamespace
86+
WHERE a.attnum > 0
87+
AND NOT a.attisdropped
88+
AND cn.nspname = $1
89+
AND tn.nspname NOT IN ('pg_catalog', 'information_schema');
90+
`;
91+
92+
const query = await this.pool.query({ statement: sql, params: [{ type: 'varchar', value: schema }] });
93+
let ids: number[] = [];
94+
for (const row of pgwire.pgwireRows(query)) {
95+
ids.push(Number(row.type_oid));
96+
}
97+
98+
await this.fetchTypes(ids);
99+
}
100+
101+
/**
102+
* pgwire message -> SQLite row.
103+
* @param message
104+
*/
105+
constructAfterRecord(message: pgwire.PgoutputInsert | pgwire.PgoutputUpdate): SqliteInputRow {
106+
const rawData = (message as any).afterRaw;
107+
108+
const record = this.decodeTuple(message.relation, rawData);
109+
return toSyncRulesRow(record);
110+
}
111+
112+
/**
113+
* pgwire message -> SQLite row.
114+
* @param message
115+
*/
116+
constructBeforeRecord(message: pgwire.PgoutputDelete | pgwire.PgoutputUpdate): SqliteInputRow | undefined {
117+
const rawData = (message as any).beforeRaw;
118+
if (rawData == null) {
119+
return undefined;
120+
}
121+
const record = this.decodeTuple(message.relation, rawData);
122+
return toSyncRulesRow(record);
123+
}
124+
125+
/**
126+
* We need a high level of control over how values are decoded, to make sure there is no loss
127+
* of precision in the process.
128+
*/
129+
decodeTuple(relation: pgwire.PgoutputRelation, tupleRaw: Record<string, any>): DatabaseInputRow {
130+
let result: Record<string, any> = {};
131+
for (let columnName in tupleRaw) {
132+
const rawval = tupleRaw[columnName];
133+
const typeOid = (relation as any)._tupleDecoder._typeOids.get(columnName);
134+
if (typeof rawval == 'string' && typeOid) {
135+
result[columnName] = this.registry.decodeDatabaseValue(rawval, typeOid);
136+
} else {
137+
result[columnName] = rawval;
138+
}
139+
}
140+
return result;
141+
}
142+
}

0 commit comments

Comments
 (0)