Skip to content

Commit b1437f1

Browse files
authored
fix(testing): Add sorting for testing dest migrations (#814)
This should fix some flakey tests for destination and also fix germlin - cloudquery/cloudquery#10188
1 parent 1320d32 commit b1437f1

File tree

6 files changed

+41
-25
lines changed

6 files changed

+41
-25
lines changed

plugins/destination/plugin_testing.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -198,9 +198,9 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
198198
})
199199
}
200200

201-
func sortRecordsBySyncTime(table *schema.Table, records []arrow.Record) {
202-
syncTimeIndex := table.Columns.Index(schema.CqSyncTimeColumn.Name)
203-
cqIDIndex := table.Columns.Index(schema.CqIDColumn.Name)
201+
func sortRecordsBySyncTime(table *arrow.Schema, records []arrow.Record) {
202+
syncTimeIndex := table.FieldIndices(schema.CqSyncTimeColumn.Name)[0]
203+
cqIDIndex := table.FieldIndices(schema.CqIDColumn.Name)[0]
204204
sort.Slice(records, func(i, j int) bool {
205205
// sort by sync time, then UUID
206206
first := records[i].Column(syncTimeIndex).(*array.Timestamp).Value(0).ToTime(arrow.Millisecond)

plugins/destination/plugin_testing_migrate.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
4848
if err := p.Migrate(ctx, []*arrow.Schema{target}); err != nil {
4949
return fmt.Errorf("failed to migrate existing table: %w", err)
5050
}
51+
opts.SyncTime = syncTime.Add(time.Second).UTC()
5152
resource2 := testdata.GenTestData(target, opts)[0]
5253
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
5354
return fmt.Errorf("failed to write one after migration: %w", err)
@@ -57,6 +58,7 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
5758
if err != nil {
5859
return fmt.Errorf("failed to read all: %w", err)
5960
}
61+
sortRecordsBySyncTime(target, resourcesRead)
6062
if mode == specs.MigrateModeSafe {
6163
if len(resourcesRead) != 2 {
6264
return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead))

plugins/destination/plugin_testing_overwrite.go

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77

88
"github.com/apache/arrow/go/v12/arrow"
99
"github.com/apache/arrow/go/v12/arrow/array"
10-
"github.com/cloudquery/plugin-sdk/v2/schema"
1110
"github.com/cloudquery/plugin-sdk/v2/specs"
1211
"github.com/cloudquery/plugin-sdk/v2/testdata"
1312
"github.com/cloudquery/plugin-sdk/v2/types"
@@ -21,10 +20,10 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
2120
return fmt.Errorf("failed to init plugin: %w", err)
2221
}
2322
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
24-
table := testdata.TestTable(tableName)
23+
table := testdata.TestTable(tableName).ToArrowSchema()
2524
syncTime := time.Now().UTC().Round(1 * time.Second)
2625
tables := []*arrow.Schema{
27-
table.ToArrowSchema(),
26+
table,
2827
}
2928
if err := p.Migrate(ctx, tables); err != nil {
3029
return fmt.Errorf("failed to migrate tables: %w", err)
@@ -40,13 +39,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
4039
SyncTime: syncTime,
4140
MaxRows: 2,
4241
}
43-
resources := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)
42+
resources := testdata.GenTestData(table, opts)
4443
if err := p.writeAll(ctx, sourceSpec, syncTime, resources); err != nil {
4544
return fmt.Errorf("failed to write all: %w", err)
4645
}
4746
sortRecordsBySyncTime(table, resources)
4847

49-
resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName)
48+
resourcesRead, err := p.readAll(ctx, table, sourceName)
5049
if err != nil {
5150
return fmt.Errorf("failed to read all: %w", err)
5251
}
@@ -76,13 +75,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
7675
MaxRows: 1,
7776
StableUUID: *u,
7877
}
79-
updatedResource := testdata.GenTestData(schema.CQSchemaToArrow(table), opts)[0]
78+
updatedResource := testdata.GenTestData(table, opts)[0]
8079
// write second time
8180
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
8281
return fmt.Errorf("failed to write one second time: %w", err)
8382
}
8483

85-
resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName)
84+
resourcesRead, err = p.readAll(ctx, table, sourceName)
8685
if err != nil {
8786
return fmt.Errorf("failed to read all second time: %w", err)
8887
}

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,12 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
2020
return fmt.Errorf("failed to init plugin: %w", err)
2121
}
2222
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
23-
table := testdata.TestTable(tableName)
24-
incTable := testdata.TestTable(tableName + "_incremental")
25-
incTable.IsIncremental = true
23+
table := testdata.TestTable(tableName).ToArrowSchema()
24+
incTable := testdata.TestTableIncremental(tableName + "_incremental").ToArrowSchema()
2625
syncTime := time.Now().UTC().Round(1 * time.Second)
2726
tables := []*arrow.Schema{
28-
table.ToArrowSchema(),
29-
incTable.ToArrowSchema(),
27+
table,
28+
incTable,
3029
}
3130
if err := p.Migrate(ctx, tables); err != nil {
3231
return fmt.Errorf("failed to migrate tables: %w", err)
@@ -43,16 +42,16 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
4342
SyncTime: syncTime,
4443
MaxRows: 2,
4544
}
46-
resources := testdata.GenTestData(table.ToArrowSchema(), opts)
47-
incResources := testdata.GenTestData(incTable.ToArrowSchema(), opts)
45+
resources := testdata.GenTestData(table, opts)
46+
incResources := testdata.GenTestData(incTable, opts)
4847
allResources := resources
4948
allResources = append(allResources, incResources...)
5049
if err := p.writeAll(ctx, sourceSpec, syncTime, allResources); err != nil {
5150
return fmt.Errorf("failed to write all: %w", err)
5251
}
5352
sortRecordsBySyncTime(table, resources)
5453

55-
resourcesRead, err := p.readAll(ctx, table.ToArrowSchema(), sourceName)
54+
resourcesRead, err := p.readAll(ctx, table, sourceName)
5655
if err != nil {
5756
return fmt.Errorf("failed to read all: %w", err)
5857
}
@@ -72,7 +71,7 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
7271
}
7372

7473
// read from incremental table
75-
resourcesRead, err = p.readAll(ctx, incTable.ToArrowSchema(), sourceName)
74+
resourcesRead, err = p.readAll(ctx, incTable, sourceName)
7675
if err != nil {
7776
return fmt.Errorf("failed to read all: %w", err)
7877
}
@@ -89,13 +88,13 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
8988
StableUUID: *u,
9089
MaxRows: 1,
9190
}
92-
updatedResources := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
91+
updatedResources := testdata.GenTestData(table, opts)[0]
9392

9493
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResources); err != nil {
9594
return fmt.Errorf("failed to write one second time: %w", err)
9695
}
9796

98-
resourcesRead, err = p.readAll(ctx, table.ToArrowSchema(), sourceName)
97+
resourcesRead, err = p.readAll(ctx, table, sourceName)
9998
if err != nil {
10099
return fmt.Errorf("failed to read all second time: %w", err)
101100
}

plugins/destination/plugin_testing_write_append.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,10 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
1919
return fmt.Errorf("failed to init plugin: %w", err)
2020
}
2121
tableName := spec.Name
22-
table := testdata.TestTable(tableName)
22+
table := testdata.TestTable(tableName).ToArrowSchema()
2323
syncTime := time.Now().UTC().Round(1 * time.Second)
2424
tables := []*arrow.Schema{
25-
table.ToArrowSchema(),
25+
table,
2626
}
2727
if err := p.Migrate(ctx, tables); err != nil {
2828
return fmt.Errorf("failed to migrate tables: %w", err)
@@ -38,14 +38,14 @@ func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context,
3838
SyncTime: syncTime,
3939
MaxRows: 1,
4040
}
41-
record1 := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
41+
record1 := testdata.GenTestData(table, opts)[0]
4242
if err := p.writeOne(ctx, specSource, syncTime, record1); err != nil {
4343
return fmt.Errorf("failed to write one second time: %w", err)
4444
}
4545

4646
secondSyncTime := syncTime.Add(10 * time.Second).UTC()
4747
opts.SyncTime = secondSyncTime
48-
record2 := testdata.GenTestData(table.ToArrowSchema(), opts)[0]
48+
record2 := testdata.GenTestData(table, opts)[0]
4949

5050
if !s.tests.SkipSecondAppend {
5151
// write second time

testdata/testdata.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@ package testdata
22

33
import (
44
"net"
5+
"sort"
6+
"strings"
57
"time"
68

79
"github.com/apache/arrow/go/v12/arrow"
@@ -109,6 +111,12 @@ func TestSourceTable(name string) *schema.Table {
109111
}
110112
}
111113

114+
func TestTableIncremental(name string) *schema.Table {
115+
t := TestTable(name)
116+
t.IsIncremental = true
117+
return t
118+
}
119+
112120
// TestTable returns a table with columns of all type. useful for destination testing purposes
113121
func TestTable(name string) *schema.Table {
114122
sourceTable := TestSourceTable(name)
@@ -243,6 +251,14 @@ func GenTestData(sc *arrow.Schema, opts GenTestDataOptions) []arrow.Record {
243251
records = append(records, bldr.NewRecord())
244252
bldr.Release()
245253
}
254+
if indices := sc.FieldIndices(schema.CqIDColumn.Name); len(indices) > 0 {
255+
cqIDIndex := indices[0]
256+
sort.Slice(records, func(i, j int) bool {
257+
firstUUID := records[i].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
258+
secondUUID := records[j].Column(cqIDIndex).(*types.UUIDArray).Value(0).String()
259+
return strings.Compare(firstUUID, secondUUID) < 0
260+
})
261+
}
246262
return records
247263
}
248264

0 commit comments

Comments
 (0)