Skip to content

Commit 28642ec

Browse files
authored
feat: Allow testing of more Arrow types (#863)
This adds support for testing (almost) the complete range of Arrow types in destinations. By default, only the types previously supported by CQTypes are enabled (with the important exception of more uint and int types than before, plus float32). The TestSourceOptions struct allows destinations to declare that they want to test with more column types, including maps, structs, times, etc. I have not included all the nullable column variations. These are a bit more difficult to do now that we are not using `arrow.Field` directly anymore, plus we can rather do that as a follow-up if and when it becomes necessary to test this. This PR is a rebased version for v3 that replaces #818.
1 parent 766fe62 commit 28642ec

File tree

6 files changed

+488
-224
lines changed

6 files changed

+488
-224
lines changed

plugins/destination/plugin_testing.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ func getTestLogger(t *testing.T) zerolog.Logger {
100100

101101
type NewPluginFunc func() *Plugin
102102

103-
func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests) {
103+
func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs.Destination, tests PluginTestSuiteTests, testSourceOptions ...func(o *schema.TestSourceOptions)) {
104104
t.Helper()
105105
destSpec.Name = "testsuite"
106106

@@ -117,7 +117,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
117117
}
118118
destSpec.Name = "test_write_overwrite"
119119
p := newPlugin()
120-
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec); err != nil {
120+
if err := suite.destinationPluginTestWriteOverwrite(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
121121
t.Fatal(err)
122122
}
123123
if err := p.Close(ctx); err != nil {
@@ -132,7 +132,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
132132
}
133133
destSpec.Name = "test_write_overwrite_delete_stale"
134134
p := newPlugin()
135-
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec); err != nil {
135+
if err := suite.destinationPluginTestWriteOverwriteDeleteStale(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
136136
t.Fatal(err)
137137
}
138138
if err := p.Close(ctx); err != nil {
@@ -148,7 +148,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
148148
destSpec.WriteMode = specs.WriteModeOverwrite
149149
destSpec.MigrateMode = specs.MigrateModeSafe
150150
destSpec.Name = "test_migrate_overwrite"
151-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
151+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
152152
})
153153

154154
t.Run("TestMigrateOverwriteForce", func(t *testing.T) {
@@ -159,7 +159,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
159159
destSpec.WriteMode = specs.WriteModeOverwrite
160160
destSpec.MigrateMode = specs.MigrateModeForced
161161
destSpec.Name = "test_migrate_overwrite_force"
162-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite)
162+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyOverwrite, testSourceOptions...)
163163
})
164164

165165
t.Run("TestWriteAppend", func(t *testing.T) {
@@ -169,7 +169,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
169169
}
170170
destSpec.Name = "test_write_append"
171171
p := newPlugin()
172-
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec); err != nil {
172+
if err := suite.destinationPluginTestWriteAppend(ctx, p, logger, destSpec, testSourceOptions...); err != nil {
173173
t.Fatal(err)
174174
}
175175
if err := p.Close(ctx); err != nil {
@@ -185,7 +185,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
185185
destSpec.WriteMode = specs.WriteModeAppend
186186
destSpec.MigrateMode = specs.MigrateModeSafe
187187
destSpec.Name = "test_migrate_append"
188-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
188+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
189189
})
190190

191191
t.Run("TestMigrateAppendForce", func(t *testing.T) {
@@ -196,7 +196,7 @@ func PluginTestSuiteRunner(t *testing.T, newPlugin NewPluginFunc, destSpec specs
196196
destSpec.WriteMode = specs.WriteModeAppend
197197
destSpec.MigrateMode = specs.MigrateModeForced
198198
destSpec.Name = "test_migrate_append_force"
199-
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend)
199+
suite.destinationPluginTestMigrate(ctx, t, newPlugin, logger, destSpec, tests.MigrateStrategyAppend, testSourceOptions...)
200200
})
201201
}
202202

plugins/destination/plugin_testing_migrate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
8787
logger zerolog.Logger,
8888
spec specs.Destination,
8989
strategy MigrateStrategy,
90+
testSourceOptions ...func(o *schema.TestSourceOptions),
9091
) {
9192
spec.BatchSize = 1
9293

@@ -260,7 +261,7 @@ func (*PluginTestSuite) destinationPluginTestMigrate(
260261

261262
t.Run("double_migration", func(t *testing.T) {
262263
tableName := "double_migration_" + tableUUIDSuffix()
263-
table := schema.TestTable(tableName)
264+
table := schema.TestTable(tableName, testSourceOptions...)
264265

265266
p := newPlugin()
266267
require.NoError(t, p.Init(ctx, logger, spec))

plugins/destination/plugin_testing_overwrite.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,13 @@ import (
1313
"github.com/rs/zerolog"
1414
)
1515

16-
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
16+
func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
1717
spec.WriteMode = specs.WriteModeOverwrite
1818
if err := p.Init(ctx, logger, spec); err != nil {
1919
return fmt.Errorf("failed to init plugin: %w", err)
2020
}
2121
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
22-
table := schema.TestTable(tableName)
22+
table := schema.TestTable(tableName, testSourceOptions...)
2323
syncTime := time.Now().UTC().Round(1 * time.Second)
2424
tables := schema.Tables{
2525
table,
@@ -67,7 +67,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwrite(ctx context.Context,
6767
secondSyncTime := syncTime.Add(time.Second).UTC()
6868

6969
// copy first resource but update the sync time
70-
u := resources[0].Column(2).(*types.UUIDArray).Value(0)
70+
cqIDInds := resources[0].Schema().FieldIndices(schema.PKColumnNames[0])
71+
u := resources[0].Column(cqIDInds[0]).(*types.UUIDArray).Value(0)
7172
opts = schema.GenTestDataOptions{
7273
SourceName: sourceName,
7374
SyncTime: secondSyncTime,

plugins/destination/plugin_testing_overwrite_delete_stale.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,15 @@ import (
1313
"github.com/rs/zerolog"
1414
)
1515

16-
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
16+
func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
1717
spec.WriteMode = specs.WriteModeOverwriteDeleteStale
1818
if err := p.Init(ctx, logger, spec); err != nil {
1919
return fmt.Errorf("failed to init plugin: %w", err)
2020
}
2121
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
22-
table := schema.TestTable(tableName)
23-
incTable := schema.TestTableIncremental(tableName + "_incremental")
22+
table := schema.TestTable(tableName, testSourceOptions...)
23+
incTable := schema.TestTable(tableName+"_incremental", testSourceOptions...)
24+
incTable.IsIncremental = true
2425
syncTime := time.Now().UTC().Round(1 * time.Second)
2526
tables := schema.Tables{
2627
table,
@@ -80,7 +81,8 @@ func (*PluginTestSuite) destinationPluginTestWriteOverwriteDeleteStale(ctx conte
8081

8182
secondSyncTime := syncTime.Add(time.Second).UTC()
8283
// copy first resource but update the sync time
83-
u := resources[0].Column(2).(*types.UUIDArray).Value(0)
84+
cqIDInds := resources[0].Schema().FieldIndices(schema.CqIDColumn.Name)
85+
u := resources[0].Column(cqIDInds[0]).(*types.UUIDArray).Value(0)
8486
opts = schema.GenTestDataOptions{
8587
SourceName: sourceName,
8688
SyncTime: secondSyncTime,

plugins/destination/plugin_testing_write_append.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,13 @@ import (
1212
"github.com/rs/zerolog"
1313
)
1414

15-
func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination) error {
15+
func (s *PluginTestSuite) destinationPluginTestWriteAppend(ctx context.Context, p *Plugin, logger zerolog.Logger, spec specs.Destination, testSourceOptions ...func(o *schema.TestSourceOptions)) error {
1616
spec.WriteMode = specs.WriteModeAppend
1717
if err := p.Init(ctx, logger, spec); err != nil {
1818
return fmt.Errorf("failed to init plugin: %w", err)
1919
}
2020
tableName := fmt.Sprintf("cq_%s_%d", spec.Name, time.Now().Unix())
21-
table := schema.TestTable(tableName)
21+
table := schema.TestTable(tableName, testSourceOptions...)
2222
syncTime := time.Now().UTC().Round(1 * time.Second)
2323
tables := schema.Tables{
2424
table,

0 commit comments

Comments
 (0)