diff --git a/sio/parquetio/vectorreader.go b/sio/parquetio/vectorreader.go
index 15ee7a29c..fe981880b 100644
--- a/sio/parquetio/vectorreader.go
+++ b/sio/parquetio/vectorreader.go
@@ -36,7 +36,7 @@ type VectorReader struct {
 	metadataColIndexes []int
 	metadataFilters    []expr.Evaluator
 	nextRowGroup       *atomic.Int64
-	rr                 pqarrow.RecordReader
+	rrs                []pqarrow.RecordReader
 	vbs                []vectorBuilder
 }
 
@@ -68,24 +68,26 @@ func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sb
 	var metadataColIndexes []int
 	var metadataFilters []expr.Evaluator
 	if p != nil {
-		_, projection, err := p.MetaFilter()
+		filter, projection, err := p.MetaFilter()
 		if err != nil {
 			return nil, err
 		}
-		paths := projection.Paths()
-		for i, p := range paths {
-			// Trim trailing "max" or "min".
-			paths[i] = p[:len(p)-1]
-		}
-		colIndexes := columnIndexes(pr.MetaData().Schema, paths)
-		// Remove duplicates created above by trimming "max" and "min".
-		metadataColIndexes = slices.Compact(colIndexes)
-		for range concurrentReaders {
-			filter, _, err := p.MetaFilter()
-			if err != nil {
-				return nil, err
+		if filter != nil {
+			paths := projection.Paths()
+			for i, p := range paths {
+				// Trim trailing "max" or "min".
+				paths[i] = p[:len(p)-1]
+			}
+			colIndexes := columnIndexes(pr.MetaData().Schema, paths)
+			// Remove duplicates created above by trimming "max" and "min".
+			metadataColIndexes = slices.Compact(colIndexes)
+			for range concurrentReaders {
+				filter, _, err := p.MetaFilter()
+				if err != nil {
+					return nil, err
+				}
+				metadataFilters = append(metadataFilters, filter)
 			}
-			metadataFilters = append(metadataFilters, filter)
 		}
 	}
 	var vbs []vectorBuilder
@@ -101,6 +103,7 @@ func NewVectorReader(ctx context.Context, sctx *super.Context, r io.Reader, p sb
 		metadataColIndexes: metadataColIndexes,
 		metadataFilters:    metadataFilters,
 		nextRowGroup:       &atomic.Int64{},
+		rrs:                make([]pqarrow.RecordReader, concurrentReaders),
 		vbs:                vbs,
 	}, nil
 }
@@ -117,7 +120,7 @@ func (p *VectorReader) ConcurrentPull(done bool, id int) (vector.Any, error) {
 		if err := p.ctx.Err(); err != nil {
 			return nil, err
 		}
-		if p.rr == nil {
+		if p.rrs[id] == nil {
 			pr := p.fr.ParquetReader()
 			rowGroup := int(p.nextRowGroup.Add(1) - 1)
 			if rowGroup >= pr.NumRowGroups() {
@@ -134,12 +137,12 @@ func (p *VectorReader) ConcurrentPull(done bool, id int) (vector.Any, error) {
 			if err != nil {
 				return nil, err
 			}
-			p.rr = rr
+			p.rrs[id] = rr
 		}
-		rec, err := p.rr.Read()
+		rec, err := p.rrs[id].Read()
 		if err != nil {
 			if err == io.EOF {
-				p.rr = nil
+				p.rrs[id] = nil
 				continue
 			}
 			return nil, err
diff --git a/sio/parquetio/ztests/aggregate.yaml b/sio/parquetio/ztests/aggregate.yaml
new file mode 100644
index 000000000..25b05ab3a
--- /dev/null
+++ b/sio/parquetio/ztests/aggregate.yaml
@@ -0,0 +1,12 @@
+script: |
+  super -s -c 'from conn.parquet | aggregate count()'
+
+vector: true
+
+inputs:
+  - name: conn.parquet
+
+outputs:
+  - name: stdout
+    data: |
+      10::uint64
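
Note: the substance of this change is (1) guarding the metadata-filter setup with `if filter != nil`, so the projection paths and column indexes are only computed when a metadata filter actually exists, and (2) replacing the single shared `rr pqarrow.RecordReader` with a per-worker slice `rrs` indexed by the `id` passed to `ConcurrentPull`, so concurrent callers no longer share one reader while the atomic `nextRowGroup` counter hands out row groups. The sketch below illustrates that second pattern in isolation; names like `reader`, `rowGroupReader`, and `concurrentPull` are hypothetical stand-ins, not the parquetio API.

```go
// Minimal sketch (hypothetical names, not the parquetio API) of the pattern in
// this diff: per-worker reader slots in a slice indexed by worker id, plus an
// atomic counter that hands out row groups exactly once across all workers.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// rowGroupReader stands in for a pqarrow.RecordReader over one row group.
type rowGroupReader struct{ group, remaining int }

type reader struct {
	nextRowGroup atomic.Int64
	numRowGroups int
	rrs          []*rowGroupReader // one slot per worker id, never shared
}

// concurrentPull mirrors the shape of VectorReader.ConcurrentPull: if worker
// id has no open reader, claim the next row group; on "EOF" clear the slot and
// loop to claim another; report false once every row group has been handed out.
func (r *reader) concurrentPull(id int) (int, bool) {
	for {
		if r.rrs[id] == nil {
			rowGroup := int(r.nextRowGroup.Add(1) - 1)
			if rowGroup >= r.numRowGroups {
				return 0, false // no more row groups
			}
			r.rrs[id] = &rowGroupReader{group: rowGroup, remaining: 3}
		}
		rr := r.rrs[id]
		if rr.remaining == 0 {
			r.rrs[id] = nil // simulated io.EOF: drop this reader, claim another group
			continue
		}
		rr.remaining--
		return rr.group, true
	}
}

func main() {
	const workers = 4
	r := &reader{numRowGroups: 8, rrs: make([]*rowGroupReader, workers)}
	var wg sync.WaitGroup
	for id := 0; id < workers; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				if _, ok := r.concurrentPull(id); !ok {
					return
				}
				// A real caller would turn the record batch into a vector here.
			}
		}(id)
	}
	wg.Wait()
	fmt.Println("all row groups drained")
}
```

Because each worker only ever touches its own slot, the readers themselves need no locking; the only shared state is the row-group counter, which is already atomic. That is why the constructor sizes the slice as `rrs: make([]pqarrow.RecordReader, concurrentReaders)`.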