@@ -2289,6 +2289,46 @@ func TestLogicalReplicationGatewayRoute(t *testing.T) {
22892289 require .Empty (t , progress .Details .(* jobspb.Progress_LogicalReplication ).LogicalReplication .PartitionConnUris )
22902290}
22912291
2292+ func TestMismatchColIDs (t * testing.T ) {
2293+ defer leaktest .AfterTest (t )()
2294+ skip .UnderDeadlock (t )
2295+ defer log .Scope (t ).Close (t )
2296+
2297+ ctx := context .Background ()
2298+ tc , s , sqlA , sqlB := setupLogicalTestServer (t , ctx , testClusterBaseClusterArgs , 1 )
2299+ defer tc .Stopper ().Stop (ctx )
2300+
2301+ dbBURL := replicationtestutils .GetExternalConnectionURI (t , s , s , serverutils .DBName ("b" ))
2302+
2303+ createStmt := "CREATE TABLE foo (pk int primary key, payload string)"
2304+ sqlA .Exec (t , createStmt )
2305+ sqlA .Exec (t , "ALTER TABLE foo ADD COLUMN baz INT DEFAULT 2" )
2306+
2307+ // Insert some data into foo
2308+ sqlA .Exec (t , "INSERT INTO foo VALUES (1, 'hello')" )
2309+ sqlA .Exec (t , "INSERT INTO foo VALUES (2, 'world')" )
2310+
2311+ sqlB .Exec (t , createStmt )
2312+ sqlB .Exec (t , "ALTER TABLE foo ADD COLUMN bar INT DEFAULT 2" )
2313+
2314+ sqlB .Exec (t , "ALTER TABLE foo ADD COLUMN baz INT DEFAULT 2" )
2315+ sqlB .Exec (t , "INSERT INTO foo VALUES (3, 'hello', 3)" )
2316+ sqlB .Exec (t , "ALTER TABLE foo DROP COLUMN bar" )
2317+ sqlB .Exec (t , "INSERT INTO foo VALUES (4, 'world')" )
2318+
2319+ // LDR immediate mode creation should fail because of mismatched column IDs.
2320+ sqlA .ExpectErr (t ,
2321+ "destination table foo column baz has ID 3, but the source table foo has ID 4" ,
2322+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'immediate'" , dbBURL .String ())
2323+
2324+ // LDR validated mode creation should succeed because the SQL writer supports mismatched column IDs.
2325+ var jobID jobspb.JobID
2326+ sqlA .QueryRow (t , "CREATE LOGICAL REPLICATION STREAM FROM TABLE foo ON $1 INTO TABLE foo WITH MODE = 'validated'" , dbBURL .String ()).Scan (& jobID )
2327+
2328+ now := s .Clock ().Now ()
2329+ WaitUntilReplicatedTime (t , now , sqlA , jobID )
2330+ }
2331+
22922332// TestLogicalReplicationCreationChecks verifies that we check that the table
22932333// schemas are compatible when creating the replication stream.
22942334func TestLogicalReplicationCreationChecks (t * testing.T ) {
@@ -2311,40 +2351,33 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
23112351
23122352 dbBURL := replicationtestutils .GetExternalConnectionURI (t , s , s , serverutils .DBName ("b" ))
23132353
2354+ expectErr := func (t * testing.T , tableName string , err string ) {
2355+ t .Helper ()
2356+ dbA .ExpectErr (t , err , fmt .Sprintf ("CREATE LOGICAL REPLICATION STREAM FROM TABLE %s ON $1 INTO TABLE %s WITH MODE = 'validated'" , tableName , tableName ), dbBURL .String ())
2357+ replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2358+ }
2359+
23142360 // Column families are not allowed.
23152361 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN new_col INT NOT NULL CREATE FAMILY f1" )
23162362 dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN new_col INT NOT NULL" )
2317- dbA .ExpectErr (t ,
2318- "cannot create logical replication stream: table tab has more than one column family" ,
2319- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2320- )
2321- replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
2363+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has more than one column family" )
23222364
23232365 // UniqueWithoutIndex constraints are not allowed.
23242366 for _ , db := range []* sqlutils.SQLRunner {dbA , dbB } {
23252367 db .Exec (t , "SET experimental_enable_unique_without_index_constraints = true" )
23262368 db .Exec (t , "CREATE TABLE tab_with_uwi (pk INT PRIMARY KEY, v INT UNIQUE WITHOUT INDEX)" )
23272369 }
2328- dbA .ExpectErr (t ,
2329- "cannot create logical replication stream: table tab_with_uwi has UNIQUE WITHOUT INDEX constraints: unique_v" ,
2330- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab_with_uwi ON $1 INTO TABLE tab_with_uwi" , dbBURL .String (),
2331- )
2370+ expectErr (t , "tab_with_uwi" , "cannot create logical replication stream: table tab_with_uwi has UNIQUE WITHOUT INDEX constraints: unique_v" )
23322371 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23332372
23342373 // Check for mismatched numbers of columns.
23352374 dbA .Exec (t , "ALTER TABLE tab DROP COLUMN new_col" )
2336- dbA .ExpectErr (t ,
2337- "cannot create logical replication stream: destination table tab has 2 columns, but the source table tab has 3 columns" ,
2338- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2339- )
2375+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab has 2 columns, but the source table tab has 3 columns" )
23402376 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23412377
23422378 // Check for mismatched column types.
23432379 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN new_col TEXT NOT NULL" )
2344- dbA .ExpectErr (t ,
2345- "cannot create logical replication stream: destination table tab column new_col has type STRING, but the source table tab has type INT8" ,
2346- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2347- )
2380+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab column new_col has type STRING, but the source table tab has type INT8" )
23482381 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23492382
23502383 // Check for composite type in primary key.
@@ -2353,39 +2386,27 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
23532386 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN composite_col DECIMAL NOT NULL" )
23542387 dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN composite_col DECIMAL NOT NULL" )
23552388 dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, composite_col)" )
2356- dbA .ExpectErr (t ,
2357- `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding` ,
2358- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2359- )
2389+ expectErr (t , "tab" , `cannot create logical replication stream: table tab has a primary key column \(composite_col\) with composite encoding` )
23602390 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23612391
23622392 // Check for partial indexes.
23632393 dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk)" )
23642394 dbA .Exec (t , "CREATE INDEX partial_idx ON tab(composite_col) WHERE pk > 0" )
2365- dbA .ExpectErr (t ,
2366- `cannot create logical replication stream: table tab has a partial index partial_idx` ,
2367- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2368- )
2395+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a partial index partial_idx" )
23692396 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23702397
23712398 // Check for virtual computed columns that are a key of a secondary index.
23722399 dbA .Exec (t , "DROP INDEX partial_idx" )
23732400 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN virtual_col INT NOT NULL AS (pk + 1) VIRTUAL" )
23742401 dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN virtual_col INT NOT NULL AS (pk + 1) VIRTUAL" )
23752402 dbA .Exec (t , "CREATE INDEX virtual_col_idx ON tab(virtual_col)" )
2376- dbA .ExpectErr (t ,
2377- `cannot create logical replication stream: table tab has a virtual computed column virtual_col that is a key of index virtual_col_idx` ,
2378- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2379- )
2403+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a virtual computed column virtual_col that is a key of index virtual_col_idx" )
23802404 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23812405
23822406 // Check for virtual columns that are in the primary index.
23832407 dbA .Exec (t , "DROP INDEX virtual_col_idx" )
23842408 dbA .Exec (t , "ALTER TABLE tab ALTER PRIMARY KEY USING COLUMNS (pk, virtual_col)" )
2385- dbA .ExpectErr (t ,
2386- `cannot create logical replication stream: table tab has a virtual computed column virtual_col that appears in the primary key` ,
2387- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2388- )
2409+ expectErr (t , "tab" , "cannot create logical replication stream: table tab has a virtual computed column virtual_col that appears in the primary key" )
23892410 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
23902411
23912412 // Change the primary key back, and remove the indexes that are left over from
@@ -2398,10 +2419,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
23982419 // Check that CHECK constraints match.
23992420 dbA .Exec (t , "ALTER TABLE tab ADD CONSTRAINT check_constraint_1 CHECK (pk > 0)" )
24002421 dbB .Exec (t , "ALTER TABLE b.tab ADD CONSTRAINT check_constraint_1 CHECK (length(payload) > 1)" )
2401- dbA .ExpectErr (t ,
2402- `cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab` ,
2403- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2404- )
2422+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab CHECK constraints do not match source table tab" )
24052423 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24062424
24072425 // Allow user to create LDR stream with mismatched CHECK via SKIP SCHEMA CHECK.
@@ -2419,7 +2437,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24192437 dbB .Exec (t , "ALTER TABLE b.tab ADD CONSTRAINT check_constraint_2 CHECK (pk > 0)" )
24202438 var jobAID jobspb.JobID
24212439 dbA .QueryRow (t ,
2422- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" ,
2440+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'validated' " ,
24232441 dbBURL .String (),
24242442 ).Scan (& jobAID )
24252443
@@ -2433,10 +2451,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24332451 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL" )
24342452 dbA .Exec (t , "ALTER TABLE tab ALTER COLUMN udf_col SET DEFAULT my_udf()" )
24352453 dbB .Exec (t , "ALTER TABLE tab ADD COLUMN udf_col INT NOT NULL DEFAULT 1" )
2436- dbA .ExpectErr (t ,
2437- `cannot create logical replication stream: table tab references functions with IDs \[[0-9]+\]` ,
2438- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2439- )
2454+ expectErr (t , "tab" , "cannot create logical replication stream: table tab references functions with IDs [[0-9]+]" )
24402455 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24412456
24422457 // Check if the table references a sequence.
@@ -2445,21 +2460,15 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24452460 dbA .Exec (t , "CREATE SEQUENCE my_seq" )
24462461 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT nextval('my_seq')" )
24472462 dbB .Exec (t , "ALTER TABLE tab ADD COLUMN seq_col INT NOT NULL DEFAULT 1" )
2448- dbA .ExpectErr (t ,
2449- `cannot create logical replication stream: table tab references sequences with IDs \[[0-9]+\]` ,
2450- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2451- )
2463+ expectErr (t , "tab" , "cannot create logical replication stream: table tab references sequences with IDs [[0-9]+]" )
24522464 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24532465
24542466 // Check if table has a trigger.
24552467 dbA .Exec (t , "ALTER TABLE tab DROP COLUMN seq_col" )
24562468 dbB .Exec (t , "ALTER TABLE tab DROP COLUMN seq_col" )
24572469 dbA .Exec (t , "CREATE OR REPLACE FUNCTION my_trigger() RETURNS TRIGGER AS $$ BEGIN RETURN NEW; END $$ LANGUAGE PLPGSQL" )
24582470 dbA .Exec (t , "CREATE TRIGGER my_trigger BEFORE INSERT ON tab FOR EACH ROW EXECUTE FUNCTION my_trigger()" )
2459- dbA .ExpectErr (t ,
2460- `cannot create logical replication stream: table tab references triggers \[my_trigger\]` ,
2461- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2462- )
2471+ expectErr (t , "tab" , `cannot create logical replication stream: table tab references triggers \[my_trigger\]` )
24632472 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24642473
24652474 // Verify that the stream cannot be created with mismatched enum types.
@@ -2468,9 +2477,8 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24682477 dbB .Exec (t , "CREATE TYPE b.mytype AS ENUM ('a', 'b')" )
24692478 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN enum_col mytype NOT NULL" )
24702479 dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN enum_col b.mytype NOT NULL" )
2471- dbA . ExpectErr ( t ,
2480+ expectErr ( t , "tab" ,
24722481 `cannot create logical replication stream: .* destination type USER DEFINED ENUM: public.mytype has logical representations \[a b c\], but the source type USER DEFINED ENUM: mytype has \[a b\]` ,
2473- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
24742482 )
24752483 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24762484
@@ -2490,21 +2498,15 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
24902498 dbB .Exec (t , "CREATE TYPE b.composite_typ AS (a TEXT, b INT)" )
24912499 dbA .Exec (t , "ALTER TABLE tab ADD COLUMN composite_udt_col composite_typ NOT NULL" )
24922500 dbB .Exec (t , "ALTER TABLE b.tab ADD COLUMN composite_udt_col b.composite_typ NOT NULL" )
2493- dbA .ExpectErr (t ,
2494- `cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING` ,
2495- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2496- )
2501+ expectErr (t , "tab" , "cannot create logical replication stream: .* destination type USER DEFINED RECORD: public.composite_typ tuple element 0 does not match source type USER DEFINED RECORD: composite_typ tuple element 0: destination type INT8 does not match source type STRING" )
24972502 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
24982503
24992504 // Check that UNIQUE indexes match.
25002505 dbA .Exec (t , "ALTER TABLE tab DROP COLUMN composite_udt_col" )
25012506 dbB .Exec (t , "ALTER TABLE b.tab DROP COLUMN composite_udt_col" )
25022507 dbA .Exec (t , "CREATE UNIQUE INDEX payload_idx ON tab(payload)" )
25032508 dbB .Exec (t , "CREATE UNIQUE INDEX multi_idx ON b.tab(composite_col, pk)" )
2504- dbA .ExpectErr (t ,
2505- `cannot create logical replication stream: destination table tab UNIQUE indexes do not match source table tab` ,
2506- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" , dbBURL .String (),
2507- )
2509+ expectErr (t , "tab" , "cannot create logical replication stream: destination table tab UNIQUE indexes do not match source table tab" )
25082510 replicationtestutils .WaitForAllProducerJobsToFail (t , dbB )
25092511
25102512 // Create the missing indexes on each side and verify the stream can be
@@ -2513,7 +2515,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
25132515 dbA .Exec (t , "CREATE UNIQUE INDEX multi_idx ON tab(composite_col, pk)" )
25142516 dbB .Exec (t , "CREATE UNIQUE INDEX payload_idx ON b.tab(payload)" )
25152517 dbA .QueryRow (t ,
2516- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab" ,
2518+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab WITH MODE = 'validated' " ,
25172519 dbBURL .String (),
25182520 ).Scan (& jobAID )
25192521
@@ -2528,7 +2530,7 @@ func TestLogicalReplicationCreationChecks(t *testing.T) {
25282530 dbB .Exec (t , "CREATE TABLE b.tab2 (pk INT PRIMARY KEY, payload STRING DEFAULT 'dog')" )
25292531 dbB .Exec (t , "Insert into tab2 values (1)" )
25302532 dbA .QueryRow (t ,
2531- "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab2 ON $1 INTO TABLE tab2" ,
2533+ "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab2 ON $1 INTO TABLE tab2 WITH MODE = 'validated' " ,
25322534 dbBURL .String (),
25332535 ).Scan (& jobAID )
25342536 WaitUntilReplicatedTime (t , s .Clock ().Now (), dbA , jobAID )
0 commit comments