Skip to content

Commit cca9f4d

Browse files
committed
Also make the decompression buffer temporary
1 parent 3b2b706 commit cca9f4d

File tree

1 file changed

+7
-12
lines changed

1 file changed

+7
-12
lines changed

cpp/src/parquet/column_reader.cc

Lines changed: 7 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -223,7 +223,6 @@ class SerializedPageReader : public PageReader {
223223
const CryptoContext* crypto_ctx, bool always_compressed)
224224
: properties_(properties),
225225
stream_(std::move(stream)),
226-
decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
227226
page_ordinal_(0),
228227
seen_num_values_(0),
229228
total_num_values_(total_num_values) {
@@ -238,10 +237,8 @@ class SerializedPageReader : public PageReader {
238237

239238
// Implement the PageReader interface
240239
//
241-
// The returned Page contains references that aren't guaranteed to live
242-
// beyond the next call to NextPage(). SerializedPageReader reuses the
243-
// decryption and decompression buffers internally, so if NextPage() is
244-
// called then the content of previous page might be invalidated.
240+
// The returned Page may contain references that aren't guaranteed to live
241+
// beyond the next call to NextPage().
245242
std::shared_ptr<Page> NextPage() override;
246243

247244
void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
@@ -267,7 +264,6 @@ class SerializedPageReader : public PageReader {
267264

268265
// Compression codec to use.
269266
std::unique_ptr<::arrow::util::Codec> decompressor_;
270-
std::shared_ptr<ResizableBuffer> decompression_buffer_;
271267

272268
bool always_compressed_;
273269

@@ -550,13 +546,12 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
550546
throw ParquetException("Invalid page header");
551547
}
552548

553-
// Grow the uncompressed buffer if we need to.
554-
PARQUET_THROW_NOT_OK(
555-
decompression_buffer_->Resize(uncompressed_len, /*shrink_to_fit=*/false));
549+
// Allocate a buffer to decompress into
550+
auto decompression_buffer = AllocateBuffer(properties_.memory_pool(), uncompressed_len);
556551

557552
if (levels_byte_len > 0) {
558553
// First copy the levels as-is
559-
uint8_t* decompressed = decompression_buffer_->mutable_data();
554+
uint8_t* decompressed = decompression_buffer->mutable_data();
560555
memcpy(decompressed, page_buffer->data(), levels_byte_len);
561556
}
562557

@@ -572,7 +567,7 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
572567
decompressor_->Decompress(
573568
compressed_len - levels_byte_len, page_buffer->data() + levels_byte_len,
574569
uncompressed_len - levels_byte_len,
575-
decompression_buffer_->mutable_data() + levels_byte_len));
570+
decompression_buffer->mutable_data() + levels_byte_len));
576571
}
577572

578573
if (decompressed_len != uncompressed_len - levels_byte_len) {
@@ -581,7 +576,7 @@ std::shared_ptr<Buffer> SerializedPageReader::DecompressIfNeeded(
581576
", but got:" + std::to_string(decompressed_len));
582577
}
583578

584-
return decompression_buffer_;
579+
return decompression_buffer;
585580
}
586581

587582
} // namespace

0 commit comments

Comments
 (0)