Skip to content

Commit 61d98c9

Browse files
authored
fix(destination): Don't duplicate tables to be removed (#886)
BEGIN_COMMIT_OVERRIDE fix(destination): Don't duplicate tables to be removed (#886) fix(arrow): `schema.Table` <-> `arrow.Schema` conversion (#886) END_COMMIT_OVERRIDE
1 parent f4aa5bc commit 61d98c9

File tree

3 files changed

+31
-21
lines changed

3 files changed

+31
-21
lines changed

plugins/destination/plugin.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,7 @@ func (p *Plugin) Write(ctx context.Context, sourceSpec specs.Source, tables sche
269269
if p.spec.WriteMode == specs.WriteModeOverwriteDeleteStale {
270270
tablesToDelete := tables
271271
if sourceSpec.Backend != specs.BackendNone {
272+
tablesToDelete = make(schema.Tables, 0, len(tables))
272273
for _, t := range tables {
273274
if !t.IsIncremental {
274275
tablesToDelete = append(tablesToDelete, t)

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -89,10 +89,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
8989
StableUUID: u,
9090
MaxRows: 1,
9191
}
92-
updatedResources := schema.GenTestData(table, opts)[0]
92+
updatedResources := schema.GenTestData(table, opts)
93+
updatedIncResources := schema.GenTestData(incTable, opts)
94+
allUpdatedResources := updatedResources
95+
allUpdatedResources = append(allUpdatedResources, updatedIncResources...)
9396

94-
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {
95-
return fmt.Errorf("failed to write one second time: %w", err)
97+
if err := p.writeAll(ctx, sourceSpec, secondSyncTime, allUpdatedResources); err != nil {
98+
return fmt.Errorf("failed to write all second time: %w", err)
9699
}
97100

98101
resourcesRead, err = p.readAll(ctx, table, sourceName)
@@ -108,7 +111,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
108111
return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff)
109112
}
110113

111-
resourcesRead, err = p.readAll(ctx, tables[0], sourceName)
114+
resourcesRead, err = p.readAll(ctx, table, sourceName)
112115
if err != nil {
113116
return fmt.Errorf("failed to read all second time: %w", err)
114117
}
@@ -117,19 +120,19 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
117120
}
118121

119122
// we expect the only resource returned to match the updated resource we wrote
120-
if !array.RecordApproxEqual(updatedResources, resourcesRead[0]) {
121-
diff := RecordDiff(updatedResources, resourcesRead[0])
123+
if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) {
124+
diff := RecordDiff(updatedResources[0], resourcesRead[0])
122125
return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff)
123126
}
124127

125-
// we expect the incremental table to still have 2 resources, because delete-stale should
128+
// we expect the incremental table to still have 3 resources, because delete-stale should
126129
// not apply there
127-
resourcesRead, err = p.readAll(ctx, tables[1], sourceName)
130+
resourcesRead, err = p.readAll(ctx, incTable, sourceName)
128131
if err != nil {
129132
return fmt.Errorf("failed to read all from incremental table: %w", err)
130133
}
131-
if len(resourcesRead) != 2 {
132-
return fmt.Errorf("expected 2 resources in incremental table after delete-stale, got %d", len(resourcesRead))
134+
if len(resourcesRead) != 3 {
135+
return fmt.Errorf("expected 3 resources in incremental table after delete-stale, got %d", len(resourcesRead))
133136
}
134137

135138
return nil

schema/table.go

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -116,21 +116,20 @@ func NewTableFromArrowSchema(sc *arrow.Schema) (*Table, error) {
116116
return nil, fmt.Errorf("missing table name")
117117
}
118118
description, _ := tableMD.GetValue(MetadataTableDescription)
119+
constraintName, _ := tableMD.GetValue(MetadataConstraintName)
119120
fields := sc.Fields()
120121
columns := make(ColumnList, len(fields))
121122
for i, field := range fields {
122123
columns[i] = NewColumnFromArrowField(field)
123124
}
124125
table := &Table{
125-
Name: name,
126-
Description: description,
127-
Columns: columns,
126+
Name: name,
127+
Description: description,
128+
PkConstraintName: constraintName,
129+
Columns: columns,
128130
}
129-
if constraintName, found := tableMD.GetValue(MetadataConstraintName); found {
130-
table.PkConstraintName = constraintName
131-
}
132-
if title, found := tableMD.GetValue(MetadataIncremental); found {
133-
table.Title = title
131+
if isIncremental, found := tableMD.GetValue(MetadataIncremental); found {
132+
table.IsIncremental = isIncremental == MetadataTrue
134133
}
135134
return table, nil
136135
}
@@ -369,9 +368,16 @@ func (t *Table) PrimaryKeysIndexes() []int {
369368

370369
func (t *Table) ToArrowSchema() *arrow.Schema {
371370
fields := make([]arrow.Field, len(t.Columns))
372-
schemaMd := arrow.MetadataFrom(map[string]string{
373-
MetadataTableName: t.Name,
374-
})
371+
md := map[string]string{
372+
MetadataTableName: t.Name,
373+
MetadataTableDescription: t.Description,
374+
MetadataConstraintName: t.PkConstraintName,
375+
MetadataIncremental: MetadataFalse,
376+
}
377+
if t.IsIncremental {
378+
md[MetadataIncremental] = MetadataTrue
379+
}
380+
schemaMd := arrow.MetadataFrom(md)
375381
for i, c := range t.Columns {
376382
fields[i] = c.ToArrowField()
377383
}

0 commit comments

Comments
 (0)