Skip to content

Commit bc3c251

Browse files
authored
feat: Refactor test options and allow skipping of nulls in lists (#892)
- Skipping of null values in lists when doing comparisons is necessary for BigQuery tests to pass right now, because BigQuery does not support null values in repeated columns. To keep the migration to Arrow backwards-compatible, we are opting to strip nulls in that case (for now), but we still want our automated tests to pass. - This also refactors the test options. Previously you had to call `destination.PluginTestSuiteRunner` with options from the `schema` package, which is a bit unusual and counter-intuitive, and makes it hard to extend with options that relate specifically to the test suite runner. This changes it so that the options passed in are from the `destination` package and passed through to the `schema` package. It's a breaking change, but this functionality hasn't been out for long, will be easy to update, and only affects tests, so I'm hoping users will forgive us for this one.
1 parent 6967929 commit bc3c251

File tree

10 files changed

+205
-122
lines changed

10 files changed

+205
-122
lines changed

internal/memdb/memdb_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ func TestOnWriteError(t *testing.T) {
112112
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
113113
t.Fatal(err)
114114
}
115-
table := schema.TestTable("test")
115+
table := schema.TestTable("test", schema.TestSourceOptions{})
116116
tables := schema.Tables{
117117
table,
118118
}
@@ -147,7 +147,7 @@ func TestOnWriteCtxCancelled(t *testing.T) {
147147
if err := p.Init(ctx, getTestLogger(t), specs.Destination{}); err != nil {
148148
t.Fatal(err)
149149
}
150-
table := schema.TestTable("test")
150+
table := schema.TestTable("test", schema.TestSourceOptions{})
151151
tables := schema.Tables{
152152
table,
153153
}

plugins/destination/plugin_testing.go

Lines changed: 79 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,13 +100,84 @@ func getTestLogger(t *testing.T) zerolog.Logger {
100100

101101
type NewPluginFunc func() *Plugin
102102

103-
func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testSourceOptions ...func(o *schema.TestSourceOptions)) {
103+
type PluginTestSuiteRunnerOptions struct {
104+
IgnoreNullsInLists bool // strip nulls from lists before checking equality. Destination setups that don't support nulls in lists should set this to true.
105+
schema.TestSourceOptions
106+
}
107+
108+
func WithTestIgnoreNullsInLists() func(o *PluginTestSuiteRunnerOptions) {
109+
return func(o *PluginTestSuiteRunnerOptions) {
110+
o.IgnoreNullsInLists = true
111+
}
112+
}
113+
114+
func WithTestSourceSkipLists() func(o *PluginTestSuiteRunnerOptions) {
115+
return func(o *PluginTestSuiteRunnerOptions) {
116+
o.SkipLists = true
117+
}
118+
}
119+
120+
func WithTestSourceSkipTimestamps() func(o *PluginTestSuiteRunnerOptions) {
121+
return func(o *PluginTestSuiteRunnerOptions) {
122+
o.SkipTimestamps = true
123+
}
124+
}
125+
126+
func WithTestSourceSkipDates() func(o *PluginTestSuiteRunnerOptions) {
127+
return func(o *PluginTestSuiteRunnerOptions) {
128+
o.SkipDates = true
129+
}
130+
}
131+
132+
func WithTestSourceSkipMaps() func(o *PluginTestSuiteRunnerOptions) {
133+
return func(o *PluginTestSuiteRunnerOptions) {
134+
o.SkipMaps = true
135+
}
136+
}
137+
138+
func WithTestSourceSkipStructs() func(o *PluginTestSuiteRunnerOptions) {
139+
return func(o *PluginTestSuiteRunnerOptions) {
140+
o.SkipStructs = true
141+
}
142+
}
143+
144+
func WithTestSourceSkipIntervals() func(o *PluginTestSuiteRunnerOptions) {
145+
return func(o *PluginTestSuiteRunnerOptions) {
146+
o.SkipIntervals = true
147+
}
148+
}
149+
150+
func WithTestSourceSkipDurations() func(o *PluginTestSuiteRunnerOptions) {
151+
return func(o *PluginTestSuiteRunnerOptions) {
152+
o.SkipDurations = true
153+
}
154+
}
155+
156+
func WithTestSourceSkipTimes() func(o *PluginTestSuiteRunnerOptions) {
157+
return func(o *PluginTestSuiteRunnerOptions) {
158+
o.SkipTimes = true
159+
}
160+
}
161+
162+
func WithTestSourceSkipLargeTypes() func(o *PluginTestSuiteRunnerOptions) {
163+
return func(o *PluginTestSuiteRunnerOptions) {
164+
o.SkipLargeTypes = true
165+
}
166+
}
167+
168+
func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testOptions ...func(o *PluginTestSuiteRunnerOptions)) {
104169
t.Helper()
105170
destSpec.Name = "testsuite"
106171

107172
suite := &PluginTestSuite{
108173
tests: tests,
109174
}
175+
176+
opts := PluginTestSuiteRunnerOptions{}
177+
for _, o := range testOptions {
178+
o(&opts)
179+
}
180+
110181
ctx := context.Background()
111182
logger := getTestLogger(t)
112183

@@ -117,7 +188,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
117188
}
118189
destSpec.Name = "test_write_overwrite"
119190
p := newPlugin()
120-
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
191+
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, opts); err != nil {
121192
t.Fatal(err)
122193
}
123194
if err := p.Close(ctx); err != nil {
@@ -132,7 +203,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
132203
}
133204
destSpec.Name = "test_write_overwrite_delete_stale"
134205
p := newPlugin()
135-
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
206+
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, opts); err != nil {
136207
t.Fatal(err)
137208
}
138209
if err := p.Close(ctx); err != nil {
@@ -148,7 +219,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
148219
destSpec.WriteMode = specs.WriteModeOverwrite
149220
destSpec.MigrateMode = specs.MigrateModeSafe
150221
destSpec.Name = "test_migrate_overwrite"
151-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
222+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts)
152223
})
153224

154225
t.Run("TestMigrateOverwriteForce", func(t *testing.T) {
@@ -159,7 +230,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
159230
destSpec.WriteMode = specs.WriteModeOverwrite
160231
destSpec.MigrateMode = specs.MigrateModeForced
161232
destSpec.Name = "test_migrate_overwrite_force"
162-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
233+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, opts)
163234
})
164235

165236
t.Run("TestWriteAppend", func(t *testing.T) {
@@ -169,7 +240,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
169240
}
170241
destSpec.Name = "test_write_append"
171242
p := newPlugin()
172-
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
243+
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, opts); err != nil {
173244
t.Fatal(err)
174245
}
175246
if err := p.Close(ctx); err != nil {
@@ -185,7 +256,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
185256
destSpec.WriteMode = specs.WriteModeAppend
186257
destSpec.MigrateMode = specs.MigrateModeSafe
187258
destSpec.Name = "test_migrate_append"
188-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
259+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts)
189260
})
190261

191262
t.Run("TestMigrateAppendForce", func(t *testing.T) {
@@ -196,7 +267,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
196267
destSpec.WriteMode = specs.WriteModeAppend
197268
destSpec.MigrateMode = specs.MigrateModeForced
198269
destSpec.Name = "test_migrate_append_force"
199-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
270+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, opts)
200271
})
201272
}
202273

plugins/destination/plugin_testing_migrate.go

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ func tableUUIDSuffix() string {
2121
return strings.ReplaceAll(uuid.NewString(), "-", "_")
2222
}
2323

24-
func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode) error {
24+
func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.Logger, spec specs.Destination, target *schema.Table, source *schema.Table, mode specs.MigrateMode, testOpts PluginTestSuiteRunnerOptions) error {
2525
if err := p.Init(ctx, logger, spec); err != nil {
2626
return fmt.Errorf("failed to init plugin: %w", err)
2727
}
@@ -49,10 +49,13 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
4949
return fmt.Errorf("failed to migrate existing table: %w", err)
5050
}
5151
opts.SyncTime = syncTime.Add(time.Second).UTC()
52-
resource2 := schema.GenTestData(target, opts)[0]
53-
if err := p.writeOne(ctx, sourceSpec, syncTime, resource2); err != nil {
52+
resource2 := schema.GenTestData(target, opts)
53+
if err := p.writeAll(ctx, sourceSpec, syncTime, resource2); err != nil {
5454
return fmt.Errorf("failed to write one after migration: %w", err)
5555
}
56+
if testOpts.IgnoreNullsInLists {
57+
stripNullsFromLists(resource2)
58+
}
5659

5760
resourcesRead, err := p.readAll(ctx, target, sourceName)
5861
if err != nil {
@@ -63,16 +66,16 @@ func testMigration(ctx context.Context, _ *testing.T, p *Plugin, logger zerolog.
6366
if len(resourcesRead) != 2 {
6467
return fmt.Errorf("expected 2 resources after write, got %d", len(resourcesRead))
6568
}
66-
if !array.RecordApproxEqual(resourcesRead[1], resource2) {
67-
diff := RecordDiff(resourcesRead[1], resource2)
69+
if !array.RecordApproxEqual(resourcesRead[1], resource2[0]) {
70+
diff := RecordDiff(resourcesRead[1], resource2[0])
6871
return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff)
6972
}
7073
} else {
7174
if len(resourcesRead) != 1 {
7275
return fmt.Errorf("expected 1 resource after write, got %d", len(resourcesRead))
7376
}
74-
if !array.RecordApproxEqual(resourcesRead[0], resource2) {
75-
diff := RecordDiff(resourcesRead[0], resource2)
77+
if !array.RecordApproxEqual(resourcesRead[0], resource2[0]) {
78+
diff := RecordDiff(resourcesRead[0], resource2[0])
7679
return fmt.Errorf("resource1 and resource2 are not equal. diff: %s", diff)
7780
}
7881
}
@@ -87,7 +90,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
8790
logger zerolog.Logger,
8891
spec specs.Destination,
8992
strategy MigrateStrategy,
90-
testSourceOptions ...func(o *schema.TestSourceOptions),
93+
testOpts PluginTestSuiteRunnerOptions,
9194
) {
9295
spec.BatchSize = 1
9396

@@ -119,7 +122,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
119122
}
120123

121124
p := newPlugin()
122-
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn); err != nil {
125+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumn, testOpts); err != nil {
123126
t.Fatalf("failed to migrate %s: %v", tableName, err)
124127
}
125128
if err := p.Close(ctx); err != nil {
@@ -153,7 +156,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
153156
{Name: "bool", Type: arrow.FixedWidthTypes.Boolean, NotNull: true},
154157
}}
155158
p := newPlugin()
156-
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull); err != nil {
159+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.AddColumnNotNull, testOpts); err != nil {
157160
t.Fatalf("failed to migrate add_column_not_null: %v", err)
158161
}
159162
if err := p.Close(ctx); err != nil {
@@ -186,7 +189,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
186189
}}
187190

188191
p := newPlugin()
189-
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn); err != nil {
192+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumn, testOpts); err != nil {
190193
t.Fatalf("failed to migrate remove_column: %v", err)
191194
}
192195
if err := p.Close(ctx); err != nil {
@@ -220,7 +223,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
220223
}}
221224

222225
p := newPlugin()
223-
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull); err != nil {
226+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.RemoveColumnNotNull, testOpts); err != nil {
224227
t.Fatalf("failed to migrate remove_column_not_null: %v", err)
225228
}
226229
if err := p.Close(ctx); err != nil {
@@ -254,7 +257,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
254257
}}
255258

256259
p := newPlugin()
257-
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn); err != nil {
260+
if err := testMigration(ctx, t, p, logger, spec, target, source, strategy.ChangeColumn, testOpts); err != nil {
258261
t.Fatalf("failed to migrate change_column: %v", err)
259262
}
260263
if err := p.Close(ctx); err != nil {
@@ -264,7 +267,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
264267

265268
t.Run("double_migration", func(t *testing.T) {
266269
tableName := "double_migration_" + tableUUIDSuffix()
267-
table := schema.TestTable(tableName, testSourceOptions...)
270+
table := schema.TestTable(tableName, testOpts.TestSourceOptions)
268271

269272
p := newPlugin()
270273
require.NoError(t, p.Init(ctx, logger, spec))

plugins/destination/plugin_testing_overwrite.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import (
1313
"github.com/rs/zerolog"
1414
)
1515

16-
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
16+
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error {
1717
spec.WriteMode = specs.WriteModeOverwrite
1818
if err := p.Init(ctx, logger, spec); err != nil {
1919
return fmt.Errorf("failed to init plugin: %w", err)
2020
}
2121
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
22-
table := schema.TestTable(tableName, testSourceOptions...)
22+
table := schema.TestTable(tableName, testOpts.TestSourceOptions)
2323
syncTime := time.Now().UTC().Round(1 * time.Second)
2424
tables := schema.Tables{
2525
table,
@@ -43,7 +43,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
4343
return fmt.Errorf("failed to write all: %w", err)
4444
}
4545
sortRecordsBySyncTime(table, resources)
46-
46+
if testOpts.IgnoreNullsInLists {
47+
stripNullsFromLists(resources)
48+
}
4749
resourcesRead, err := p.readAll(ctx, table, sourceName)
4850
if err != nil {
4951
return fmt.Errorf("failed to read all: %w", err)
@@ -75,12 +77,15 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
7577
MaxRows: 1,
7678
StableUUID: u,
7779
}
78-
updatedResource := schema.GenTestData(table, opts)[0]
80+
updatedResource := schema.GenTestData(table, opts)
7981
// write second time
80-
if err := p.writeOne(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
82+
if err := p.writeAll(ctx, sourceSpec, secondSyncTime, updatedResource); err != nil {
8183
return fmt.Errorf("failed to write one second time: %w", err)
8284
}
8385

86+
if testOpts.IgnoreNullsInLists {
87+
stripNullsFromLists(updatedResource)
88+
}
8489
resourcesRead, err = p.readAll(ctx, table, sourceName)
8590
if err != nil {
8691
return fmt.Errorf("failed to read all second time: %w", err)
@@ -94,8 +99,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
9499
diff := RecordDiff(resources[1], resourcesRead[0])
95100
return fmt.Errorf("after overwrite expected first resource to be equal. diff=%s", diff)
96101
}
97-
if !array.RecordApproxEqual(updatedResource, resourcesRead[1]) {
98-
diff := RecordDiff(updatedResource, resourcesRead[1])
102+
if !array.RecordApproxEqual(updatedResource[0], resourcesRead[1]) {
103+
diff := RecordDiff(updatedResource[0], resourcesRead[1])
99104
return fmt.Errorf("after overwrite expected second resource to be equal. diff=%s", diff)
100105
}
101106

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,14 @@ import (
1313
"github.com/rs/zerolog"
1414
)
1515

16-
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
16+
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testOpts PluginTestSuiteRunnerOptions) error {
1717
spec.WriteMode = specs.WriteModeOverwriteDeleteStale
1818
if err := p.Init(ctx, logger, spec); err != nil {
1919
return fmt.Errorf("failed to init plugin: %w", err)
2020
}
2121
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
22-
table := schema.TestTable(tableName, testSourceOptions...)
23-
incTable := schema.TestTable(tableName+"_incremental", testSourceOptions...)
22+
table := schema.TestTable(tableName, testOpts.TestSourceOptions)
23+
incTable := schema.TestTable(tableName+"_incremental", testOpts.TestSourceOptions)
2424
incTable.IsIncremental = true
2525
syncTime := time.Now().UTC().Round(1 * time.Second)
2626
tables := schema.Tables{
@@ -60,6 +60,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
6060
if len(resourcesRead) != 2 {
6161
return fmt.Errorf("expected 2 resources, got %d", len(resourcesRead))
6262
}
63+
if testOpts.IgnoreNullsInLists {
64+
stripNullsFromLists(resources)
65+
}
6366
if !array.RecordApproxEqual(resources[0], resourcesRead[0]) {
6467
diff := RecordDiff(resources[0], resourcesRead[0])
6568
return fmt.Errorf("expected first resource to be equal. diff: %s", diff)
@@ -106,6 +109,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
106109
if len(resourcesRead) != 1 {
107110
return fmt.Errorf("after overwrite expected 1 resource, got %d", len(resourcesRead))
108111
}
112+
if testOpts.IgnoreNullsInLists {
113+
stripNullsFromLists(resources)
114+
}
109115
if array.RecordApproxEqual(resources[0], resourcesRead[0]) {
110116
diff := RecordDiff(resources[0], resourcesRead[0])
111117
return fmt.Errorf("after overwrite expected first resource to be different. diff: %s", diff)
@@ -120,6 +126,9 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
120126
}
121127

122128
// we expect the only resource returned to match the updated resource we wrote
129+
if testOpts.IgnoreNullsInLists {
130+
stripNullsFromLists(updatedResources)
131+
}
123132
if !array.RecordApproxEqual(updatedResources[0], resourcesRead[0]) {
124133
diff := RecordDiff(updatedResources[0], resourcesRead[0])
125134
return fmt.Errorf("after delete stale expected resource to be equal. diff: %s", diff)

0 commit comments

Comments
 (0)