Skip to content

Commit 28efee4

Browse files
committed
Revert "Also make the decompression buffer temporary"
This reverts commit 7d639fd82e329c8047f6d1fa132eedd70c783b61.
1 parent cca9f4d commit 28efee4

File tree

1 file changed

+12
-7
lines changed

1 file changed

+12
-7
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 12 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,7 @@ class SerializedPageReader : public PageReader {
223223
const CryptoContext* crypto_ctx, bool always_compressed)
224224
: properties_(properties),
225225
stream_(std::move(stream)),
226+
decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
226227
page_ordinal_(0),
227228
seen_num_values_(0),
228229
total_num_values_(total_num_values) {
@@ -237,8 +238,10 @@ class SerializedPageReader : public PageReader {
237238

238239
// Implement the PageReader interface
239240
//
240-
// The returned Page may contain references that aren't guaranteed to live
241-
// beyond the next call to NextPage().
241+
// The returned Page contains references that aren't guaranteed to live
242+
// beyond the next call to NextPage(). SerializedPageReader reuses the
243+
// decryption and decompression buffers internally, so if NextPage() is
244+
// called then the content of previous page might be invalidated.
242245
std::shared_ptr<Page> NextPage() override;
243246

244247
void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
@@ -264,6 +267,7 @@ class SerializedPageReader : public PageReader {
264267

265268
// Compression codec to use.
266269
std::unique_ptr<::arrow::util::Codec> decompressor_;
270+
std::shared_ptr<ResizableBuffer> decompression_buffer_;
267271

268272
bool always_compressed_;
269273

@@ -546,12 +550,13 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
546550
throw ParquetException("Invalid page header");
547551
}
548552

549-
// Allocate a buffer to decompress into
550-
auto decompression_buffer = AllocateBuffer(properties_.memory_pool(), uncompressed_len);
553+
// Grow the uncompressed buffer if we need to.
554+
PARQUET_THROW_NOT_OK(
555+
decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
551556

552557
if (levels_byte_len > 0) {
553558
// First copy the levels as-is
554-
uint8_t* decompressed = decompression_buffer->mutable_data();
559+
uint8_t* decompressed = decompression_buffer_->mutable_data();
555560
memcpy(decompressed, page_buffer->data(), levels_byte_len);
556561
}
557562

@@ -567,7 +572,7 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
567572
decompressor_->Decompress(
568573
compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
569574
uncompressed_len - levels_byte_len,
570-
decompression_buffer->mutable_data() + levels_byte_len));
575+
decompression_buffer_->mutable_data() + levels_byte_len));
571576
}
572577

573578
if (decompressed_len != uncompressed_len - levels_byte_len) {
@@ -576,7 +581,7 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
576581
", but got:" + std::to_string(decompressed_len));
577582
}
578583

579-
return decompression_buffer;
584+
return decompression_buffer_;
580585
}
581586

582587
} // namespace

0 commit comments

Comments
 (0)