Skip to content

Commit cf13dd5

Browse files
authored
fix: Support list scalars from JSON (#1530)
Without this, lists from the test source (at least, could be more plugins/cases) get persisted as `[]` I'm still not 100% sure how the lists end up getting encoded to JSON, probably we use Arrow's `.ValueStr` and/or `.GetOneForMarshal` somewhere. Called from table resolver - source plugins need this fix.
1 parent b7dcd56 commit cf13dd5

File tree

3 files changed

+54
-7
lines changed

3 files changed

+54
-7
lines changed

scalar/list.go

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package scalar
22

33
import (
4+
"encoding/json"
45
"reflect"
56
"strings"
67

@@ -85,6 +86,49 @@ func (s *List) Set(val any) error {
8586
return nil
8687
}
8788

89+
switch value := val.(type) {
90+
case string:
91+
var x []any
92+
if err := json.Unmarshal([]byte(value), &x); err != nil {
93+
return err
94+
}
95+
length := len(x)
96+
s.Value = make(Vector, length)
97+
for i := 0; i < length; i++ {
98+
s.Value[i] = NewScalar(s.Type.(*arrow.ListType).Elem())
99+
if x[i] == nil {
100+
continue
101+
}
102+
if err := s.Value[i].Set(x[i]); err != nil {
103+
return err
104+
}
105+
}
106+
107+
case []byte:
108+
var x []any
109+
if err := json.Unmarshal(value, &x); err != nil {
110+
return err
111+
}
112+
length := len(x)
113+
s.Value = make(Vector, length)
114+
for i := 0; i < length; i++ {
115+
s.Value[i] = NewScalar(s.Type.(*arrow.ListType).Elem())
116+
if x[i] == nil {
117+
continue
118+
}
119+
if err := s.Value[i].Set(x[i]); err != nil {
120+
return err
121+
}
122+
}
123+
124+
case *string:
125+
if value == nil {
126+
s.Valid = false
127+
return nil
128+
}
129+
return s.Set(*value)
130+
}
131+
88132
switch reflectedValue.Kind() {
89133
case reflect.Array:
90134
fallthrough

scalar/list_test.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,15 @@ func TestListSet(t *testing.T) {
3737
&Inet{Valid: false},
3838
&Inet{Valid: false},
3939
}, Valid: true, Type: arrow.ListOf(types.ExtensionTypes.Inet)}},
40+
{source: `[1, 2]`, result: List{Value: []Scalar{
41+
&Int{Value: 1, Valid: true},
42+
&Int{Value: 2, Valid: true},
43+
}, Valid: true, Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)}},
44+
{source: `[1, null, 2]`, result: List{Value: []Scalar{
45+
&Int{Value: 1, Valid: true},
46+
&Int{Valid: false},
47+
&Int{Value: 2, Valid: true},
48+
}, Valid: true, Type: arrow.ListOf(arrow.PrimitiveTypes.Int64)}},
4049
}
4150

4251
for i, tt := range successfulTests {

scheduler/scheduler.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,8 @@ import (
1111

1212
"github.com/apache/arrow/go/v15/arrow"
1313

14-
"github.com/apache/arrow/go/v15/arrow/array"
15-
"github.com/apache/arrow/go/v15/arrow/memory"
1614
"github.com/cloudquery/plugin-sdk/v4/caser"
1715
"github.com/cloudquery/plugin-sdk/v4/message"
18-
"github.com/cloudquery/plugin-sdk/v4/scalar"
1916
"github.com/cloudquery/plugin-sdk/v4/schema"
2017
"github.com/getsentry/sentry-go"
2118
"github.com/rs/zerolog"
@@ -225,10 +222,7 @@ func (s *Scheduler) Sync(ctx context.Context, client schema.ClientMeta, tables s
225222

226223
func resourceToRecord(resource *schema.Resource) arrow.Record {
227224
vector := resource.GetValues()
228-
bldr := array.NewRecordBuilder(memory.DefaultAllocator, resource.Table.ToArrowSchema())
229-
scalar.AppendToRecordBuilder(bldr, vector)
230-
rec := bldr.NewRecord()
231-
return rec
225+
return vector.ToArrowRecord(resource.Table.ToArrowSchema())
232226
}
233227

234228
func (s *syncClient) logTablesMetrics(tables schema.Tables, client Client) {

0 commit comments

Comments
 (0)