Skip to content

Commit 1687efd

Browse files
authored
sio/parquetio: fix VectorReader bugs (#6374)
* Fix a data race accessing VectorReader.rr.
* Fix a nil pointer panic accessing VectorReader.metadataFilters.
1 parent 68fe51a commit 1687efd

File tree

2 files changed

+34
-19
lines changed

2 files changed

+34
-19
lines changed

sio/parquetio/vectorreader.go

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ type VectorReader struct {
3636
metadataColIndexes []int
3737
metadataFilters []expr.Evaluator
3838
nextRowGroup *atomic.Int64
39-
rr pqarrow.RecordReader
39+
rrs []pqarrow.RecordReader
4040
vbs []vectorBuilder
4141
}
4242

@@ -68,24 +68,26 @@ func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sb
6868
var metadataColIndexes []int
6969
var metadataFilters []expr.Evaluator
7070
if p != nil {
71-
_, projection, err := p.MetaFilter()
71+
filter, projection, err := p.MetaFilter()
7272
if err != nil {
7373
return nil, err
7474
}
75-
paths := projection.Paths()
76-
for i, p := range paths {
77-
// Trim trailing "max" or "min".
78-
paths[i] = p[:len(p)-1]
79-
}
80-
colIndexes := columnIndexes(pr.MetaData().Schema, paths)
81-
// Remove duplicates created above by trimming "max" and "min".
82-
metadataColIndexes = slices.Compact(colIndexes)
83-
for range concurrentReaders {
84-
filter, _, err := p.MetaFilter()
85-
if err != nil {
86-
return nil, err
75+
if filter != nil {
76+
paths := projection.Paths()
77+
for i, p := range paths {
78+
// Trim trailing "max" or "min".
79+
paths[i] = p[:len(p)-1]
80+
}
81+
colIndexes := columnIndexes(pr.MetaData().Schema, paths)
82+
// Remove duplicates created above by trimming "max" and "min".
83+
metadataColIndexes = slices.Compact(colIndexes)
84+
for range concurrentReaders {
85+
filter, _, err := p.MetaFilter()
86+
if err != nil {
87+
return nil, err
88+
}
89+
metadataFilters = append(metadataFilters, filter)
8790
}
88-
metadataFilters = append(metadataFilters, filter)
8991
}
9092
}
9193
var vbs []vectorBuilder
@@ -101,6 +103,7 @@ func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sb
101103
metadataColIndexes: metadataColIndexes,
102104
metadataFilters: metadataFilters,
103105
nextRowGroup: &atomic.Int64{},
106+
rrs: make([]pqarrow.RecordReader, concurrentReaders),
104107
vbs: vbs,
105108
}, nil
106109
}
@@ -117,7 +120,7 @@ func (p *VectorReader) ConcurrentPull(done bool, id int) (vector.Any, error) {
117120
if err := p.ctx.Err(); err != nil {
118121
return nil, err
119122
}
120-
if p.rr == nil {
123+
if p.rrs[id] == nil {
121124
pr := p.fr.ParquetReader()
122125
rowGroup := int(p.nextRowGroup.Add(1) - 1)
123126
if rowGroup >= pr.NumRowGroups() {
@@ -134,12 +137,12 @@ func (p *VectorReader) ConcurrentPull(done bool, id int) (vector.Any, error) {
134137
if err != nil {
135138
return nil, err
136139
}
137-
p.rr = rr
140+
p.rrs[id] = rr
138141
}
139-
rec, err := p.rr.Read()
142+
rec, err := p.rrs[id].Read()
140143
if err != nil {
141144
if err == io.EOF {
142-
p.rr = nil
145+
p.rrs[id] = nil
143146
continue
144147
}
145148
return nil, err
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
script: |
2+
super -s -c 'from conn.parquet | aggregate count()'
3+
4+
vector: true
5+
6+
inputs:
7+
- name: conn.parquet
8+
9+
outputs:
10+
- name: stdout
11+
data: |
12+
10::uint64

0 commit comments

Comments
 (0)