Skip to content

Commit eb0921e

Browse files
committed
add support for max_page_header_size for pyarrow
1 parent e5c8ccd commit eb0921e

File tree

8 files changed

+35
-12
lines changed

8 files changed

+35
-12
lines changed

cpp/src/arrow/dataset/file_parquet.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,9 @@ parquet::ReaderProperties MakeReaderProperties(
7676
properties.disable_buffered_stream();
7777
}
7878
properties.set_buffer_size(parquet_scan_options->reader_properties->buffer_size());
79+
if (format.reader_options.max_page_header_size != 0) {
80+
properties.set_max_page_header_size(format.reader_options.max_page_header_size);
81+
}
7982

8083
auto file_decryption_prop =
8184
parquet_scan_options->reader_properties->file_decryption_properties();

cpp/src/arrow/dataset/file_parquet.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
9292
arrow::TimeUnit::type coerce_int96_timestamp_unit = arrow::TimeUnit::NANO;
9393
Type::type binary_type = Type::BINARY;
9494
Type::type list_type = Type::LIST;
95+
uint32_t max_page_header_size = 0;
9596
/// @}
9697
} reader_options;
9798

cpp/src/parquet/column_reader.cc

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,6 @@ class SerializedPageReader : public PageReader {
231231
crypto_ctx_ = *crypto_ctx;
232232
InitDecryption();
233233
}
234-
max_page_header_size_ = kDefaultMaxPageHeaderSize;
235234
decompressor_ = GetCodec(codec);
236235
always_compressed_ = always_compressed;
237236
}
@@ -244,7 +243,9 @@ class SerializedPageReader : public PageReader {
244243
// called then the content of previous page might be invalidated.
245244
std::shared_ptr<Page> NextPage() override;
246245

247-
void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; }
246+
void set_max_page_header_size(uint32_t size) override {
247+
properties_.set_max_page_header_size(size);
248+
}
248249

249250
private:
250251
void UpdateDecryption(Decryptor* decryptor, int8_t module_type, std::string* page_aad);
@@ -260,7 +261,7 @@ class SerializedPageReader : public PageReader {
260261
// Fills in data_page_statistics.
261262
bool ShouldSkipPage(EncodedStatistics* data_page_statistics);
262263

263-
const ReaderProperties properties_;
264+
ReaderProperties properties_;
264265
std::shared_ptr<ArrowInputStream> stream_;
265266

266267
format::PageHeader current_page_header_;
@@ -288,9 +289,6 @@ class SerializedPageReader : public PageReader {
288289
// The ordinal fields in the context below are used for AAD suffix calculation.
289290
int32_t page_ordinal_; // page ordinal does not count the dictionary page
290291

291-
// Maximum allowed page size
292-
uint32_t max_page_header_size_;
293-
294292
// Number of values read in data pages so far
295293
int64_t seen_num_values_;
296294

@@ -422,7 +420,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
422420
std::stringstream ss;
423421
ss << e.what();
424422
allowed_page_size *= 2;
425-
if (allowed_page_size > max_page_header_size_) {
423+
if (allowed_page_size > properties_.max_page_header_size()) {
426424
ss << "Deserializing page header failed.\n";
427425
throw ParquetException(ss.str());
428426
}

cpp/src/parquet/column_reader.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,9 +50,6 @@ namespace parquet {
5050
class Decryptor;
5151
class Page;
5252

53-
// 16 MB is the default maximum page header size
54-
static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
55-
5653
// 16 KB is the default expected page header size
5754
static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024;
5855

cpp/src/parquet/properties.h

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ enum class SizeStatisticsLevel : uint8_t {
5757
PageAndColumnChunk
5858
};
5959

60+
// 16 MB is the default maximum page header size
61+
static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024;
62+
6063
/// Align the default buffer size to a small multiple of a page size.
6164
constexpr int64_t kDefaultBufferSize = 4096 * 4;
6265

@@ -101,6 +104,11 @@ class PARQUET_EXPORT ReaderProperties {
101104
/// Set the size of the buffered stream buffer in bytes.
102105
void set_buffer_size(int64_t size) { buffer_size_ = size; }
103106

107+
/// Return the maximum allowed size of a serialized page header, in bytes.
108+
uint32_t max_page_header_size() const { return max_page_header_size_; }
109+
/// Set the maximum allowed size of a serialized page header, in bytes (stored as given; 0 is not mapped to the default).
110+
void set_max_page_header_size(uint32_t size) { max_page_header_size_ = size; }
111+
104112
/// \brief Return the size limit on thrift strings.
105113
///
106114
/// This limit helps prevent space and time bombs in files, but may need to
@@ -142,6 +150,7 @@ class PARQUET_EXPORT ReaderProperties {
142150
private:
143151
MemoryPool* pool_;
144152
int64_t buffer_size_ = kDefaultBufferSize;
153+
uint32_t max_page_header_size_ = kDefaultMaxPageHeaderSize;
145154
int32_t thrift_string_size_limit_ = kDefaultThriftStringSizeLimit;
146155
int32_t thrift_container_size_limit_ = kDefaultThriftContainerSizeLimit;
147156
bool buffered_stream_enabled_ = false;

python/pyarrow/_parquet.pyx

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1571,7 +1571,8 @@ cdef class ParquetReader(_Weakrefable):
15711571
thrift_string_size_limit=None,
15721572
thrift_container_size_limit=None,
15731573
page_checksum_verification=False,
1574-
arrow_extensions_enabled=False):
1574+
arrow_extensions_enabled=False,
1575+
max_page_header_size=None):
15751576
"""
15761577
Open a parquet file for reading.
15771578
@@ -1591,6 +1592,7 @@ cdef class ParquetReader(_Weakrefable):
15911592
thrift_container_size_limit : int, optional
15921593
page_checksum_verification : bool, default False
15931594
arrow_extensions_enabled : bool, default False
1595+
max_page_header_size : int, default None
15941596
"""
15951597
cdef:
15961598
shared_ptr[CFileMetaData] c_metadata
@@ -1624,6 +1626,11 @@ cdef class ParquetReader(_Weakrefable):
16241626
"must be larger than zero")
16251627
properties.set_thrift_container_size_limit(
16261628
thrift_container_size_limit)
1629+
if max_page_header_size is not None:
1630+
if max_page_header_size <= 0:
1631+
raise ValueError("max_page_header_size "
1632+
"must be larger than zero")
1633+
properties.set_max_page_header_size(max_page_header_size)
16271634

16281635
if decryption_properties is not None:
16291636
properties.file_decryption_properties(

python/pyarrow/includes/libparquet.pxd

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -422,6 +422,9 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
422422
void set_buffer_size(int64_t buf_size)
423423
int64_t buffer_size() const
424424

425+
void set_max_page_header_size(uint32_t size)
426+
uint32_t max_page_header_size() const
427+
425428
void set_thrift_string_size_limit(int32_t size)
426429
int32_t thrift_string_size_limit() const
427430

python/pyarrow/parquet/core.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,6 +265,9 @@ class ParquetFile:
265265
If True, read Parquet logical types as Arrow extension types where possible,
266266
(e.g., read JSON as the canonical `arrow.json` extension type or UUID as
267267
the canonical `arrow.uuid` extension type).
268+
max_page_header_size : int, default None
269+
If not None, override the maximum size of a page header.
270+
Defaults to 16MB, which should be sufficient for most Parquet files.
268271
269272
Examples
270273
--------
@@ -314,7 +317,8 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
314317
coerce_int96_timestamp_unit=None,
315318
decryption_properties=None, thrift_string_size_limit=None,
316319
thrift_container_size_limit=None, filesystem=None,
317-
page_checksum_verification=False, arrow_extensions_enabled=True):
320+
page_checksum_verification=False, arrow_extensions_enabled=True,
321+
max_page_header_size=None):
318322

319323
self._close_source = getattr(source, 'closed', True)
320324

@@ -336,6 +340,7 @@ def __init__(self, source, *, metadata=None, common_metadata=None,
336340
thrift_container_size_limit=thrift_container_size_limit,
337341
page_checksum_verification=page_checksum_verification,
338342
arrow_extensions_enabled=arrow_extensions_enabled,
343+
max_page_header_size=max_page_header_size,
339344
)
340345
self.common_metadata = common_metadata
341346
self._nested_paths_by_prefix = self._build_nested_paths()

0 commit comments

Comments
 (0)