Skip to content

Commit 88f08ee

Browse files
authored
fix: Handle null-related test options (#1074)
Follow-up for #1072 & #1002
1 parent 8356590 commit 88f08ee

File tree

5 files changed

+66
-61
lines changed

5 files changed

+66
-61
lines changed

plugin/nulls.go

Lines changed: 52 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -6,73 +6,75 @@ import (
66
"github.com/apache/arrow/go/v13/arrow/memory"
77
)
88

9-
// TODO(v4): use in v4
10-
//
11-
// nolint:unused
12-
func stripNullsFromLists(records []arrow.Record) {
13-
for i := range records {
14-
cols := records[i].Columns()
15-
for c, col := range cols {
16-
if col.DataType().ID() != arrow.LIST {
9+
func stripNullsFromLists(record arrow.Record) arrow.Record {
10+
cols := record.Columns()
11+
for c, col := range cols {
12+
list, ok := col.(array.ListLike)
13+
if !ok {
14+
continue
15+
}
16+
if _, ok := list.(*array.Map); ok {
17+
// maps also correspond to array.ListLike
18+
continue
19+
}
20+
21+
bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(arrow.ListLikeType).Elem())
22+
for j := 0; j < list.Len(); j++ {
23+
if list.IsNull(j) {
24+
bldr.AppendNull()
1725
continue
1826
}
19-
20-
list := col.(*array.List)
21-
bldr := array.NewListBuilder(memory.DefaultAllocator, list.DataType().(*arrow.ListType).Elem())
22-
for j := 0; j < list.Len(); j++ {
23-
if list.IsNull(j) {
24-
bldr.AppendNull()
27+
bldr.Append(true)
28+
vBldr := bldr.ValueBuilder()
29+
from, to := list.ValueOffsets(j)
30+
slc := array.NewSlice(list.ListValues(), from, to)
31+
for k := 0; k < int(to-from); k++ {
32+
if slc.IsNull(k) {
2533
continue
2634
}
27-
bldr.Append(true)
28-
vBldr := bldr.ValueBuilder()
29-
from, to := list.ValueOffsets(j)
30-
slc := array.NewSlice(list.ListValues(), from, to)
31-
for k := 0; k < int(to-from); k++ {
32-
if slc.IsNull(k) {
33-
continue
34-
}
35-
err := vBldr.AppendValueFromString(slc.ValueStr(k))
36-
if err != nil {
37-
panic(err)
38-
}
35+
err := vBldr.AppendValueFromString(slc.ValueStr(k))
36+
if err != nil {
37+
panic(err)
3938
}
4039
}
41-
cols[c] = bldr.NewArray()
4240
}
43-
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
41+
cols[c] = bldr.NewArray()
4442
}
43+
return array.NewRecord(record.Schema(), cols, record.NumRows())
4544
}
4645

4746
type AllowNullFunc func(arrow.DataType) bool
4847

49-
// TODO(v4): use in v4
50-
//
51-
// nolint:unused
52-
func (f AllowNullFunc) replaceNullsByEmpty(records []arrow.Record) {
53-
if f == nil {
54-
return
48+
func (s *WriterTestSuite) replaceNullsByEmpty(record arrow.Record) arrow.Record {
49+
if s.allowNull == nil {
50+
return record
5551
}
56-
for i := range records {
57-
cols := records[i].Columns()
58-
for c, col := range records[i].Columns() {
59-
if col.NullN() == 0 || f(col.DataType()) {
52+
53+
cols := record.Columns()
54+
for c, col := range cols {
55+
if col.NullN() == 0 || s.allowNull(col.DataType()) {
56+
continue
57+
}
58+
59+
builder := array.NewBuilder(memory.DefaultAllocator, col.DataType())
60+
for j := 0; j < col.Len(); j++ {
61+
if col.IsNull(j) {
62+
builder.AppendEmptyValue()
6063
continue
6164
}
6265

63-
builder := array.NewBuilder(memory.DefaultAllocator, records[i].Column(c).DataType())
64-
for j := 0; j < col.Len(); j++ {
65-
if col.IsNull(j) {
66-
builder.AppendEmptyValue()
67-
continue
68-
}
69-
70-
if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
71-
panic(err)
72-
}
66+
if err := builder.AppendValueFromString(col.ValueStr(j)); err != nil {
67+
panic(err)
7368
}
74-
cols[c] = builder.NewArray()
7569
}
76-
records[i] = array.NewRecord(records[i].Schema(), cols, records[i].NumRows())
70+
cols[c] = builder.NewArray()
71+
}
72+
return array.NewRecord(record.Schema(), cols, record.NumRows())
73+
}
74+
75+
func (s *WriterTestSuite) handleNulls(record arrow.Record) arrow.Record {
76+
if s.ignoreNullsInLists {
77+
record = stripNullsFromLists(record)
7778
}
79+
return s.replaceNullsByEmpty(record)
7880
}

plugin/testing_upsert.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (s *WriterTestSuite) testUpsertBasic(ctx context.Context) error {
3737
}); err != nil {
3838
return fmt.Errorf("failed to insert record: %w", err)
3939
}
40+
record = s.handleNulls(record) // we process nulls after writing
4041

4142
records, err := s.plugin.readAll(ctx, table)
4243
if err != nil {
@@ -79,13 +80,12 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
7980

8081
tg := schema.NewTestDataGenerator()
8182
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1})[0]
82-
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]
83-
8483
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
8584
Record: normalRecord,
8685
}); err != nil {
8786
return fmt.Errorf("failed to insert record: %w", err)
8887
}
88+
normalRecord = s.handleNulls(normalRecord) // we process nulls after writing
8989

9090
records, err := s.plugin.readAll(ctx, table)
9191
if err != nil {
@@ -100,11 +100,13 @@ func (s *WriterTestSuite) testUpsertAll(ctx context.Context) error {
100100
return fmt.Errorf("record differs after insert: %s", diff)
101101
}
102102

103+
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]
103104
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
104105
Record: nullRecord,
105106
}); err != nil {
106107
return fmt.Errorf("failed to insert record: %w", err)
107108
}
109+
nullRecord = s.handleNulls(nullRecord) // we process nulls after writing
108110

109111
records, err = s.plugin.readAll(ctx, table)
110112
if err != nil {

plugin/testing_write_delete.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ func (s *WriterTestSuite) testDeleteStale(ctx context.Context) error {
3737
}); err != nil {
3838
return fmt.Errorf("failed to insert record: %w", err)
3939
}
40+
record = s.handleNulls(record) // we process nulls after writing
4041

4142
records, err := s.plugin.readAll(ctx, table)
4243
if err != nil {

plugin/testing_write_insert.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ func (s *WriterTestSuite) testInsertBasic(ctx context.Context) error {
4545
}); err != nil {
4646
return fmt.Errorf("failed to insert record: %w", err)
4747
}
48+
record = s.handleNulls(record) // we process nulls after writing
49+
4850
readRecords, err := s.plugin.readAll(ctx, table)
4951
if err != nil {
5052
return fmt.Errorf("failed to sync: %w", err)
@@ -91,19 +93,14 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
9193
return fmt.Errorf("failed to create table: %w", err)
9294
}
9395
tg := schema.NewTestDataGenerator()
94-
normalRecord := tg.Generate(table, schema.GenTestDataOptions{
95-
MaxRows: 1,
96-
})[0]
97-
nullRecord := tg.Generate(table, schema.GenTestDataOptions{
98-
MaxRows: 1,
99-
NullRows: true,
100-
})[0]
101-
96+
normalRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1})[0]
10297
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
10398
Record: normalRecord,
10499
}); err != nil {
105100
return fmt.Errorf("failed to insert record: %w", err)
106101
}
102+
normalRecord = s.handleNulls(normalRecord) // we process nulls after writing
103+
107104
readRecords, err := s.plugin.readAll(ctx, table)
108105
if err != nil {
109106
return fmt.Errorf("failed to sync: %w", err)
@@ -114,11 +111,13 @@ func (s *WriterTestSuite) testInsertAll(ctx context.Context) error {
114111
return fmt.Errorf("expected 1 item, got %d", totalItems)
115112
}
116113

114+
nullRecord := tg.Generate(table, schema.GenTestDataOptions{MaxRows: 1, NullRows: true})[0]
117115
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
118116
Record: nullRecord,
119117
}); err != nil {
120118
return fmt.Errorf("failed to insert record: %w", err)
121119
}
120+
nullRecord = s.handleNulls(nullRecord) // we process nulls after writing
122121

123122
readRecords, err = s.plugin.readAll(ctx, table)
124123
if err != nil {

plugin/testing_write_migrate.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,12 +38,12 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
3838
}
3939
tg := schema.NewTestDataGenerator()
4040
resource1 := tg.Generate(source, opts)[0]
41-
4241
if err := s.plugin.writeOne(ctx, &message.WriteInsert{
4342
Record: resource1,
4443
}); err != nil {
4544
return fmt.Errorf("failed to insert first record: %w", err)
4645
}
46+
resource1 = s.handleNulls(resource1) // we process nulls after writing
4747

4848
records, err := s.plugin.readAll(ctx, source)
4949
if err != nil {
@@ -70,6 +70,7 @@ func (s *WriterTestSuite) migrate(ctx context.Context, target *schema.Table, sou
7070
}); err != nil {
7171
return fmt.Errorf("failed to insert second record: %w", err)
7272
}
73+
resource2 = s.handleNulls(resource2) // we process nulls after writing
7374

7475
records, err = s.plugin.readAll(ctx, target)
7576
if err != nil {

0 commit comments

Comments
 (0)