Skip to content

Commit c7bee5e

Browse files
Read uncompressed data directly into the page buffer
1 parent 8edb901 commit c7bee5e

File tree

3 files changed

+61
-16
lines changed

3 files changed

+61
-16
lines changed

internal/utils/buf_reader.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,8 @@ func (r *byteReader) Reset(Reader) {}
111111

112112
func (r *byteReader) BufferSize() int { return len(r.buf) }
113113

114+
func (r *byteReader) Buffered() int { return len(r.buf) - r.pos }
115+
114116
func (r *byteReader) Free() {}
115117

116118
// bytesBufferReader is a byte slice with a bytes reader wrapped around it.

parquet/file/page_reader.go

Lines changed: 57 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,8 @@ type serializedPageReader struct {
374374
dataPageBuffer *memory.Buffer
375375
dictPageBuffer *memory.Buffer
376376
err error
377+
378+
isCompressed bool
377379
}
378380

379381
func (p *serializedPageReader) Close() error {
@@ -402,6 +404,7 @@ func (p *serializedPageReader) init(compressType compress.Compression, ctx *Cryp
402404
return err
403405
}
404406
p.codec = codec
407+
p.isCompressed = compressType != compress.Codecs.Uncompressed
405408

406409
if ctx != nil {
407410
p.cryptoCtx = *ctx
@@ -444,6 +447,7 @@ func NewPageReader(r parquet.BufferedReader, nrows int64, compressType compress.
444447
dictPageBuffer: memory.NewResizableBuffer(mem),
445448
}
446449
rdr.decompressBuffer.ResizeNoShrink(defaultPageHeaderSize)
450+
rdr.isCompressed = compressType != compress.Codecs.Uncompressed
447451
if ctx != nil {
448452
rdr.cryptoCtx = *ctx
449453
rdr.initDecryption()
@@ -460,6 +464,8 @@ func (p *serializedPageReader) Reset(r parquet.BufferedReader, nrows int64, comp
460464
if p.err != nil {
461465
return
462466
}
467+
p.isCompressed = compressType != compress.Codecs.Uncompressed
468+
463469
if ctx != nil {
464470
p.cryptoCtx = *ctx
465471
p.initDecryption()
@@ -502,6 +508,36 @@ func (p *serializedPageReader) Page() Page {
502508
return p.curPage
503509
}
504510

511+
func (p *serializedPageReader) stealFromBuffer(br parquet.BufferedReader, lenUncompressed int) ([]byte, error) {
512+
data, err := br.Peek(lenUncompressed)
513+
if err != nil {
514+
return nil, err
515+
}
516+
if p.cryptoCtx.DataDecryptor != nil {
517+
data = p.cryptoCtx.DataDecryptor.Decrypt(data)
518+
}
519+
// advance the reader
520+
_, err = br.Discard(lenUncompressed)
521+
if err != nil && err != io.EOF {
522+
return nil, err
523+
}
524+
return data, nil
525+
}
526+
527+
func (p *serializedPageReader) readUncompressed(br parquet.BufferedReader, lenUncompressed int, buf []byte) ([]byte, error) {
528+
n, err := io.ReadFull(br, buf[:lenUncompressed])
529+
if err != nil {
530+
return nil, err
531+
}
532+
if n != lenUncompressed {
533+
return nil, fmt.Errorf("parquet: expected to read %d bytes but only read %d", lenUncompressed, n)
534+
}
535+
if p.cryptoCtx.DataDecryptor != nil {
536+
buf = p.cryptoCtx.DataDecryptor.Decrypt(buf)
537+
}
538+
return buf, nil
539+
}
540+
505541
func (p *serializedPageReader) decompress(rd io.Reader, lenCompressed int, buf []byte) ([]byte, error) {
506542
p.decompressBuffer.ResizeNoShrink(lenCompressed)
507543
data := p.decompressBuffer.Bytes()
@@ -583,20 +619,17 @@ func (p *serializedPageReader) GetDictionaryPage() (*DictionaryPage, error) {
583619
return nil, errors.New("parquet: invalid page header (negative number of values)")
584620
}
585621

586-
p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
587-
buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
588-
589-
data, err := p.decompress(rd, lenCompressed, buf.Bytes())
622+
data, err := p.getPageBytes(rd, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer)
590623
if err != nil {
591-
return nil, err
624+
return nil, fmt.Errorf("parquet: could not read dictionary page data: %w", err)
592625
}
593626
if len(data) != lenUncompressed {
594627
return nil, fmt.Errorf("parquet: metadata said %d bytes uncompressed dictionary page, got %d bytes", lenUncompressed, len(data))
595628
}
596629

597630
return &DictionaryPage{
598631
page: page{
599-
buf: buf,
632+
buf: memory.NewBufferBytes(data),
600633
typ: hdr.Type,
601634
nvals: dictHeader.GetNumValues(),
602635
encoding: dictHeader.GetEncoding(),
@@ -693,6 +726,20 @@ func (p *serializedPageReader) SeekToPageWithRow(rowIdx int64) error {
693726
return p.err
694727
}
695728

729+
func (p *serializedPageReader) getPageBytes(
730+
r parquet.BufferedReader, isCompressed bool, lenCompressed, lenUncompressed int, buffer *memory.Buffer,
731+
) ([]byte, error) {
732+
if isCompressed {
733+
buffer.ResizeNoShrink(lenUncompressed)
734+
return p.decompress(r, lenCompressed, buffer.Bytes())
735+
}
736+
if r.Buffered() >= lenCompressed {
737+
return p.stealFromBuffer(r, lenCompressed)
738+
}
739+
buffer.ResizeNoShrink(lenUncompressed)
740+
return p.readUncompressed(r, lenCompressed, buffer.Bytes())
741+
}
742+
696743
func (p *serializedPageReader) Next() bool {
697744
// Loop here because there may be unhandled page types that we skip until
698745
// finding a page that we do know what to do with
@@ -732,10 +779,7 @@ func (p *serializedPageReader) Next() bool {
732779
return false
733780
}
734781

735-
p.dictPageBuffer.ResizeNoShrink(lenUncompressed)
736-
buf := memory.NewBufferBytes(p.dictPageBuffer.Bytes())
737-
738-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
782+
data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dictPageBuffer)
739783
if err != nil {
740784
p.err = err
741785
return false
@@ -748,7 +792,7 @@ func (p *serializedPageReader) Next() bool {
748792
// make dictionary page
749793
p.curPage = &DictionaryPage{
750794
page: page{
751-
buf: buf,
795+
buf: memory.NewBufferBytes(data),
752796
typ: p.curPageHdr.Type,
753797
nvals: dictHeader.GetNumValues(),
754798
encoding: dictHeader.GetEncoding(),
@@ -764,13 +808,10 @@ func (p *serializedPageReader) Next() bool {
764808
return false
765809
}
766810

767-
p.dataPageBuffer.ResizeNoShrink(lenUncompressed)
768-
buf := memory.NewBufferBytes(p.dataPageBuffer.Bytes())
769-
770811
firstRowIdx := p.rowsSeen
771812
p.rowsSeen += int64(dataHeader.GetNumValues())
772813

773-
data, err := p.decompress(p.r, lenCompressed, buf.Bytes())
814+
data, err := p.getPageBytes(p.r, p.isCompressed, lenCompressed, lenUncompressed, p.dataPageBuffer)
774815
if err != nil {
775816
p.err = err
776817
return false
@@ -783,7 +824,7 @@ func (p *serializedPageReader) Next() bool {
783824
// make datapagev1
784825
p.curPage = &DataPageV1{
785826
page: page{
786-
buf: buf,
827+
buf: memory.NewBufferBytes(data),
787828
typ: p.curPageHdr.Type,
788829
nvals: dataHeader.GetNumValues(),
789830
encoding: dataHeader.GetEncoding(),

parquet/reader_properties.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ type BufferedReader interface {
5050
Peek(int) ([]byte, error)
5151
Discard(int) (int, error)
5252
Outer() utils.Reader
53+
// Buffered returns the number of bytes already read and stored in the buffer
54+
Buffered() int
5355
BufferSize() int
5456
Reset(utils.Reader)
5557
Free()

0 commit comments

Comments
 (0)