Skip to content

Commit 28706f1

Browse files
authored
fix: Flatten V2 tables (#882)
A bit more footgun in old protocol.
1 parent 7d749b1 commit 28706f1

File tree

3 files changed

+17
-8
lines changed

3 files changed

+17
-8
lines changed

internal/servers/destination/v0/destinations.go

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ func (s *Server) Migrate(ctx context.Context, req *pb.Migrate_Request) (*pb.Migr
5959
if err := json.Unmarshal(req.Tables, &tablesV2); err != nil {
6060
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
6161
}
62-
tables := TablesV2ToV3(tablesV2)
62+
tables := TablesV2ToV3(tablesV2).FlattenTables()
6363
SetDestinationManagedCqColumns(tables)
6464
s.setPKsForTables(tables)
6565

@@ -97,7 +97,7 @@ func (s *Server) Write2(msg pb.Destination_Write2Server) error {
9797
return status.Errorf(codes.InvalidArgument, "failed to unmarshal source spec: %v", err)
9898
}
9999
}
100-
tables := TablesV2ToV3(tablesV2)
100+
tables := TablesV2ToV3(tablesV2).FlattenTables()
101101
syncTime := r.Timestamp.AsTime()
102102
SetDestinationManagedCqColumns(tables)
103103
s.setPKsForTables(tables)
@@ -201,12 +201,8 @@ func (s *Server) DeleteStale(ctx context.Context, req *pb.DeleteStale_Request) (
201201
if err := json.Unmarshal(req.Tables, &tablesV2); err != nil {
202202
return nil, status.Errorf(codes.InvalidArgument, "failed to unmarshal tables: %v", err)
203203
}
204-
tables := TablesV2ToV3(tablesV2)
204+
tables := TablesV2ToV3(tablesV2).FlattenTables()
205205
SetDestinationManagedCqColumns(tables)
206-
schemas := make(schemav2.Schemas, len(tables.FlattenTables()))
207-
for i, table := range tables.FlattenTables() {
208-
schemas[i] = table.ToArrowSchema()
209-
}
210206
if err := s.Plugin.DeleteStale(ctx, tables, req.Source, req.Timestamp.AsTime()); err != nil {
211207
return nil, err
212208
}

schema/table.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,16 @@ func (tt Tables) FlattenTables() Tables {
231231
tables = append(tables, t)
232232
tables = append(tables, t.Relations.FlattenTables()...)
233233
}
234-
return tables
234+
tableNames := make(map[string]bool)
235+
dedupedTables := make(Tables, 0, len(tables))
236+
for _, t := range tables {
237+
if _, found := tableNames[t.Name]; !found {
238+
dedupedTables = append(dedupedTables, t)
239+
tableNames[t.Name] = true
240+
}
241+
}
242+
243+
return dedupedTables
235244
}
236245

237246
func (tt Tables) TableNames() []string {

schema/table_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@ func TestTablesFlatten(t *testing.T) {
2323
if len(tables) != 2 {
2424
t.Fatal("expected 2 tables")
2525
}
26+
tables = Tables{testTable}.FlattenTables()
27+
if len(tables) != 2 {
28+
t.Fatal("expected 2 tables")
29+
}
2630
}
2731

2832
func TestTablesFilterDFS(t *testing.T) {

0 commit comments

Comments
 (0)