Commit a7d23a7 (parent 1a625d4)

fix(arrow/array): Fix RecordFromJSON perf (#449)
fixes #448

### Rationale for this change

When dealing with Unicode in JSON values, `RecordFromJSON` exhibits a significant performance slowdown due to an odd interaction between decoders and goccy/go-json. `NewJSONReader` doesn't exhibit the issue because it effectively creates a new Decoder for each line/record, decoding into a RecordBuilder directly.

### What changes are included in this PR?

Change `RecordFromJSON` to work more like `NewJSONReader`: decode directly into a `RecordBuilder`, sidestepping the performance problem for large amounts of JSON.

### Are these changes tested?

Yes. Benchmarks are added to track the performance of `RecordFromJSON` vs `NewJSONReader` on the same data.

### Are there any user-facing changes?

Only a performance improvement when the JSON contains large amounts of Unicode data.
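For context, here is a minimal, hypothetical sketch of the two entry points being compared. The schema and inputs are invented for illustration; the call shapes (`RecordFromJSON`, `NewJSONReader`, `WithAllocator`, `WithChunk`) are taken from the diff below.

```go
package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "name", Type: arrow.BinaryTypes.String},
	}, nil)

	// RecordFromJSON: decodes a single JSON array document into one record.
	rec, _, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
		strings.NewReader(`[{"name": "héllo"}, {"name": "wörld"}]`))
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.NumRows()) // 2
	rec.Release()

	// NewJSONReader: streams line-delimited JSON into chunked records.
	rdr := array.NewJSONReader(
		strings.NewReader("{\"name\": \"héllo\"}\n{\"name\": \"wörld\"}\n"),
		schema, array.WithAllocator(memory.DefaultAllocator), array.WithChunk(1))
	defer rdr.Release()
	for rdr.Next() {
		fmt.Println(rdr.Record().NumRows()) // 1, then 1
	}
	if err := rdr.Err(); err != nil {
		panic(err)
	}
}
```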

3 files changed: +187 −7 lines


arrow/array/json_reader_test.go

Lines changed: 122 additions & 0 deletions
```diff
@@ -17,12 +17,15 @@
 package array_test
 
 import (
+	"bytes"
+	"fmt"
 	"strings"
 	"testing"
 
 	"github.com/apache/arrow-go/v18/arrow"
 	"github.com/apache/arrow-go/v18/arrow/array"
 	"github.com/apache/arrow-go/v18/arrow/memory"
+	"github.com/apache/arrow-go/v18/internal/json"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -139,3 +142,122 @@ func TestUnmarshalJSON(t *testing.T) {
 
 	assert.NotNil(t, record)
 }
+
+func generateJSONData(n int) []byte {
+	records := make([]map[string]any, n)
+	for i := range n {
+		records[i] = map[string]any{
+			"id":       i,
+			"name":     fmt.Sprintf("record_%d", i),
+			"value":    float64(i) * 1.5,
+			"active":   i%2 == 0,
+			"metadata": fmt.Sprintf("metadata_%d_%s", i, make([]byte, 500)),
+		}
+	}
+
+	data, _ := json.Marshal(records)
+	return data
+}
+
+func jsonArrayToNDJSON(data []byte) ([]byte, error) {
+	var records []json.RawMessage
+	if err := json.Unmarshal(data, &records); err != nil {
+		return nil, err
+	}
+
+	var ndjson bytes.Buffer
+	for _, record := range records {
+		ndjson.Write(record)
+		ndjson.WriteString("\n")
+	}
+
+	return ndjson.Bytes(), nil
+}
+
+func BenchmarkRecordFromJSON(b *testing.B) {
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
+		{Name: "name", Type: arrow.BinaryTypes.String},
+		{Name: "value", Type: arrow.PrimitiveTypes.Float64},
+		{Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+		{Name: "metadata", Type: arrow.BinaryTypes.String},
+	}, nil)
+
+	testSizes := []int64{1000, 5000, 10000}
+
+	for _, size := range testSizes {
+		b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+			data := generateJSONData(int(size))
+			pool := memory.NewGoAllocator()
+
+			var rdr bytes.Reader
+			b.SetBytes(int64(len(data)))
+			b.ResetTimer()
+			for range b.N {
+				rdr.Reset(data)
+
+				record, _, err := array.RecordFromJSON(pool, schema, &rdr)
+				if err != nil {
+					b.Error(err)
+				}
+
+				if record.NumRows() != size {
+					b.Errorf("expected %d rows, got %d", size, record.NumRows())
+				}
+				record.Release()
+			}
+		})
+	}
+}
+
+func BenchmarkJSONReader(b *testing.B) {
+	schema := arrow.NewSchema([]arrow.Field{
+		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
+		{Name: "name", Type: arrow.BinaryTypes.String},
+		{Name: "value", Type: arrow.PrimitiveTypes.Float64},
+		{Name: "active", Type: arrow.FixedWidthTypes.Boolean},
+		{Name: "metadata", Type: arrow.BinaryTypes.String},
+	}, nil)
+
+	testSizes := []int64{1000, 5000, 10000}
+
+	for _, size := range testSizes {
+		b.Run(fmt.Sprintf("Size_%d", size), func(b *testing.B) {
+			data := generateJSONData(int(size))
+			data, err := jsonArrayToNDJSON(data)
+			if err != nil {
+				b.Fatalf("failed to convert JSON to NDJSON: %v", err)
+			}
+
+			var rdr bytes.Reader
+			for _, chkSize := range []int{-1, int(size / 2), int(size)} {
+				b.Run(fmt.Sprintf("ChunkSize_%d", chkSize), func(b *testing.B) {
+					pool := memory.NewGoAllocator()
+					b.SetBytes(int64(len(data)))
+					b.ResetTimer()
+					for range b.N {
+						rdr.Reset(data)
+
+						jsonRdr := array.NewJSONReader(&rdr, schema, array.WithAllocator(pool),
+							array.WithChunk(chkSize))
+
+						var totalRows int64
+						for jsonRdr.Next() {
+							rec := jsonRdr.Record()
+							totalRows += rec.NumRows()
+						}
+
+						if err := jsonRdr.Err(); err != nil {
+							b.Errorf("error reading JSON: %v", err)
+						}
+						jsonRdr.Release()
+
+						if totalRows != size {
+							b.Errorf("expected %d rows, got %d", size, totalRows)
+						}
+					}
+				})
+			}
+		})
+	}
+}
```
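With these in place, the two paths can be compared using standard Go tooling, e.g. `go test -bench 'BenchmarkRecordFromJSON|BenchmarkJSONReader' -benchmem ./arrow/array/` (the exact package path depends on your checkout layout).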

arrow/array/struct.go

Lines changed: 3 additions & 1 deletion
```diff
@@ -473,7 +473,9 @@ func (b *StructBuilder) UnmarshalOne(dec *json.Decoder) error {
 		idx, ok := b.dtype.(*arrow.StructType).FieldIdx(key)
 		if !ok {
 			var extra interface{}
-			dec.Decode(&extra)
+			if err := dec.Decode(&extra); err != nil {
+				return err
+			}
 			continue
 		}
```

arrow/array/util.go

Lines changed: 62 additions & 6 deletions
```diff
@@ -210,15 +210,71 @@ func RecordFromStructArray(in *Struct, schema *arrow.Schema) arrow.Record {
 //
 // A record batch from JSON is equivalent to reading a struct array in from json and then
 // converting it to a record batch.
+//
+// See https://github.com/apache/arrow-go/issues/448 for more details on
+// why this isn't a simple wrapper around FromJSON.
 func RecordFromJSON(mem memory.Allocator, schema *arrow.Schema, r io.Reader, opts ...FromJSONOption) (arrow.Record, int64, error) {
-	st := arrow.StructOf(schema.Fields()...)
-	arr, off, err := FromJSON(mem, st, r, opts...)
-	if err != nil {
-		return nil, off, err
+	var cfg fromJSONCfg
+	for _, o := range opts {
+		o(&cfg)
+	}
+
+	if cfg.startOffset != 0 {
+		seeker, ok := r.(io.ReadSeeker)
+		if !ok {
+			return nil, 0, errors.New("using StartOffset option requires reader to be a ReadSeeker, cannot seek")
+		}
+		if _, err := seeker.Seek(cfg.startOffset, io.SeekStart); err != nil {
+			return nil, 0, fmt.Errorf("failed to seek to start offset %d: %w", cfg.startOffset, err)
+		}
+	}
+
+	if mem == nil {
+		mem = memory.DefaultAllocator
+	}
+
+	bldr := NewRecordBuilder(mem, schema)
+	defer bldr.Release()
+
+	dec := json.NewDecoder(r)
+	if cfg.useNumber {
+		dec.UseNumber()
+	}
+
+	if !cfg.multiDocument {
+		t, err := dec.Token()
+		if err != nil {
+			return nil, dec.InputOffset(), err
+		}
+		if delim, ok := t.(json.Delim); !ok || delim != '[' {
+			return nil, dec.InputOffset(), fmt.Errorf("json doc must be an array, found %s", delim)
+		}
+
+		for dec.More() {
+			if err := dec.Decode(bldr); err != nil {
+				return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+			}
+		}
+
+		// consume the last ']'
+		if _, err = dec.Token(); err != nil {
+			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+		}
+
+		return bldr.NewRecord(), dec.InputOffset(), nil
+	}
+
+	for {
+		err := dec.Decode(bldr)
+		if err != nil {
+			if errors.Is(err, io.EOF) {
+				break
+			}
+			return nil, dec.InputOffset(), fmt.Errorf("failed to decode json: %w", err)
+		}
 	}
-	defer arr.Release()
 
-	return RecordFromStructArray(arr.(*Struct), schema), off, nil
+	return bldr.NewRecord(), dec.InputOffset(), nil
 }
 
 // RecordToJSON writes out the given record following the format of each row is a single object
```
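As a usage sketch of the rewritten function's two modes: the default path expects a single JSON array document, while the multi-document path decodes a stream of standalone objects until EOF. The signature comes from the diff above; `array.WithMultipleDocs()` is assumed here to be the package's `FromJSONOption` that sets the `multiDocument` flag seen in the code (verify against the package before relying on it).

```go
package main

import (
	"fmt"
	"strings"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{
		{Name: "id", Type: arrow.PrimitiveTypes.Int64},
	}, nil)

	// Default mode: input is one JSON array document. The decoder consumes
	// '[', decodes each element into the RecordBuilder, then eats the ']'.
	rec, off, err := array.RecordFromJSON(memory.DefaultAllocator, schema,
		strings.NewReader(`[{"id": 1}, {"id": 2}]`))
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.NumRows(), off) // 2, and the bytes consumed
	rec.Release()

	// Multi-document mode: a stream of standalone objects (e.g. NDJSON),
	// decoded one by one until io.EOF breaks the loop.
	rec, _, err = array.RecordFromJSON(memory.DefaultAllocator, schema,
		strings.NewReader("{\"id\": 1}\n{\"id\": 2}\n"), array.WithMultipleDocs())
	if err != nil {
		panic(err)
	}
	fmt.Println(rec.NumRows()) // 2
	rec.Release()
}
```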
