Skip to content

Commit b5973e3

Browse files
authored
bq: test enum[], fix enum[], fix partition by jsonb (#3144)
1 parent bd33001 commit b5973e3

File tree

2 files changed

+61
-74
lines changed

2 files changed

+61
-74
lines changed

flow/connectors/bigquery/merge_stmt_generator.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
4646
types.QValueKindArrayInt32, types.QValueKindArrayInt64, types.QValueKindArrayString,
4747
types.QValueKindArrayBoolean, types.QValueKindArrayTimestamp, types.QValueKindArrayTimestampTZ,
4848
types.QValueKindArrayDate, types.QValueKindArrayInterval, types.QValueKindArrayUUID,
49-
types.QValueKindArrayNumeric:
49+
types.QValueKindArrayNumeric, types.QValueKindArrayEnum:
5050
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
5151
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
5252
bqTypeString, column.Name, shortCol)
@@ -87,7 +87,7 @@ func (m *mergeStmtGenerator) transformedPkeyStrings(normalizedTableSchema *proto
8787
continue
8888
}
8989
switch pkeyColType {
90-
case types.QValueKindJSON:
90+
case types.QValueKindJSON, types.QValueKindJSONB:
9191
if forPartition {
9292
pkeys = append(pkeys, fmt.Sprintf("TO_JSON_STRING(%s)", m.shortColumn[pkeyCol]))
9393
} else {

flow/e2e/bigquery/peer_flow_bq_test.go

Lines changed: 59 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -108,8 +108,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Complete_Flow_No_Data() {
108108
id SERIAL PRIMARY KEY,
109109
key TEXT NOT NULL,
110110
value VARCHAR(255) NOT NULL
111-
);
112-
`, srcTableName))
111+
)`, srcTableName))
113112
require.NoError(s.t, err)
114113

115114
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -139,8 +138,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
139138
id SERIAL PRIMARY KEY,
140139
key TEXT NOT NULL,
141140
value CHAR(255) NOT NULL
142-
);
143-
`, srcTableName))
141+
)`, srcTableName))
144142
require.NoError(s.t, err)
145143

146144
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -159,7 +157,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Char_ColType_Error() {
159157
e2e.RequireEnvCanceled(s.t, env)
160158
}
161159

162-
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
160+
func (s PeerFlowE2ETestSuiteBQ) Test_Toast() {
163161
tc := e2e.NewTemporalClient(s.t)
164162

165163
srcTableName := s.attachSchemaSuffix("test_toast_bq_1")
@@ -171,8 +169,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
171169
t1 text,
172170
t2 text,
173171
k int
174-
);
175-
`, srcTableName))
172+
)`, srcTableName))
176173
require.NoError(s.t, err)
177174

178175
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -196,12 +193,11 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
196193
*/
197194
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
198195
BEGIN;
199-
INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000),
196+
INSERT INTO %[1]s(t1,t2,k) SELECT random_string(9000),random_string(9000),
200197
1 FROM generate_series(1,2);
201-
UPDATE %s SET k=102 WHERE id=1;
202-
UPDATE %s SET t1='dummy' WHERE id=2;
203-
END;
204-
`, srcTableName, srcTableName, srcTableName))
198+
UPDATE %[1]s SET k=102 WHERE id=1;
199+
UPDATE %[1]s SET t1='dummy' WHERE id=2;
200+
END`, srcTableName))
205201
e2e.EnvNoError(s.t, env, err)
206202
s.t.Log("Executed a transaction touching toast columns")
207203

@@ -210,7 +206,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_BQ() {
210206
e2e.RequireEnvCanceled(s.t, env)
211207
}
212208

213-
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
209+
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1() {
214210
tc := e2e.NewTemporalClient(s.t)
215211

216212
srcTableName := s.attachSchemaSuffix("test_toast_bq_3")
@@ -222,8 +218,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
222218
t1 text,
223219
t2 text,
224220
k int
225-
);
226-
`, srcTableName))
221+
)`, srcTableName))
227222
require.NoError(s.t, err)
228223

229224
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -242,23 +237,21 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
242237
// complex transaction with random DMLs on a table with toast columns
243238
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
244239
BEGIN;
245-
INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000),
240+
INSERT INTO %[1]s(t1,t2,k) SELECT random_string(9000),random_string(9000),
246241
1 FROM generate_series(1,2);
247-
UPDATE %s SET k=102 WHERE id=1;
248-
UPDATE %s SET t1='dummy' WHERE id=2;
249-
UPDATE %s SET t2='dummy' WHERE id=2;
250-
DELETE FROM %s WHERE id=1;
251-
INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000),
242+
UPDATE %[1]s SET k=102 WHERE id=1;
243+
UPDATE %[1]s SET t1='dummy' WHERE id=2;
244+
UPDATE %[1]s SET t2='dummy' WHERE id=2;
245+
DELETE FROM %[1]s WHERE id=1;
246+
INSERT INTO %[1]s(t1,t2,k) SELECT random_string(9000),random_string(9000),
252247
1 FROM generate_series(1,2);
253-
UPDATE %s SET k=1 WHERE id=1;
254-
UPDATE %s SET t1='dummy1',t2='dummy2' WHERE id=1;
255-
UPDATE %s SET t1='dummy3' WHERE id=3;
256-
DELETE FROM %s WHERE id=2;
257-
DELETE FROM %s WHERE id=3;
258-
DELETE FROM %s WHERE id=2;
259-
END;
260-
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName,
261-
srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
248+
UPDATE %[1]s SET k=1 WHERE id=1;
249+
UPDATE %[1]s SET t1='dummy1',t2='dummy2' WHERE id=1;
250+
UPDATE %[1]s SET t1='dummy3' WHERE id=3;
251+
DELETE FROM %[1]s WHERE id=2;
252+
DELETE FROM %[1]s WHERE id=3;
253+
DELETE FROM %[1]s WHERE id=2;
254+
END`, srcTableName))
262255
e2e.EnvNoError(s.t, env, err)
263256
s.t.Log("Executed a transaction touching toast columns")
264257

@@ -267,7 +260,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_1_BQ() {
267260
e2e.RequireEnvCanceled(s.t, env)
268261
}
269262

270-
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
263+
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2() {
271264
tc := e2e.NewTemporalClient(s.t)
272265

273266
srcTableName := s.attachSchemaSuffix("test_toast_bq_4")
@@ -298,25 +291,25 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_2_BQ() {
298291
// complex transaction with random DMLs on a table with toast columns
299292
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
300293
BEGIN;
301-
INSERT INTO %s(t1,k) SELECT random_string(9000),
294+
INSERT INTO %[1]s(t1,k) SELECT random_string(9000),
302295
1 FROM generate_series(1,1);
303-
UPDATE %s SET t1=sub.t1 FROM (SELECT random_string(9000) t1
296+
UPDATE %[1]s SET t1=sub.t1 FROM (SELECT random_string(9000) t1
304297
FROM generate_series(1,1) ) sub WHERE id=1;
305-
UPDATE %s SET k=2 WHERE id=1;
306-
UPDATE %s SET k=3 WHERE id=1;
307-
UPDATE %s SET t1=sub.t1 FROM (SELECT random_string(9000) t1
298+
UPDATE %[1]s SET k=2 WHERE id=1;
299+
UPDATE %[1]s SET k=3 WHERE id=1;
300+
UPDATE %[1]s SET t1=sub.t1 FROM (SELECT random_string(9000) t1
308301
FROM generate_series(1,1)) sub WHERE id=1;
309-
UPDATE %s SET k=4 WHERE id=1;
302+
UPDATE %[1]s SET k=4 WHERE id=1;
310303
END;
311-
`, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName, srcTableName))
304+
`, srcTableName))
312305
e2e.EnvNoError(s.t, env, err)
313306
s.t.Log("Executed a transaction touching toast columns")
314307
e2e.EnvWaitForEqualTables(env, s, "normalizing tx", dstTableName, "id,t1,k")
315308
env.Cancel(s.t.Context())
316309
e2e.RequireEnvCanceled(s.t, env)
317310
}
318311

319-
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
312+
func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3() {
320313
tc := e2e.NewTemporalClient(s.t)
321314

322315
srcTableName := s.attachSchemaSuffix("test_toast_bq_5")
@@ -345,27 +338,24 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Toast_Advance_3_BQ() {
345338
// and execute a transaction touching toast columns
346339
env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
347340
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
348-
/*
349-
transaction updating a single row
350-
multiple times with changed/unchanged toast columns
351-
*/
341+
// transaction updating a single row multiple times with changed/unchanged toast columns
352342
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
353343
BEGIN;
354-
INSERT INTO %s(t1,t2,k) SELECT random_string(9000),random_string(9000),
344+
INSERT INTO %[1]s(t1,t2,k) SELECT random_string(9000),random_string(9000),
355345
1 FROM generate_series(1,1);
356-
UPDATE %s SET k=102 WHERE id=1;
357-
UPDATE %s SET t1='dummy' WHERE id=1;
358-
UPDATE %s SET t2='dummy' WHERE id=1;
346+
UPDATE %[1]s SET k=102 WHERE id=1;
347+
UPDATE %[1]s SET t1='dummy' WHERE id=1;
348+
UPDATE %[1]s SET t2='dummy' WHERE id=1;
359349
END;
360-
`, srcTableName, srcTableName, srcTableName, srcTableName))
350+
`, srcTableName))
361351
e2e.EnvNoError(s.t, env, err)
362352
s.t.Log("Executed a transaction touching toast columns")
363353
e2e.EnvWaitForEqualTables(env, s, "normalizing tx", dstTableName, "id,t1,t2,k")
364354
env.Cancel(s.t.Context())
365355
e2e.RequireEnvCanceled(s.t, env)
366356
}
367357

368-
func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
358+
func (s PeerFlowE2ETestSuiteBQ) Test_Types() {
369359
tc := e2e.NewTemporalClient(s.t)
370360

371361
srcTableName := s.attachSchemaSuffix("test_types_bq")
@@ -385,7 +375,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
385375
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
386376
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE,
387377
c47 DATE[], c48 TIMESTAMPTZ[], c49 TIMESTAMP[], c50 BOOLEAN[], c51 SMALLINT[], c52 NUMERIC,
388-
c53 NUMERIC(16,2)[], c54 NUMERIC[]);
378+
c53 NUMERIC(16,2)[], c54 NUMERIC[], c55 mood[]);
389379
`, srcTableName))
390380
require.NoError(s.t, err)
391381

@@ -402,9 +392,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
402392
// and execute a transaction touching toast columns
403393
env := e2e.ExecutePeerflow(s.t.Context(), tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
404394
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
405-
/* test inserting various types*/
406-
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
407-
INSERT INTO %s SELECT 2,2,b'1',b'101',
395+
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(
396+
`INSERT INTO %s VALUES (2,2,b'1',b'101',
408397
true,random_bytes(32),'s','test','1.1.10.2'::cidr,
409398
CURRENT_DATE,1.23,1.234,'10.0.0.0/32'::inet,1,
410399
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
@@ -422,7 +411,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
422411
'{"2020-01-01 01:01:01", "2020-01-02 01:01:01"}'::timestamp[],
423412
'{true, false}'::boolean[],
424413
'{1, 2}'::smallint[], null,
425-
'{1.2, 1.23, null}'::numeric(16,2)[], '{1.2, 1.23, null}'::numeric[];
414+
'{1.2, 1.23, null}'::numeric(16,2)[], '{1.2, 1.23, null}'::numeric[], ARRAY['happy','sad']::mood[])
426415
`, srcTableName))
427416
e2e.EnvNoError(s.t, env, err)
428417

@@ -432,7 +421,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
432421
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
433422
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
434423
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46", "c47", "c48",
435-
"c49", "c50", "c51", "c53", "c54",
424+
"c49", "c50", "c51", "c53", "c54", "c55",
436425
})
437426
if err != nil {
438427
s.t.Log(err)
@@ -462,14 +451,13 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
462451
e2e.RequireEnvCanceled(s.t, env)
463452
}
464453

465-
func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
454+
func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles() {
466455
tc := e2e.NewTemporalClient(s.t)
467456

468457
srcTableName := s.attachSchemaSuffix("test_nans_bq")
469458
dstTableName := "test_nans_bq"
470-
_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
471-
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 double precision,c2 double precision[],c3 numeric);
472-
`, srcTableName))
459+
_, err := s.Conn().Exec(s.t.Context(), fmt.Sprintf(
460+
`CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 double precision,c2 double precision[],c3 numeric)`, srcTableName))
473461
require.NoError(s.t, err)
474462

475463
connectionGen := e2e.FlowConnectionGenerationConfig{
@@ -487,9 +475,8 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
487475
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)
488476

489477
// test inserting various types
490-
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(`
491-
INSERT INTO %s SELECT 2, 'NaN'::double precision, '{NaN, Infinity, -Infinity}', 'NaN'::numeric;
492-
`, srcTableName))
478+
_, err = s.Conn().Exec(s.t.Context(), fmt.Sprintf(
479+
`INSERT INTO %s SELECT 2, 'NaN'::double precision, '{NaN, Infinity, -Infinity}', 'NaN'::numeric`, srcTableName))
493480
e2e.EnvNoError(s.t, env, err)
494481

495482
e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize weird floats", func() bool {
@@ -509,7 +496,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
509496
e2e.RequireEnvCanceled(s.t, env)
510497
}
511498

512-
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
499+
func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_Avro_CDC() {
513500
tc := e2e.NewTemporalClient(s.t)
514501

515502
srcTableName := s.attachSchemaSuffix("test_invalid_geo_bq_avro_cdc")
@@ -586,7 +573,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Invalid_Geo_BQ_Avro_CDC() {
586573
e2e.RequireEnvCanceled(s.t, env)
587574
}
588575

589-
func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
576+
func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table() {
590577
tc := e2e.NewTemporalClient(s.t)
591578

592579
srcTable1Name := s.attachSchemaSuffix("test1_bq")
@@ -640,7 +627,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_BQ() {
640627

641628
// TODO: not checking schema exactly
642629
// write a GetTableSchemaConnector for BQ to enable generic_test
643-
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
630+
func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes() {
644631
tc := e2e.NewTemporalClient(s.t)
645632

646633
tableName := "test_simple_schema_changes"
@@ -718,7 +705,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Simple_Schema_Changes_BQ() {
718705
e2e.RequireEnvCanceled(s.t, env)
719706
}
720707

721-
func (s PeerFlowE2ETestSuiteBQ) Test_All_Types_Schema_Changes_BQ() {
708+
func (s PeerFlowE2ETestSuiteBQ) Test_All_Types_Schema_Changes() {
722709
tc := e2e.NewTemporalClient(s.t)
723710

724711
tableName := "test_all_types_schema_changes"
@@ -783,7 +770,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_All_Types_Schema_Changes_BQ() {
783770
e2e.RequireEnvCanceled(s.t, env)
784771
}
785772

786-
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
773+
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey() {
787774
tc := e2e.NewTemporalClient(s.t)
788775

789776
tableName := "test_simple_cpkey"
@@ -838,7 +825,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_BQ() {
838825
e2e.RequireEnvCanceled(s.t, env)
839826
}
840827

841-
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
828+
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1() {
842829
tc := e2e.NewTemporalClient(s.t)
843830

844831
srcTableName := s.attachSchemaSuffix("test_cpkey_toast1")
@@ -898,7 +885,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_1_BQ() {
898885
e2e.RequireEnvCanceled(s.t, env)
899886
}
900887

901-
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
888+
func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2() {
902889
tc := e2e.NewTemporalClient(s.t)
903890

904891
tableName := "test_cpkey_toast2"
@@ -952,7 +939,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Composite_PKey_Toast_2_BQ() {
952939
e2e.RequireEnvCanceled(s.t, env)
953940
}
954941

955-
func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
942+
func (s PeerFlowE2ETestSuiteBQ) Test_Columns() {
956943
tc := e2e.NewTemporalClient(s.t)
957944

958945
srcTableName := s.attachSchemaSuffix("test_peerdb_cols")
@@ -999,7 +986,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Columns_BQ() {
999986
e2e.RequireEnvCanceled(s.t, env)
1000987
}
1001988

1002-
func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset_BQ() {
989+
func (s PeerFlowE2ETestSuiteBQ) Test_Multi_Table_Multi_Dataset() {
1003990
tc := e2e.NewTemporalClient(s.t)
1004991

1005992
srcTable1Name := s.attachSchemaSuffix("test1_bq")
@@ -1339,7 +1326,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Soft_Delete_Insert_After_Delete() {
13391326
require.Equal(s.t, int64(0), numNewRows)
13401327
}
13411328

1342-
func (s PeerFlowE2ETestSuiteBQ) Test_JSON_PKey_BQ() {
1329+
func (s PeerFlowE2ETestSuiteBQ) Test_JSON_PKey() {
13431330
srcTableName := s.attachSchemaSuffix("test_json_pkey_bq")
13441331
dstTableName := "test_json_pkey_bq"
13451332

0 commit comments

Comments
 (0)