Skip to content

Commit 3a01d9f

Browse files
authored
feat: support conversion of chunked arrays (apache#553)
### Rationale for this change

Fixes apache#552

### What changes are included in this PR?

Make `chunksToSingle` concatenate multiple chunks into a single array using `array.Concatenate` instead of returning `ErrNotImplemented`.

### Are these changes tested?

Unit tests, plus tested end-to-end locally on a parquet file that contains chunked arrays.

### Are there any user-facing changes?

Changes are backward compatible.
1 parent 9b9fdd0 commit 3a01d9f

File tree

3 files changed

+292
-8
lines changed

3 files changed

+292
-8
lines changed

parquet/pqarrow/column_readers.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -319,7 +319,7 @@ func (sr *structReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
319319
return nil, err
320320
}
321321

322-
childArrData[i], err = chunksToSingle(field)
322+
childArrData[i], err = chunksToSingle(field, sr.rctx.mem)
323323
field.Release() // release field before checking
324324
if err != nil {
325325
return nil, err
@@ -442,7 +442,7 @@ func (lr *listReader) BuildArray(lenBound int64) (*arrow.Chunked, error) {
442442
validityBuffer.Resize(int(bitutil.BytesForBits(validityIO.Read)))
443443
}
444444

445-
item, err := chunksToSingle(arr)
445+
item, err := chunksToSingle(arr, lr.rctx.mem)
446446
if err != nil {
447447
return nil, err
448448
}
@@ -489,18 +489,25 @@ func newFixedSizeListReader(rctx *readerCtx, field *arrow.Field, info file.Level
489489
}
490490

491491
// helper function to combine chunks into a single array.
492-
//
493-
// nested data conversion for chunked array outputs not yet implemented
494-
func chunksToSingle(chunked *arrow.Chunked) (arrow.ArrayData, error) {
492+
func chunksToSingle(chunked *arrow.Chunked, mem memory.Allocator) (arrow.ArrayData, error) {
495493
switch len(chunked.Chunks()) {
496494
case 0:
497495
return array.NewData(chunked.DataType(), 0, []*memory.Buffer{nil, nil}, nil, 0, 0), nil
498496
case 1:
499497
data := chunked.Chunk(0).Data()
500498
data.Retain() // we pass control to the caller
501499
return data, nil
502-
default: // if an item reader yields a chunked array, this is not yet implemented
503-
return nil, arrow.ErrNotImplemented
500+
default:
501+
// concatenate multiple chunks into a single array
502+
concatenated, err := array.Concatenate(chunked.Chunks(), mem)
503+
if err != nil {
504+
return nil, err
505+
}
506+
defer concatenated.Release()
507+
508+
data := concatenated.Data()
509+
data.Retain()
510+
return data, nil
504511
}
505512
}
506513

Lines changed: 275 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,275 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing, software
12+
// distributed under the License is distributed on an "AS IS" BASIS,
13+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
// See the License for the specific language governing permissions and
15+
// limitations under the License.
16+
17+
package pqarrow
18+
19+
import (
20+
"bytes"
21+
"context"
22+
"io"
23+
"testing"
24+
25+
"github.com/apache/arrow-go/v18/arrow"
26+
"github.com/apache/arrow-go/v18/arrow/array"
27+
"github.com/apache/arrow-go/v18/arrow/memory"
28+
"github.com/apache/arrow-go/v18/parquet"
29+
"github.com/apache/arrow-go/v18/parquet/compress"
30+
"github.com/apache/arrow-go/v18/parquet/file"
31+
32+
"github.com/stretchr/testify/assert"
33+
"github.com/stretchr/testify/require"
34+
)
35+
36+
func TestChunksToSingle(t *testing.T) {
37+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
38+
defer mem.AssertSize(t, 0)
39+
40+
t.Run("empty chunked array", func(t *testing.T) {
41+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{})
42+
defer chunked.Release()
43+
44+
result, err := chunksToSingle(chunked, mem)
45+
require.NoError(t, err)
46+
defer result.Release()
47+
48+
assert.Equal(t, 0, result.Len())
49+
})
50+
51+
t.Run("single chunk", func(t *testing.T) {
52+
bldr := array.NewInt32Builder(mem)
53+
defer bldr.Release()
54+
bldr.AppendValues([]int32{1, 2, 3, 4, 5}, nil)
55+
arr := bldr.NewInt32Array()
56+
defer arr.Release()
57+
58+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{arr})
59+
defer chunked.Release()
60+
61+
result, err := chunksToSingle(chunked, mem)
62+
require.NoError(t, err)
63+
defer result.Release()
64+
65+
assert.Equal(t, 5, result.Len())
66+
})
67+
68+
t.Run("multiple chunks", func(t *testing.T) {
69+
bldr := array.NewInt32Builder(mem)
70+
defer bldr.Release()
71+
72+
bldr.AppendValues([]int32{1, 2, 3}, nil)
73+
chunk1 := bldr.NewInt32Array()
74+
defer chunk1.Release()
75+
76+
bldr.AppendValues([]int32{4, 5, 6}, nil)
77+
chunk2 := bldr.NewInt32Array()
78+
defer chunk2.Release()
79+
80+
bldr.AppendValues([]int32{7, 8, 9, 10}, nil)
81+
chunk3 := bldr.NewInt32Array()
82+
defer chunk3.Release()
83+
84+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2, chunk3})
85+
defer chunked.Release()
86+
87+
result, err := chunksToSingle(chunked, mem)
88+
require.NoError(t, err)
89+
defer result.Release()
90+
91+
assert.Equal(t, 10, result.Len())
92+
93+
// Verify concatenated values
94+
resultArr := array.MakeFromData(result).(*array.Int32)
95+
defer resultArr.Release()
96+
for i, expected := range []int32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10} {
97+
assert.Equal(t, expected, resultArr.Value(i))
98+
}
99+
})
100+
101+
t.Run("multiple chunks with nulls", func(t *testing.T) {
102+
bldr := array.NewInt32Builder(mem)
103+
defer bldr.Release()
104+
105+
bldr.AppendValues([]int32{1, 0, 3}, []bool{true, false, true})
106+
chunk1 := bldr.NewInt32Array()
107+
defer chunk1.Release()
108+
109+
bldr.AppendValues([]int32{4, 0, 6}, []bool{true, false, true})
110+
chunk2 := bldr.NewInt32Array()
111+
defer chunk2.Release()
112+
113+
chunked := arrow.NewChunked(arrow.PrimitiveTypes.Int32, []arrow.Array{chunk1, chunk2})
114+
defer chunked.Release()
115+
116+
result, err := chunksToSingle(chunked, mem)
117+
require.NoError(t, err)
118+
defer result.Release()
119+
120+
assert.Equal(t, 6, result.Len())
121+
assert.Equal(t, 2, result.NullN())
122+
123+
resultArr := array.MakeFromData(result).(*array.Int32)
124+
defer resultArr.Release()
125+
assert.False(t, resultArr.IsValid(1))
126+
assert.False(t, resultArr.IsValid(4))
127+
assert.Equal(t, int32(1), resultArr.Value(0))
128+
assert.Equal(t, int32(3), resultArr.Value(2))
129+
})
130+
131+
t.Run("multiple chunks string type", func(t *testing.T) {
132+
bldr := array.NewStringBuilder(mem)
133+
defer bldr.Release()
134+
135+
bldr.AppendValues([]string{"hello", "world"}, nil)
136+
chunk1 := bldr.NewStringArray()
137+
defer chunk1.Release()
138+
139+
bldr.AppendValues([]string{"arrow", "parquet"}, nil)
140+
chunk2 := bldr.NewStringArray()
141+
defer chunk2.Release()
142+
143+
chunked := arrow.NewChunked(arrow.BinaryTypes.String, []arrow.Array{chunk1, chunk2})
144+
defer chunked.Release()
145+
146+
result, err := chunksToSingle(chunked, mem)
147+
require.NoError(t, err)
148+
defer result.Release()
149+
150+
assert.Equal(t, 4, result.Len())
151+
152+
resultArr := array.MakeFromData(result).(*array.String)
153+
defer resultArr.Release()
154+
assert.Equal(t, "hello", resultArr.Value(0))
155+
assert.Equal(t, "parquet", resultArr.Value(3))
156+
})
157+
}
158+
159+
func TestChunkedTableRoundTrip(t *testing.T) {
160+
mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
161+
defer mem.AssertSize(t, 0)
162+
163+
schema := arrow.NewSchema(
164+
[]arrow.Field{
165+
{Name: "int64_col", Type: arrow.PrimitiveTypes.Int64, Nullable: false},
166+
{Name: "string_col", Type: arrow.BinaryTypes.String, Nullable: true},
167+
},
168+
nil,
169+
)
170+
171+
// Test data across 3 chunks: 5 + 3 + 2 = 10 rows
172+
allInt64Values := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
173+
allStringValues := []string{"hello", "world", "arrow", "parquet", "go", "chunked", "table", "test", "final", "chunk"}
174+
175+
var buf bytes.Buffer
176+
props := parquet.NewWriterProperties(
177+
parquet.WithCompression(compress.Codecs.Snappy),
178+
parquet.WithAllocator(mem),
179+
)
180+
181+
writerProps := NewArrowWriterProperties(
182+
WithAllocator(mem),
183+
)
184+
185+
writer, err := NewFileWriter(schema, &buf, props, writerProps)
186+
require.NoError(t, err)
187+
188+
// Write three chunks: 5 rows, 3 rows, 2 rows
189+
chunks := []struct{ start, end int }{
190+
{0, 5}, // First chunk: 5 rows
191+
{5, 8}, // Second chunk: 3 rows
192+
{8, 10}, // Third chunk: 2 rows
193+
}
194+
195+
for _, chunk := range chunks {
196+
int64Builder := array.NewInt64Builder(mem)
197+
int64Builder.AppendValues(allInt64Values[chunk.start:chunk.end], nil)
198+
int64Arr := int64Builder.NewInt64Array()
199+
int64Builder.Release()
200+
201+
stringBuilder := array.NewStringBuilder(mem)
202+
stringBuilder.AppendValues(allStringValues[chunk.start:chunk.end], nil)
203+
stringArr := stringBuilder.NewStringArray()
204+
stringBuilder.Release()
205+
206+
rec := array.NewRecordBatch(schema, []arrow.Array{int64Arr, stringArr}, int64(chunk.end-chunk.start))
207+
208+
err = writer.Write(rec)
209+
require.NoError(t, err)
210+
211+
rec.Release()
212+
int64Arr.Release()
213+
stringArr.Release()
214+
}
215+
216+
err = writer.Close()
217+
require.NoError(t, err)
218+
219+
// Read back from parquet
220+
pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
221+
file.WithReadProps(parquet.NewReaderProperties(mem)))
222+
require.NoError(t, err)
223+
defer pf.Close()
224+
225+
reader, err := NewFileReader(pf, ArrowReadProperties{}, mem)
226+
require.NoError(t, err)
227+
228+
rr, err := reader.GetRecordReader(context.Background(), nil, nil)
229+
require.NoError(t, err)
230+
defer rr.Release()
231+
232+
var records []arrow.RecordBatch
233+
for {
234+
rec, err := rr.Read()
235+
if err == io.EOF {
236+
break
237+
}
238+
require.NoError(t, err)
239+
rec.Retain()
240+
records = append(records, rec)
241+
}
242+
243+
readTable := array.NewTableFromRecords(schema, records)
244+
defer readTable.Release()
245+
246+
for _, rec := range records {
247+
rec.Release()
248+
}
249+
250+
// Verify the read table
251+
require.Equal(t, int64(10), readTable.NumRows())
252+
require.Equal(t, int64(2), readTable.NumCols())
253+
254+
// Verify int64 column values
255+
int64Col := readTable.Column(0).Data()
256+
int64Single, err := chunksToSingle(int64Col, mem)
257+
require.NoError(t, err)
258+
defer int64Single.Release()
259+
int64Arr := array.MakeFromData(int64Single).(*array.Int64)
260+
defer int64Arr.Release()
261+
for i := 0; i < int64Arr.Len(); i++ {
262+
assert.Equal(t, allInt64Values[i], int64Arr.Value(i))
263+
}
264+
265+
// Verify string column values
266+
stringCol := readTable.Column(1).Data()
267+
stringSingle, err := chunksToSingle(stringCol, mem)
268+
require.NoError(t, err)
269+
defer stringSingle.Release()
270+
stringArr := array.MakeFromData(stringSingle).(*array.String)
271+
defer stringArr.Release()
272+
for i := 0; i < stringArr.Len(); i++ {
273+
assert.Equal(t, allStringValues[i], stringArr.Value(i))
274+
}
275+
}

parquet/pqarrow/file_reader.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -527,6 +527,7 @@ func (fr *FileReader) GetRecordReader(ctx context.Context, colIndices, rowGroups
527527
parallel: fr.Props.Parallel,
528528
sc: sc,
529529
fieldReaders: readers,
530+
mem: fr.mem,
530531
}
531532
rr.refCount.Add(1)
532533
return rr, nil
@@ -721,6 +722,7 @@ type recordReader struct {
721722
fieldReaders []*ColumnReader
722723
cur arrow.RecordBatch
723724
err error
725+
mem memory.Allocator
724726

725727
refCount atomic.Int64
726728
}
@@ -789,7 +791,7 @@ func (r *recordReader) next() bool {
789791
return io.EOF
790792
}
791793

792-
arrdata, err := chunksToSingle(data)
794+
arrdata, err := chunksToSingle(data, r.mem)
793795
if err != nil {
794796
return err
795797
}

0 commit comments

Comments (0)