Skip to content

Commit f583cea

Browse files
authored
feat: Support CQ ID on the source only (#1461)
This change updates the `MigrateMessage` so that only the `_cq_id` will be a PK and it will not have a Unique constraint on it
1 parent a1857c3 commit f583cea

File tree

2 files changed

+96
-4
lines changed

2 files changed

+96
-4
lines changed

scheduler/scheduler.go

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,28 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
188188
}
189189

190190
// send migrate messages first
191-
for _, table := range tables.FlattenTables() {
191+
for _, tableOriginal := range tables.FlattenTables() {
192+
// var table *schema.Table
193+
table := tableOriginal.Copy(nil)
194+
if syncClient.deterministicCQID {
195+
// No PK adjustment should occur if `_cq_id` is not present in the table
196+
cqIDCol := table.Columns.Get(schema.CqIDColumn.Name)
197+
if cqIDCol == nil {
198+
continue
199+
}
200+
for i, c := range table.Columns {
201+
if c.Name == schema.CqIDColumn.Name {
202+
// Ensure that the cq_id column is the primary key
203+
table.Columns[i].PrimaryKey = true
204+
continue
205+
}
206+
if !c.PrimaryKey {
207+
continue
208+
}
209+
table.Columns[i].PrimaryKey = false
210+
}
211+
}
212+
192213
res <- &message.SyncMigrateTable{
193214
Table: table,
194215
}

scheduler/scheduler_test.go

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,21 @@ func testTableSuccessWithPK() *schema.Table {
8686
}
8787
}
8888

89+
func testTableSuccessWithCQIDPK() *schema.Table {
90+
return &schema.Table{
91+
Name: "test_table_success",
92+
Resolver: testResolverSuccess,
93+
Columns: []schema.Column{
94+
schema.CqIDColumn,
95+
{
96+
Name: "test_column",
97+
Type: arrow.PrimitiveTypes.Int64,
98+
PrimaryKey: true,
99+
},
100+
},
101+
}
102+
}
103+
89104
func testTableResolverPanic() *schema.Table {
90105
return &schema.Table{
91106
Name: "test_table_resolver_panic",
@@ -233,6 +248,28 @@ var syncTestCases = []syncTestCase{
233248
},
234249
// deterministicCQID: true,
235250
},
251+
{
252+
table: testTableSuccessWithCQIDPK(),
253+
data: []scalar.Vector{
254+
{
255+
// This value will be validated because deterministicCQID is true
256+
&scalar.UUID{Value: [16]byte{194, 83, 85, 170, 181, 44, 91, 112, 164, 224, 201, 153, 31, 90, 59, 135}, Valid: true},
257+
&scalar.Int{Value: 3, Valid: true},
258+
},
259+
},
260+
deterministicCQID: true,
261+
},
262+
{
263+
table: testTableSuccessWithCQIDPK(),
264+
data: []scalar.Vector{
265+
{
266+
// This value will not be validated as it will be randomly set by the scheduler
267+
&scalar.UUID{},
268+
&scalar.Int{Value: 3, Valid: true},
269+
},
270+
},
271+
deterministicCQID: false,
272+
},
236273
}
237274

238275
func TestScheduler(t *testing.T) {
@@ -323,9 +360,10 @@ func TestScheduler_Cancellation(t *testing.T) {
323360
}
324361
}
325362

363+
// nolint:revive
326364
func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, deterministicCQID bool) {
327365
ctx := context.Background()
328-
tables := []*schema.Table{}
366+
var tables schema.Tables
329367
if tc.table != nil {
330368
tables = append(tables, tc.table)
331369
}
@@ -349,11 +387,44 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
349387
record := v.Record
350388
rec := tc.data[i].ToArrowRecord(record.Schema())
351389
if !array.RecordEqual(rec, record) {
352-
t.Fatalf("expected at i=%d: %v. got %v", i, tc.data[i], record)
390+
// For records that include CqIDColumn, we can't verify equality because it is generated by the scheduler, unless deterministicCQID is true
391+
onlyCqIDInequality := false
392+
for col := range rec.Columns() {
393+
if !deterministicCQID && rec.ColumnName(col) == schema.CqIDColumn.Name {
394+
onlyCqIDInequality = true
395+
continue
396+
}
397+
lc := rec.Column(col)
398+
rc := record.Column(col)
399+
if !array.Equal(lc, rc) {
400+
onlyCqIDInequality = false
401+
}
402+
}
403+
if !onlyCqIDInequality {
404+
t.Fatalf("expected at i=%d: %v. got %v", i, tc.data[i], record)
405+
}
353406
}
354407
i++
355408
case *message.SyncMigrateTable:
356-
// ignore
409+
migratedTable := v.Table
410+
411+
initialTable := tables.Get(v.Table.Name)
412+
413+
pks := migratedTable.PrimaryKeys()
414+
if deterministicCQID {
415+
if len(pks) != 1 {
416+
t.Fatalf("expected 1 pk. got %d", len(pks))
417+
}
418+
if pks[0] != schema.CqIDColumn.Name {
419+
t.Fatalf("expected pk name %s. got %s", schema.CqIDColumn.Name, pks[0])
420+
}
421+
} else if len(pks) != len(initialTable.PrimaryKeys()) {
422+
t.Fatalf("expected 0 pk. got %d", len(pks))
423+
}
424+
425+
if len(pks) == 0 {
426+
continue
427+
}
357428
default:
358429
t.Fatalf("expected insert message. got %T", msg)
359430
}

0 commit comments

Comments
 (0)