Skip to content

Commit c4e8e73

Browse files
committed
Actually fetch types
1 parent 069684d commit c4e8e73

File tree

3 files changed

+47
-3
lines changed

3 files changed

+47
-3
lines changed

modules/module-postgres/src/replication/WalStream.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -700,6 +700,9 @@ WHERE oid = $1::regclass`,
700700
// Drop conflicting tables. This includes for example renamed tables.
701701
await batch.drop(result.dropTables);
702702

703+
// Ensure we have a description for custom types referenced in the table.
704+
await this.connections.types.fetchTypes(referencedTypeIds);
705+
703706
// Snapshot if:
704707
// 1. Snapshot is requested (false for initial snapshot, since that process handles it elsewhere)
705708
// 2. Snapshot is not already done, AND:
@@ -710,9 +713,6 @@ WHERE oid = $1::regclass`,
710713
// Truncate this table, in case a previous snapshot was interrupted.
711714
await batch.truncate([result.table]);
712715

713-
// Ensure we have a description for custom types referenced in the table.
714-
await this.connections.types.fetchTypes(referencedTypeIds);
715-
716716
// Start the snapshot inside a transaction.
717717
// We use a dedicated connection for this.
718718
const db = await this.connections.snapshotConnection();

modules/module-postgres/test/src/schema_changes.test.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -590,4 +590,24 @@ function defineTests(factory: storage.TestStorageFactory) {
590590

591591
expect(failures).toEqual([]);
592592
});
593+
594+
test('custom types', async () => {
595+
await using context = await WalStreamTestContext.open(factory);
596+
597+
await context.updateSyncRules(`
598+
streams:
599+
stream:
600+
query: SELECT id, * FROM "test_data"
601+
602+
config:
603+
edition: 2
604+
`);
605+
606+
const { pool } = context;
607+
await pool.query(`DROP TABLE IF EXISTS test_data`);
608+
await pool.query(`CREATE TABLE test_data(id text primary key, description composite);`);
609+
await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', ROW(TRUE, 2)::composite)`);
610+
611+
await context.replicateSnapshot();
612+
});
593613
}

modules/module-postgres/test/src/wal_stream.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,4 +324,28 @@ bucket_definitions:
324324
// creating a new replication slot.
325325
}
326326
});
327+
328+
test('custom types', async () => {
329+
await using context = await WalStreamTestContext.open(factory);
330+
331+
await context.updateSyncRules(`
332+
streams:
333+
stream:
334+
query: SELECT id, * FROM "test_data"
335+
336+
config:
337+
edition: 2
338+
`);
339+
340+
const { pool } = context;
341+
await pool.query(`DROP TABLE IF EXISTS test_data`);
342+
await pool.query(`CREATE TYPE composite AS (foo bool, bar int4);`);
343+
await pool.query(`CREATE TABLE test_data(id text primary key, description composite);`);
344+
345+
await context.initializeReplication();
346+
await pool.query(`INSERT INTO test_data(id, description) VALUES ('t1', ROW(TRUE, 2)::composite)`);
347+
348+
const data = await context.getBucketData('1#stream|0[]');
349+
expect(data).toMatchObject([putOp('test_data', { id: 't1', description: 'test1' })]);
350+
});
327351
}

0 commit comments

Comments
 (0)