Skip to content

Commit 86e717a

Browse files
authored
fix: Change testdata.Generate signature (#1153)
Part of cloudquery/cloudquery#13153 This changes the semantics of `testdata.Generate` from `generate opts.MaxRows` records to `generate a single record with opts.MaxRows rows`
1 parent 6180bbc commit 86e717a

File tree

7 files changed

+102
-52
lines changed

7 files changed

+102
-52
lines changed

plugin/diff.go

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,25 +6,40 @@ import (
66

77
"github.com/apache/arrow/go/v13/arrow"
88
"github.com/apache/arrow/go/v13/arrow/array"
9+
"github.com/apache/arrow/go/v13/arrow/memory"
910
)
1011

11-
func RecordDiff(l, r arrow.Record) string {
12-
if array.RecordApproxEqual(l, r, array.WithUnorderedMapKeys(true)) {
12+
func RecordsDiff(sc *arrow.Schema, l, r []arrow.Record) string {
13+
return TableDiff(array.NewTableFromRecords(sc, l), array.NewTableFromRecords(sc, r))
14+
}
15+
16+
func TableDiff(l, r arrow.Table) string {
17+
if array.TableApproxEqual(l, r, array.WithUnorderedMapKeys(true)) {
1318
return ""
1419
}
15-
var sb strings.Builder
20+
1621
if l.NumCols() != r.NumCols() {
1722
return fmt.Sprintf("different number of columns: %d vs %d", l.NumCols(), r.NumCols())
1823
}
1924
if l.NumRows() != r.NumRows() {
2025
return fmt.Sprintf("different number of rows: %d vs %d", l.NumRows(), r.NumRows())
2126
}
27+
28+
var sb strings.Builder
2229
for i := 0; i < int(l.NumCols()); i++ {
23-
edits, err := array.Diff(l.Column(i), r.Column(i))
30+
lCol, err := array.Concatenate(l.Column(i).Data().Chunks(), memory.DefaultAllocator)
31+
if err != nil {
32+
panic(fmt.Errorf("failed to concat left columns at idx %d: %w", i, err))
33+
}
34+
rCol, err := array.Concatenate(r.Column(i).Data().Chunks(), memory.DefaultAllocator)
35+
if err != nil {
36+
panic(fmt.Errorf("failed to concat right columns at idx %d: %w", i, err))
37+
}
38+
edits, err := array.Diff(lCol, rCol)
2439
if err != nil {
25-
panic(fmt.Sprintf("left: %v, right: %v, error: %v", l.Column(i).DataType(), r.Column(i).DataType(), err))
40+
panic(fmt.Errorf("left: %v, right: %v, error: %w", lCol.DataType(), rCol.DataType(), err))
2641
}
27-
diff := edits.UnifiedDiff(l.Column(i), r.Column(i))
42+
diff := edits.UnifiedDiff(lCol, rCol)
2843
if diff != "" {
2944
sb.WriteString(l.Schema().Field(i).Name)
3045
sb.WriteString(": ")

plugin/nulls_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,14 @@
11
package plugin
22

33
import (
4+
"testing"
5+
"time"
6+
47
"github.com/apache/arrow/go/v13/arrow"
58
"github.com/apache/arrow/go/v13/arrow/array"
69
"github.com/cloudquery/plugin-sdk/v4/schema"
710
"github.com/stretchr/testify/assert"
811
"github.com/stretchr/testify/require"
9-
"testing"
10-
"time"
1112
)
1213

1314
func TestWithTestIgnoreNullsInLists(t *testing.T) {
@@ -20,7 +21,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) {
2021
SyncTime: time.Now(),
2122
MaxRows: 100,
2223
NullRows: false,
23-
})[0])
24+
}))
2425
for _, c := range resource.Columns() {
2526
assertNoNullsInLists(t, c)
2627
}
@@ -30,7 +31,7 @@ func TestWithTestIgnoreNullsInLists(t *testing.T) {
3031
SyncTime: time.Now(),
3132
MaxRows: 100,
3233
NullRows: true,
33-
})[0])
34+
}))
3435
for _, c := range resource.Columns() {
3536
assertNoNullsInLists(t, c)
3637
}
@@ -65,7 +66,7 @@ func TestWithTestSourceAllowNull(t *testing.T) {
6566
SyncTime: time.Now(),
6667
MaxRows: 100,
6768
NullRows: false,
68-
})[0])
69+
}))
6970
for _, c := range resource.Columns() {
7071
assertNoNulls(t, s.allowNull, c)
7172
}
@@ -75,7 +76,7 @@ func TestWithTestSourceAllowNull(t *testing.T) {
7576
SyncTime: time.Now(),
7677
MaxRows: 100,
7778
NullRows: true,
78-
})[0])
79+
}))
7980
for _, c := range resource.Columns() {
8081
assertNoNulls(t, s.allowNull, c)
8182
}

plugin/testing_upsert.go

Lines changed: 13 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -61,13 +61,14 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error {
6161
if totalItems != 1 {
6262
return fmt.Errorf("expected 1 item, got %d", totalItems)
6363
}
64-
if diff := RecordDiff(records[0], record); diff != "" {
64+
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" {
6565
return fmt.Errorf("record differs: %s", diff)
6666
}
6767
return nil
6868
}
6969

7070
func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
71+
const rowsPerRecord = 10
7172
tableName := s.tableNameForTest("upsert_all")
7273
table := schema.TestTable(tableName, s.genDatOptions)
7374
table.Columns = append(table.Columns, schema.Column{Name: "name", Type: arrow.BinaryTypes.String, PrimaryKey: true})
@@ -78,7 +79,10 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
7879
}
7980

8081
tg := schema.NewTestDataGenerator()
81-
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0]
82+
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
83+
MaxRows: rowsPerRecord,
84+
TimePrecision: s.genDatOptions.TimePrecision,
85+
})
8286
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
8387
Record: normalRecord,
8488
}); err != nil {
@@ -91,15 +95,15 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
9195
return fmt.Errorf("failed to readAll: %w", err)
9296
}
9397
totalItems := TotalRows(records)
94-
if totalItems != 1 {
95-
return fmt.Errorf("expected 1 item, got %d", totalItems)
98+
if totalItems != rowsPerRecord {
99+
return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems)
96100
}
97101

98-
if diff := RecordDiff(records[0], normalRecord); diff != "" {
102+
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{normalRecord}); diff != "" {
99103
return fmt.Errorf("record differs after insert: %s", diff)
100104
}
101105

102-
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0]
106+
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 10, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})
103107
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
104108
Record: nullRecord,
105109
}); err != nil {
@@ -113,11 +117,11 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
113117
}
114118

115119
totalItems = TotalRows(records)
116-
if totalItems != 1 {
117-
return fmt.Errorf("expected 1 item, got %d", totalItems)
120+
if totalItems != rowsPerRecord {
121+
return fmt.Errorf("expected items: %d, got %d", rowsPerRecord, totalItems)
118122
}
119123

120-
if diff := RecordDiff(records[0], nullRecord); diff != "" {
124+
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{nullRecord}); diff != "" {
121125
return fmt.Errorf("record differs after upsert (columns should be null): %s", diff)
122126
}
123127

plugin/testing_write_delete.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"time"
77

8+
"github.com/apache/arrow/go/v13/arrow"
89
"github.com/apache/arrow/go/v13/arrow/array"
910
"github.com/apache/arrow/go/v13/arrow/memory"
1011
"github.com/cloudquery/plugin-sdk/v4/message"
@@ -70,7 +71,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error {
7071
if totalItems != 1 {
7172
return fmt.Errorf("expected 1 item, got %d", totalItems)
7273
}
73-
if diff := RecordDiff(records[0], record); diff != "" {
74+
if diff := RecordsDiff(table.ToArrowSchema(), records, []arrow.Record{record}); diff != "" {
7475
return fmt.Errorf("record differs: %s", diff)
7576
}
7677
return nil

plugin/testing_write_insert.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -73,17 +73,15 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error {
7373
return fmt.Errorf("expected 2 items, got %d", totalItems)
7474
}
7575

76-
if diff := RecordDiff(readRecords[0], record); diff != "" {
76+
if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{record, record}); diff != "" {
7777
return fmt.Errorf("record[0] differs: %s", diff)
7878
}
79-
if diff := RecordDiff(readRecords[1], record); diff != "" {
80-
return fmt.Errorf("record[1] differs: %s", diff)
81-
}
8279

8380
return nil
8481
}
8582

8683
func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
84+
const rowsPerRecord = 10
8785
tableName := s.tableNameForTest("insert_all")
8886
table := schema.TestTable(tableName, s.genDatOptions)
8987
if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{
@@ -92,7 +90,10 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
9290
return fmt.Errorf("failed to create table: %w", err)
9391
}
9492
tg := schema.NewTestDataGenerator()
95-
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision})[0]
93+
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
94+
MaxRows: rowsPerRecord,
95+
TimePrecision: s.genDatOptions.TimePrecision,
96+
})
9697
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
9798
Record: normalRecord,
9899
}); err != nil {
@@ -106,11 +107,15 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
106107
}
107108

108109
totalItems := TotalRows(readRecords)
109-
if totalItems != 1 {
110-
return fmt.Errorf("expected 1 item, got %d", totalItems)
110+
if totalItems != rowsPerRecord {
111+
return fmt.Errorf("items expected: %d, got: %d", rowsPerRecord, totalItems)
111112
}
112113

113-
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, TimePrecision: s.genDatOptions.TimePrecision, NullRows: true})[0]
114+
nullRecord := tg.Generate(table, schema.GenTestDataOptions{
115+
MaxRows: rowsPerRecord,
116+
TimePrecision: s.genDatOptions.TimePrecision,
117+
NullRows: true,
118+
})
114119
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
115120
Record: nullRecord,
116121
}); err != nil {
@@ -125,14 +130,11 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
125130
sortRecords(table, readRecords, "id")
126131

127132
totalItems = TotalRows(readRecords)
128-
if totalItems != 2 {
129-
return fmt.Errorf("expected 2 items, got %d", totalItems)
133+
if totalItems != 2*rowsPerRecord {
134+
return fmt.Errorf("items expected: %d, got: %d", 2*rowsPerRecord, totalItems)
130135
}
131-
if diff := RecordDiff(readRecords[0], normalRecord); diff != "" {
136+
if diff := RecordsDiff(table.ToArrowSchema(), readRecords, []arrow.Record{normalRecord, nullRecord}); diff != "" {
132137
return fmt.Errorf("record[0] differs: %s", diff)
133138
}
134-
if diff := RecordDiff(readRecords[1], nullRecord); diff != "" {
135-
return fmt.Errorf("record[1] differs: %s", diff)
136-
}
137139
return nil
138140
}

plugin/testing_write_migrate.go

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ func tableUUIDSuffix() string {
2121

2222
// nolint:revive
2323
func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, source *schema.Table, supportsSafeMigrate bool, writeOptionMigrateForce bool) error {
24+
const rowsPerRecord = 10
2425
if err := s.plugin.writeOne(ctx, &message.WriteMigrateTable{
2526
Table: source,
2627
MigrateForce: writeOptionMigrateForce,
@@ -33,11 +34,11 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
3334
opts := schema.GenTestDataOptions{
3435
SourceName: sourceName,
3536
SyncTime: syncTime,
36-
MaxRows: 1,
37+
MaxRows: rowsPerRecord,
3738
TimePrecision: s.genDatOptions.TimePrecision,
3839
}
3940
tg := schema.NewTestDataGenerator()
40-
resource1 := tg.Generate(source, opts)[0]
41+
resource1 := tg.Generate(source, opts)
4142
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
4243
Record: resource1,
4344
}); err != nil {
@@ -50,10 +51,10 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
5051
return fmt.Errorf("failed to sync: %w", err)
5152
}
5253
totalItems := TotalRows(records)
53-
if totalItems != 1 {
54-
return fmt.Errorf("expected 1 item, got %d", totalItems)
54+
if totalItems != rowsPerRecord {
55+
return fmt.Errorf("expected items: %d, got: %d", rowsPerRecord, totalItems)
5556
}
56-
if diff := RecordDiff(records[0], resource1); diff != "" {
57+
if diff := RecordsDiff(source.ToArrowSchema(), records, []arrow.Record{resource1}); diff != "" {
5758
return fmt.Errorf("first record differs from expectation: %s", diff)
5859
}
5960

@@ -64,7 +65,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
6465
return fmt.Errorf("failed to create table: %w", err)
6566
}
6667

67-
resource2 := tg.Generate(target, opts)[0]
68+
resource2 := tg.Generate(target, opts)
6869
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
6970
Record: resource2,
7071
}); err != nil {
@@ -78,12 +79,13 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
7879
}
7980
sortRecords(target, records, "id")
8081

82+
lastRow := resource2.NewSlice(resource2.NumRows()-1, resource2.NumRows())
8183
// if force migration is not required, we don't expect any items to be dropped (so there should be 2 items)
8284
if !writeOptionMigrateForce || supportsSafeMigrate {
83-
if err := expectRows(records, 2, resource2); err != nil {
84-
if writeOptionMigrateForce && TotalRows(records) == 1 {
85+
if err := expectRows(target.ToArrowSchema(), records, 2*rowsPerRecord, lastRow); err != nil {
86+
if writeOptionMigrateForce && TotalRows(records) == rowsPerRecord {
8587
// if force migration is required, we can also expect 1 item to be dropped
86-
return expectRows(records, 1, resource2)
88+
return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow)
8789
}
8890

8991
return err
@@ -92,7 +94,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
9294
return nil
9395
}
9496

95-
return expectRows(records, 1, resource2)
97+
return expectRows(target.ToArrowSchema(), records, rowsPerRecord, lastRow)
9698
}
9799

98100
// nolint:revive
@@ -235,12 +237,14 @@ func (s *WriterTestSuite) testMigrate(
235237
})
236238
}
237239

238-
func expectRows(records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error {
240+
func expectRows(sc *arrow.Schema, records []arrow.Record, expectTotal int64, expectedLast arrow.Record) error {
239241
totalItems := TotalRows(records)
240242
if totalItems != expectTotal {
241243
return fmt.Errorf("expected %d items, got %d", expectTotal, totalItems)
242244
}
243-
if diff := RecordDiff(records[totalItems-1], expectedLast); diff != "" {
245+
lastRecord := records[len(records)-1]
246+
lastRow := lastRecord.NewSlice(lastRecord.NumRows()-1, lastRecord.NumRows())
247+
if diff := RecordsDiff(sc, []arrow.Record{lastRow}, []arrow.Record{expectedLast}); diff != "" {
244248
return fmt.Errorf("record #%d differs from expectation: %s", totalItems, diff)
245249
}
246250
return nil

schema/testdata.go

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -212,10 +212,17 @@ func NewTestDataGenerator() *TestDataGenerator {
212212
}
213213
}
214214

215-
// GenTestData generates a slice of arrow.Records with the given schema and options.
216-
func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []arrow.Record {
217-
var records []arrow.Record
215+
// Generate will produce a single arrow.Record with the given schema and options.
216+
func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) arrow.Record {
218217
sc := table.ToArrowSchema()
218+
if opts.MaxRows == 0 {
219+
// We generate an empty record
220+
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
221+
defer bldr.Release()
222+
return bldr.NewRecord()
223+
}
224+
225+
var records []arrow.Record
219226
for j := 0; j < opts.MaxRows; j++ {
220227
tg.counter++
221228
bldr := array.NewRecordBuilder(memory.DefaultAllocator, sc)
@@ -245,7 +252,23 @@ func (tg *TestDataGenerator) Generate(table *Table, opts GenTestDataOptions) []a
245252
return strings.Compare(firstUUID, secondUUID) < 0
246253
})
247254
}
248-
return records
255+
256+
// now we have sorted 1-row-records. Transform them into a single record with opts.MaxRows rows
257+
columns := make([]arrow.Array, sc.NumFields())
258+
for n := 0; n < sc.NumFields(); n++ {
259+
arrs := make([]arrow.Array, len(records))
260+
for i := range arrs {
261+
arrs[i] = records[i].Column(n)
262+
}
263+
264+
concatenated, err := array.Concatenate(arrs, memory.DefaultAllocator)
265+
if err != nil {
266+
panic(fmt.Sprintf("failed to concatenate arrays: %v", err))
267+
}
268+
columns[n] = concatenated
269+
}
270+
271+
return array.NewRecord(sc, columns, -1)
249272
}
250273

251274
func (tg TestDataGenerator) getExampleJSON(colName string, dataType arrow.DataType, opts GenTestDataOptions) string {

0 commit comments

Comments
 (0)