Commit 9b9fdd0
fix(parquet/pqarrow): decoding Parquet with Arrow dict in schema (apache#551)
Fix a spurious `parquet: column chunk cannot have more than one dictionary.` error with specific Parquet files. Resolves apache#546.

The affected Parquet files combine:
* an Arrow Dictionary column
* an Arrow schema serialized in the Parquet metadata
* column chunks with 1 dictionary page + at least 2 data pages

# Bug

When maybeWriteNewDictionary() resets `newDictionary = false` at line 965, it causes the next call to readDictionary() to try to read the dictionary page again from the pager, which then calls configureDict() again, which throws the "cannot have more than one dictionary" error.

The sequence is:
1. Read DICTIONARY_PAGE → newDictionary = true
2. Read DATA_PAGE_1 → calls maybeWriteNewDictionary() → resets newDictionary = false
3. Read DATA_PAGE_2 → calls readDictionary() → since newDictionary = false, tries to get the dictionary page again → calls configureDict() → ERROR because the decoder already exists

# Fix

Added a dictionaryState enum (column_reader.go):
- dictNotRead: dictionary page hasn't been read yet
- dictReadNotInserted: dictionary page read and decoder configured, but not yet inserted into the Arrow builder
- dictFullyProcessed: dictionary fully processed (read + inserted into builder)
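To make the fix concrete, here is a minimal, self-contained sketch of the state machine, not the library's actual reader code: `chunkReader`, `readDictionary`, and `insertIntoBuilder` are hypothetical stand-ins for columnChunkReader, its readDictionary()/configureDict() pair, and maybeWriteNewDictionary():

```go
package main

import "fmt"

// Three-state lifecycle mirroring the fix; a plain bool cannot distinguish
// "already read" from "read but pending insertion into the Arrow builder".
type dictState int

const (
	dictNotRead dictState = iota
	dictReadNotInserted
	dictFullyProcessed
)

// chunkReader is a hypothetical stand-in for columnChunkReader.
type chunkReader struct {
	state      dictState
	configured bool // true once a dictionary decoder exists for this chunk
}

// readDictionary mimics readDictionary() + configureDict(): reading the
// dictionary page a second time for the same chunk is an error.
func (c *chunkReader) readDictionary() error {
	if c.state != dictNotRead {
		return nil // already read while handling an earlier page; skip
	}
	if c.configured {
		return fmt.Errorf("parquet: column chunk cannot have more than one dictionary")
	}
	c.configured = true
	c.state = dictReadNotInserted
	return nil
}

// insertIntoBuilder mimics maybeWriteNewDictionary(): it advances the state
// instead of resetting a boolean (the old `newDictionary = false`, which
// erased the fact that the dictionary had already been read).
func (c *chunkReader) insertIntoBuilder() {
	if c.state == dictReadNotInserted {
		c.state = dictFullyProcessed
	}
}

func main() {
	c := &chunkReader{}
	_ = c.readDictionary() // DICTIONARY_PAGE
	c.insertIntoBuilder()  // DATA_PAGE_1
	// DATA_PAGE_2: with the old boolean, this path re-read the dictionary
	// and raised the error; dictFullyProcessed keeps the re-read skipped.
	if err := c.readDictionary(); err != nil {
		fmt.Println("bug:", err)
	} else {
		fmt.Println("ok: dictionary read exactly once")
	}
}
```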
1 parent 95b3f76 commit 9b9fdd0

File tree

3 files changed: +173 -8 lines changed

parquet/file/column_reader.go

Lines changed: 21 additions & 4 deletions
```diff
@@ -38,6 +38,19 @@ const (
 	defaultPageHeaderSize = 16 * 1024
 )
 
+// dictionaryState tracks the lifecycle of dictionary handling for a column chunk
+type dictionaryState int
+
+const (
+	// dictNotRead: Dictionary page has not been read yet
+	dictNotRead dictionaryState = iota
+	// dictReadNotInserted: Dictionary page has been read and decoder configured,
+	// but not yet inserted into Arrow builder (for Arrow Dictionary types only)
+	dictReadNotInserted
+	// dictFullyProcessed: Dictionary has been read, configured, and inserted into builder
+	dictFullyProcessed
+)
+
 // cloneByteArray is a helper function to clone a slice of byte slices
 func cloneByteArray[T ~[]byte](src []T) {
 	totalLength := 0
@@ -160,7 +173,7 @@ type columnChunkReader struct {
 	defLvlBuffer []int16
 	repLvlBuffer []int16
 
-	newDictionary bool
+	dictState dictionaryState
 }
 
 func newTypedColumnChunkReader(base columnChunkReader) ColumnChunkReader {
@@ -243,7 +256,7 @@ func (c *columnChunkReader) setPageReader(rdr PageReader) {
 	c.Close()
 	c.rdr, c.err = rdr, nil
 	c.decoders = make(map[format.Encoding]encoding.TypedDecoder)
-	c.newDictionary = false
+	c.dictState = dictNotRead
 	c.numBuffered, c.numDecoded = 0, 0
 }
 
@@ -286,7 +299,8 @@ func (c *columnChunkReader) HasNext() bool {
 }
 
 func (c *columnChunkReader) readDictionary() error {
-	if c.newDictionary {
+	// If dictionary has been read (in any state beyond dictNotRead), skip reading
+	if c.dictState != dictNotRead {
 		return nil
 	}
 
@@ -324,7 +338,10 @@ func (c *columnChunkReader) configureDict(page *DictionaryPage) error {
 		return xerrors.New("parquet: dictionary index must be plain encoding")
 	}
 
-	c.newDictionary = true
+	// Dictionary page has been read and decoder configured
+	// For non-Arrow Dictionary types, this is the final state
+	// For Arrow Dictionary types, record reader will advance to dictFullyProcessed
+	c.dictState = dictReadNotInserted
 	c.curDecoder = c.decoders[enc]
 	return nil
 }
```

parquet/file/record_reader.go

Lines changed: 5 additions & 4 deletions
```diff
@@ -953,16 +953,17 @@ func (bd *byteArrayDictRecordReader) flushBuilder() {
 
 func (bd *byteArrayDictRecordReader) maybeWriteNewDictionary() error {
 	rdr := bd.ColumnChunkReader.(*ByteArrayColumnChunkReader)
-	if rdr.newDictionary {
-		// if there is a new dictionary, we may need to flush the builder,
-		// then insert the new dictionary values
+	// Only process dictionary if it's been read but not yet inserted into builder
+	if rdr.dictState == dictReadNotInserted {
+		// Flush the builder and insert the new dictionary values
 		bd.flushBuilder()
 		bd.bldr.(*array.BinaryDictionaryBuilder).ResetFull()
 		dec := rdr.curDecoder.(*encoding.DictByteArrayDecoder)
 		if err := dec.InsertDictionary(bd.bldr); err != nil {
 			return err
 		}
-		rdr.newDictionary = false
+		// Mark dictionary as fully processed (read + inserted)
+		rdr.dictState = dictFullyProcessed
 	}
 	return nil
 }
```
Lines changed: 147 additions & 0 deletions
@@ -0,0 +1,147 @@

```go
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pqarrow_test

import (
	"bytes"
	"context"
	"io"
	"testing"

	"github.com/apache/arrow-go/v18/arrow"
	"github.com/apache/arrow-go/v18/arrow/array"
	"github.com/apache/arrow-go/v18/arrow/memory"
	"github.com/apache/arrow-go/v18/parquet"
	"github.com/apache/arrow-go/v18/parquet/compress"
	"github.com/apache/arrow-go/v18/parquet/file"
	"github.com/apache/arrow-go/v18/parquet/pqarrow"
	"github.com/stretchr/testify/require"
)

// TestArrowDictionaryTypeMultiplePages tests reading Arrow Dictionary types
// with multiple data pages in a single row group.
//
// This test exercises byteArrayDictRecordReader, which had a bug at line 966
// in maybeWriteNewDictionary() that reset newDictionary=false.
//
// The bug manifests when:
// 1. The Arrow schema has a Dictionary type (not just parquet dictionary encoding)
// 2. Multiple data pages exist in a row group
// 3. Reading with a large batch size that spans multiple pages
func TestArrowDictionaryTypeMultiplePages(t *testing.T) {
	mem := memory.NewCheckedAllocator(memory.DefaultAllocator)
	defer mem.AssertSize(t, 0)

	// Schema with Arrow Dictionary type
	schema := arrow.NewSchema(
		[]arrow.Field{
			{
				Name: "dict_col",
				Type: &arrow.DictionaryType{
					IndexType: arrow.PrimitiveTypes.Uint32,
					ValueType: arrow.BinaryTypes.String,
				},
				Nullable: false,
			},
		},
		nil,
	)

	var buf bytes.Buffer

	// CRITICAL: Use WithStoreSchema() to preserve Arrow Dictionary type metadata.
	// Without this, arrow-go converts the Dictionary type to a plain string.
	props := parquet.NewWriterProperties(
		parquet.WithDictionaryDefault(true),
		parquet.WithCompression(compress.Codecs.Snappy),
		parquet.WithDataPageSize(10), // Small page size to force multiple pages
		parquet.WithMaxRowGroupLength(100000),
		parquet.WithAllocator(mem),
	)

	writerProps := pqarrow.NewArrowWriterProperties(
		pqarrow.WithStoreSchema(), // KEY: Preserve Arrow Dictionary type
		pqarrow.WithAllocator(mem),
	)

	writer, err := pqarrow.NewFileWriter(schema, &buf, props, writerProps)
	require.NoError(t, err)

	// Create a dictionary array with many values to span multiple pages
	dictBuilder := array.NewDictionaryBuilder(mem, &arrow.DictionaryType{
		IndexType: arrow.PrimitiveTypes.Uint32,
		ValueType: arrow.BinaryTypes.String,
	}).(*array.BinaryDictionaryBuilder)
	defer dictBuilder.Release()

	// Create data with few unique values (good for dictionary encoding)
	values := []string{"ValueA", "ValueB", "ValueC", "ValueD"}
	numRows := 2000

	for i := 0; i < numRows; i++ {
		require.NoError(t, dictBuilder.AppendString(values[i%len(values)]))
	}

	dictArray := dictBuilder.NewDictionaryArray()
	defer dictArray.Release()

	rec := array.NewRecordBatch(schema, []arrow.Array{dictArray}, int64(numRows))
	defer rec.Release()

	err = writer.Write(rec)
	require.NoError(t, err)

	err = writer.Close()
	require.NoError(t, err)

	t.Logf("Written %d bytes", buf.Len())

	// Read back
	pf, err := file.NewParquetReader(bytes.NewReader(buf.Bytes()),
		file.WithReadProps(parquet.NewReaderProperties(mem)))
	require.NoError(t, err)
	defer pf.Close()

	t.Logf("File has %d row groups", pf.NumRowGroups())

	reader, err := pqarrow.NewFileReader(pf,
		pqarrow.ArrowReadProperties{BatchSize: pf.NumRows()}, mem)
	require.NoError(t, err)

	rr, err := reader.GetRecordReader(context.Background(), nil, nil)
	require.NoError(t, err)
	defer rr.Release()

	// Read all data - this should trigger the bug if present
	totalRows := int64(0)
	for {
		rec, err := rr.Read()
		if err == io.EOF {
			break
		}

		// This will fail with "parquet: column chunk cannot have more than one dictionary"
		// if the bug is present
		require.NoError(t, err, "Failed to read Arrow Dictionary type with multiple pages")

		totalRows += rec.NumRows()
		// Note: Don't call rec.Release() here - the record reader manages record lifecycle
	}

	require.Equal(t, int64(numRows), totalRows, "Should read all rows")
	t.Logf("Successfully read %d rows", totalRows)
}
```
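For local verification, the regression test above should be runnable with `go test -run TestArrowDictionaryTypeMultiplePages` from the test's package directory; the `pqarrow_test` package name suggests it lives under parquet/pqarrow, though the file path is not shown in this diff. Before the fix, the read loop fails on the second data page of the column chunk; with the fix, all 2000 rows are read.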
