Skip to content

Commit d177320

Browse files
authored
fix: When _cq_id SyncMigrateMessage not sent (#1489)
Fixed a case where no sync message would be sent
1 parent e399d15 commit d177320

File tree

3 files changed

+29
-26
lines changed

3 files changed

+29
-26
lines changed

scheduler/scheduler.go

Lines changed: 5 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -189,30 +189,13 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
189189

190190
// send migrate messages first
191191
for _, tableOriginal := range tables.FlattenTables() {
192-
// var table *schema.Table
193-
table := tableOriginal.Copy(nil)
194-
if syncClient.deterministicCQID {
195-
// No PK adjustment should occur if `_cq_id` is not present in the table
196-
cqIDCol := table.Columns.Get(schema.CqIDColumn.Name)
197-
if cqIDCol == nil {
198-
continue
199-
}
200-
for i, c := range table.Columns {
201-
if c.Name == schema.CqIDColumn.Name {
202-
// Ensure that the cq_id column is the primary key
203-
table.Columns[i].PrimaryKey = true
204-
continue
205-
}
206-
if !c.PrimaryKey {
207-
continue
208-
}
209-
table.Columns[i].PrimaryKey = false
210-
}
192+
migrateMessage := &message.SyncMigrateTable{
193+
Table: tableOriginal.Copy(nil),
211194
}
212-
213-
res <- &message.SyncMigrateTable{
214-
Table: table,
195+
if syncClient.deterministicCQID {
196+
schema.CqIDAsPK(migrateMessage.Table)
215197
}
198+
res <- migrateMessage
216199
}
217200

218201
resources := make(chan *schema.Resource)

scheduler/scheduler_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func testColumnResolverPanic(context.Context, schema.ClientMeta, *schema.Resourc
4545

4646
func testTableSuccessWithData(data []any) *schema.Table {
4747
return &schema.Table{
48-
Name: "test_table_success",
48+
Name: "test_table_success_with_data",
4949
Resolver: func(_ context.Context, _ schema.ClientMeta, _ *schema.Resource, res chan<- any) error {
5050
res <- data
5151
return nil
@@ -74,7 +74,7 @@ func testTableSuccess() *schema.Table {
7474

7575
func testTableSuccessWithPK() *schema.Table {
7676
return &schema.Table{
77-
Name: "test_table_success",
77+
Name: "test_table_success_pk",
7878
Resolver: testResolverSuccess,
7979
Columns: []schema.Column{
8080
{
@@ -88,7 +88,7 @@ func testTableSuccessWithPK() *schema.Table {
8888

8989
func testTableSuccessWithCQIDPK() *schema.Table {
9090
return &schema.Table{
91-
Name: "test_table_success",
91+
Name: "test_table_success_cq_id",
9292
Resolver: testResolverSuccess,
9393
Columns: []schema.Column{
9494
schema.CqIDColumn,
@@ -411,7 +411,7 @@ func testSyncTable(t *testing.T, tc syncTestCase, strategy Strategy, determinist
411411
initialTable := tables.Get(v.Table.Name)
412412

413413
pks := migratedTable.PrimaryKeys()
414-
if deterministicCQID {
414+
if deterministicCQID && initialTable.Columns.Get(schema.CqIDColumn.Name) != nil {
415415
if len(pks) != 1 {
416416
t.Fatalf("expected 1 pk. got %d", len(pks))
417417
}

schema/table.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,26 @@ func AddCqIDs(table *Table) {
124124
}
125125
}
126126

127+
// CqIDAsPK sets the cq_id column as primary key if it exists
128+
// and removes the primary key from all other columns
129+
func CqIDAsPK(t *Table) {
130+
cqIDCol := t.Columns.Get(CqIDColumn.Name)
131+
if cqIDCol == nil {
132+
return
133+
}
134+
for i, c := range t.Columns {
135+
if c.Name == CqIDColumn.Name {
136+
// Ensure that the cq_id column is the primary key
137+
t.Columns[i].PrimaryKey = true
138+
continue
139+
}
140+
if !c.PrimaryKey {
141+
continue
142+
}
143+
t.Columns[i].PrimaryKey = false
144+
}
145+
}
146+
127147
func NewTablesFromArrowSchemas(schemas []*arrow.Schema) (Tables, error) {
128148
tables := make(Tables, len(schemas))
129149
for i, schema := range schemas {

0 commit comments

Comments
 (0)