@@ -29,10 +29,9 @@ import {
2929 TablePattern ,
3030 toSyncRulesRow
3131} from '@powersync/service-sync-rules' ;
32- import * as pg_utils from '../utils/pgwire_utils.js' ;
3332
3433import { PgManager } from './PgManager.js' ;
35- import { getPgOutputRelation , getRelId } from './PgRelation.js' ;
34+ import { getPgOutputRelation , getRelId , referencedColumnTypeIds } from './PgRelation.js' ;
3635import { checkSourceConfiguration , checkTableRls , getReplicationIdentityColumns } from './replication-utils.js' ;
3736import { ReplicationMetric } from '@powersync/service-types' ;
3837import {
@@ -189,28 +188,30 @@ export class WalStream {
189188
190189 let tableRows : any [ ] ;
191190 const prefix = tablePattern . isWildcard ? tablePattern . tablePrefix : undefined ;
192- if ( tablePattern . isWildcard ) {
193- const result = await db . query ( {
194- statement : `SELECT c.oid AS relid, c.relname AS table_name
191+
192+ {
193+ let query = `
194+ SELECT
195+ c.oid AS relid,
196+ c.relname AS table_name,
197+ (SELECT
198+ json_agg(DISTINCT a.atttypid)
199+ FROM pg_attribute a
200+ WHERE a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = c.oid)
201+ AS column_types
195202 FROM pg_class c
196203 JOIN pg_namespace n ON n.oid = c.relnamespace
197204 WHERE n.nspname = $1
198- AND c.relkind = 'r'
199- AND c.relname LIKE $2` ,
200- params : [
201- { type : 'varchar' , value : schema } ,
202- { type : 'varchar' , value : tablePattern . tablePattern }
203- ]
204- } ) ;
205- tableRows = pgwire . pgwireRows ( result ) ;
206- } else {
205+ AND c.relkind = 'r'` ;
206+
207+ if ( tablePattern . isWildcard ) {
208+ query += ' AND c.relname LIKE $2' ;
209+ } else {
210+ query += ' AND c.relname = $2' ;
211+ }
212+
207213 const result = await db . query ( {
208- statement : `SELECT c.oid AS relid, c.relname AS table_name
209- FROM pg_class c
210- JOIN pg_namespace n ON n.oid = c.relnamespace
211- WHERE n.nspname = $1
212- AND c.relkind = 'r'
213- AND c.relname = $2` ,
214+ statement : query ,
214215 params : [
215216 { type : 'varchar' , value : schema } ,
216217 { type : 'varchar' , value : tablePattern . tablePattern }
@@ -219,6 +220,7 @@ export class WalStream {
219220
220221 tableRows = pgwire . pgwireRows ( result ) ;
221222 }
223+
222224 let result : storage . SourceTable [ ] = [ ] ;
223225
224226 for ( let row of tableRows ) {
@@ -258,16 +260,18 @@ export class WalStream {
258260
259261 const cresult = await getReplicationIdentityColumns ( db , relid ) ;
260262
261- const table = await this . handleRelation (
263+ const columnTypes = ( JSON . parse ( row . column_types ) as string [ ] ) . map ( ( e ) => Number ( e ) ) ;
264+ const table = await this . handleRelation ( {
262265 batch,
263- {
266+ descriptor : {
264267 name,
265268 schema,
266269 objectId : relid ,
267270 replicaIdColumns : cresult . replicationColumns
268271 } as SourceEntityDescriptor ,
269- false
270- ) ;
272+ snapshot : false ,
273+ referencedTypeIds : columnTypes
274+ } ) ;
271275
272276 result . push ( table ) ;
273277 }
@@ -683,7 +687,14 @@ WHERE oid = $1::regclass`,
683687 }
684688 }
685689
686- async handleRelation ( batch : storage . BucketStorageBatch , descriptor : SourceEntityDescriptor , snapshot : boolean ) {
690+ async handleRelation ( options : {
691+ batch : storage . BucketStorageBatch ;
692+ descriptor : SourceEntityDescriptor ;
693+ snapshot : boolean ;
694+ referencedTypeIds : number [ ] ;
695+ } ) {
696+ const { batch, descriptor, snapshot, referencedTypeIds } = options ;
697+
687698 if ( ! descriptor . objectId && typeof descriptor . objectId != 'number' ) {
688699 throw new ReplicationAssertionError ( `objectId expected, got ${ typeof descriptor . objectId } ` ) ;
689700 }
@@ -699,6 +710,9 @@ WHERE oid = $1::regclass`,
699710 // Drop conflicting tables. This includes for example renamed tables.
700711 await batch . drop ( result . dropTables ) ;
701712
713+ // Ensure we have a description for custom types referenced in the table.
714+ await this . connections . types . fetchTypes ( referencedTypeIds ) ;
715+
702716 // Snapshot if:
703717 // 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
704718 // 2. Snapshot is not already done, AND:
@@ -789,7 +803,7 @@ WHERE oid = $1::regclass`,
789803
790804 if ( msg . tag == 'insert' ) {
791805 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
792- const baseRecord = pg_utils . constructAfterRecord ( msg ) ;
806+ const baseRecord = this . connections . types . constructAfterRecord ( msg ) ;
793807 return await batch . save ( {
794808 tag : storage . SaveOperationTag . INSERT ,
795809 sourceTable : table ,
@@ -802,8 +816,8 @@ WHERE oid = $1::regclass`,
802816 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
803817 // "before" may be null if the replica id columns are unchanged
804818 // It's fine to treat that the same as an insert.
805- const before = pg_utils . constructBeforeRecord ( msg ) ;
806- const after = pg_utils . constructAfterRecord ( msg ) ;
819+ const before = this . connections . types . constructBeforeRecord ( msg ) ;
820+ const after = this . connections . types . constructAfterRecord ( msg ) ;
807821 return await batch . save ( {
808822 tag : storage . SaveOperationTag . UPDATE ,
809823 sourceTable : table ,
@@ -814,7 +828,7 @@ WHERE oid = $1::regclass`,
814828 } ) ;
815829 } else if ( msg . tag == 'delete' ) {
816830 this . metrics . getCounter ( ReplicationMetric . ROWS_REPLICATED ) . add ( 1 ) ;
817- const before = pg_utils . constructBeforeRecord ( msg ) ! ;
831+ const before = this . connections . types . constructBeforeRecord ( msg ) ! ;
818832
819833 return await batch . save ( {
820834 tag : storage . SaveOperationTag . DELETE ,
@@ -955,7 +969,12 @@ WHERE oid = $1::regclass`,
955969
956970 for ( const msg of messages ) {
957971 if ( msg . tag == 'relation' ) {
958- await this . handleRelation ( batch , getPgOutputRelation ( msg ) , true ) ;
972+ await this . handleRelation ( {
973+ batch,
974+ descriptor : getPgOutputRelation ( msg ) ,
975+ snapshot : true ,
976+ referencedTypeIds : referencedColumnTypeIds ( msg )
977+ } ) ;
959978 } else if ( msg . tag == 'begin' ) {
960979 // This may span multiple transactions in the same chunk, or even across chunks.
961980 skipKeepalive = true ;
0 commit comments