
Commit 70bf34d

nikitamikhaylov and mkmkme authored and committed

Merge pull request ClickHouse#88827 from ClickHouse/pqe

Enable parquet reader v3 by default

1 parent c1f266b · commit 70bf34d

19 files changed: +147 −53 lines changed

src/Common/threadPoolCallbackRunner.cpp

Lines changed: 19 additions & 6 deletions

@@ -69,6 +69,7 @@ void ThreadPoolCallbackRunnerFast::operator()(std::function<void()> f)
 {
     std::unique_lock lock(mutex);
     queue.push_back(std::move(f));
+
     startMoreThreadsIfNeeded(active_tasks_, lock);
 }

@@ -92,23 +93,35 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector<std::function<void()
     if (mode == Mode::Disabled)
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");

-    size_t active_tasks_ = fs.size() + active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
+    size_t n = fs.size();
+    size_t active_tasks_ = n + active_tasks.fetch_add(n, std::memory_order_relaxed);

     {
         std::unique_lock lock(mutex);
         queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
-        startMoreThreadsIfNeeded(active_tasks_, lock);
+
+        try
+        {
+            startMoreThreadsIfNeeded(active_tasks_, lock);
+        }
+        catch (...)
+        {
+            /// Keep `queue` consistent with `queue_size`.
+            queue.erase(queue.end() - n, queue.end());
+            active_tasks.fetch_sub(n, std::memory_order_relaxed);
+            throw;
+        }
     }

     if (mode == Mode::ThreadPool)
     {
 #ifdef OS_LINUX
-        UInt32 prev_size = queue_size.fetch_add(fs.size(), std::memory_order_release);
+        UInt32 prev_size = queue_size.fetch_add(n, std::memory_order_release);
         if (prev_size < max_threads)
-            futexWake(&queue_size, fs.size());
+            futexWake(&queue_size, n);
 #else
-        if (fs.size() < 4)
-            for (size_t i = 0; i < fs.size(); ++i)
+        if (n < 4)
+            for (size_t i = 0; i < n; ++i)
                 queue_cv.notify_one();
         else
             queue_cv.notify_all();
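
The notable change in bulkSchedule is exception safety: the tasks are appended to `queue` first, and if startMoreThreadsIfNeeded then throws, the new catch block erases exactly the n tasks just added and restores the active_tasks counter, keeping `queue` consistent with `queue_size`. A standalone sketch of the same rollback pattern, with simplified stand-ins rather than the real ClickHouse types:

#include <atomic>
#include <deque>
#include <functional>
#include <mutex>
#include <vector>

/// Simplified stand-in for ThreadPoolCallbackRunnerFast::bulkSchedule:
/// if starting worker threads fails, undo the enqueue so the queue and the
/// task counter stay consistent.
struct MiniRunner
{
    std::mutex mutex;
    std::deque<std::function<void()>> queue;
    std::atomic<size_t> active_tasks{0};

    void startMoreThreadsIfNeeded(size_t /*active*/)
    {
        /// May throw, e.g. if thread creation hits a resource limit.
    }

    void bulkSchedule(std::vector<std::function<void()>> fs)
    {
        size_t n = fs.size();
        size_t active = n + active_tasks.fetch_add(n, std::memory_order_relaxed);

        std::unique_lock lock(mutex);
        queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
        try
        {
            startMoreThreadsIfNeeded(active);
        }
        catch (...)
        {
            /// Roll back: drop the n tasks just queued and restore the counter.
            queue.erase(queue.end() - n, queue.end());
            active_tasks.fetch_sub(n, std::memory_order_relaxed);
            throw;
        }
    }
};

Hoisting fs.size() into n also gives the rollback and the futex/condition-variable wakeup paths one agreed-upon task count.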

src/Core/FormatFactorySettings.h

Lines changed: 3 additions & 3 deletions

@@ -182,9 +182,9 @@ When reading Parquet files, parse JSON columns as ClickHouse JSON Column.
     DECLARE(Bool, input_format_parquet_use_native_reader, false, R"(
 Use native parquet reader v1. It's relatively fast but unfinished. Deprecated.
 )", 0) \
-    DECLARE(Bool, input_format_parquet_use_native_reader_v3, false, R"(
-Use Parquet reader v3. Experimental.
-)", EXPERIMENTAL) \
+    DECLARE(Bool, input_format_parquet_use_native_reader_v3, true, R"(
+Use Parquet reader v3.
+)", 0) \
     DECLARE(UInt64, input_format_parquet_memory_low_watermark, 2ul << 20, R"(
 Schedule prefetches more aggressively if memory usage is below the threshold. Potentially useful e.g. if there are many small bloom filters to read over network.
 )", 0) \

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 1 deletion

@@ -56,6 +56,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     {"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
     {"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
     {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
+    {"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
     {"input_format_parquet_verify_checksums", true, true, "New setting."},
     {"output_format_parquet_write_checksums", false, true, "New setting."},
 });

@@ -82,7 +83,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
     {"distributed_cache_connect_max_tries", 20, 5, "Changed setting value"},
     {"opentelemetry_trace_cpu_scheduling", false, false, "New setting to trace `cpu_slot_preemption` feature."},
     {"output_format_parquet_max_dictionary_size", 1024 * 1024, 1024 * 1024, "New setting"},
-    {"input_format_parquet_use_native_reader_v3", false, true, "New setting"},
+    {"input_format_parquet_use_native_reader_v3", false, false, "New setting"},
     {"input_format_parquet_memory_low_watermark", 2ul << 20, 2ul << 20, "New setting"},
     {"input_format_parquet_memory_high_watermark", 4ul << 30, 4ul << 30, "New setting"},
     {"input_format_parquet_page_filter_push_down", true, true, "New setting (no effect when input_format_parquet_use_native_reader_v3 is disabled)"},

src/Processors/Formats/Impl/Parquet/Decoding.cpp

Lines changed: 4 additions & 4 deletions

@@ -1003,7 +1003,7 @@ void Dictionary::index(const ColumnUInt32 & indexes_col, IColumn & out)
     c.reserve(c.size() + indexes.size());
     for (UInt32 idx : indexes)
     {
-        size_t start = offsets[size_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
+        size_t start = offsets[ssize_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
         size_t len = offsets[idx] - start;
         /// TODO [parquet]: Try optimizing short memcpy by taking advantage of padding (maybe memcpySmall.h helps). Also in PlainStringDecoder.
         c.insertData(data.data() + start, len);

@@ -1219,7 +1219,7 @@ void TrivialStringConverter::convertColumn(std::span<const char> chars, const UI
 {
     col_str.getChars().reserve(col_str.getChars().size() + (offsets[num_values - 1] - offsets[-1]) - separator_bytes * num_values);
     for (size_t i = 0; i < num_values; ++i)
-        col_str.insertData(chars.data() + offsets[i - 1], offsets[i] - offsets[i - 1] - separator_bytes);
+        col_str.insertData(chars.data() + offsets[ssize_t(i) - 1], offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes);
 }

@@ -1345,8 +1345,8 @@ void BigEndianDecimalStringConverter<T>::convertColumn(std::span<const char> cha

     for (size_t i = 0; i < num_values; ++i)
     {
-        const char * data = chars.data() + offsets[i - 1];
-        size_t size = offsets[i] - offsets[i - 1] - separator_bytes;
+        const char * data = chars.data() + offsets[ssize_t(i) - 1];
+        size_t size = offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes;
         if (size > sizeof(T))
             throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpectedly wide Decimal value: {} > {} bytes", size, sizeof(T));
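
All three hunks fix the same hazard: `offsets` is intentionally indexed at -1 (a padding slot in front of the array holds the base offset), but computing the index as `size_t(i) - 1` wraps around to SIZE_MAX when i == 0 instead of yielding -1. A standalone illustration, not the ClickHouse code (ssize_t is POSIX, from <sys/types.h>):

#include <cstdio>
#include <sys/types.h> /// ssize_t (POSIX)
#include <vector>

int main()
{
    /// One padding element in front, so offsets[-1] is valid and holds the
    /// base offset, mirroring the padded layout the code comments describe.
    std::vector<long> storage = {0 /* padding: offsets[-1] */, 5, 9, 14};
    const long * offsets = storage.data() + 1;

    unsigned int idx = 0;
    /// size_t(idx) - 1 wraps to SIZE_MAX: a far out-of-bounds read (UB).
    /// ssize_t(idx) - 1 is -1: reads the padding slot, as intended.
    std::printf("offsets[-1] = %ld\n", offsets[ssize_t(idx) - 1]);
}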

src/Processors/Formats/Impl/Parquet/Prefetcher.cpp

Lines changed: 1 addition & 1 deletion

@@ -74,7 +74,7 @@ void Prefetcher::determineReadModeAndFileSize(ReadBuffer * reader_, const ReadOp
     if (!reader_->eof() && reader_->available() >= expected_prefix.size() &&
         memcmp(reader_->position(), expected_prefix.data(), expected_prefix.size()) != 0)
     {
-        throw Exception(ErrorCodes::INCORRECT_DATA, "Not a parquet file (wrong magic bytes at the start)");
+        throw Exception(ErrorCodes::INCORRECT_DATA, "Not a Parquet file (wrong magic bytes at the start)");
     }

     WriteBufferFromVector<PaddedPODArray<char>> out(entire_file);
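
This hunk only fixes capitalization in the error message; the check itself is Parquet's magic-byte framing. For reference, a minimal standalone version of the check (the function name is ours):

#include <cstddef>
#include <cstring>

/// A well-formed Parquet file starts with the 4-byte magic "PAR1" and ends
/// with the same magic, preceded by a little-endian int32 footer length.
/// 12 = leading magic (4) + footer length field (4) + trailing magic (4).
bool looksLikeParquet(const char * data, size_t size)
{
    return size >= 12
        && std::memcmp(data, "PAR1", 4) == 0
        && std::memcmp(data + size - 4, "PAR1", 4) == 0;
}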

src/Processors/Formats/Impl/Parquet/ReadManager.cpp

Lines changed: 1 addition & 1 deletion

@@ -846,7 +846,7 @@ ReadManager::ReadResult ReadManager::read()
     bool thread_pool_was_idle = parser_shared_resources->parsing_runner.isIdle();

     if (exception)
-        std::rethrow_exception(exception);
+        std::rethrow_exception(copyMutableException(exception));

     /// If `preserve_order`, only deliver chunks from `first_incomplete_row_group`.
     /// This ensures that row groups are delivered in order. Within a row group, row
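
copyMutableException is a ClickHouse helper; a plausible reading of this change (our inference, the diff doesn't say) is that DB::Exception objects are mutable, e.g. catch sites can append context to the message, so rethrowing the stored exception_ptr directly would let callers mutate the copy ReadManager keeps for subsequent read() calls. A sketch of the copy-before-rethrow pattern in plain C++; copyException is our stand-in and, unlike the real helper, slices unknown types down to std::runtime_error:

#include <exception>
#include <stdexcept>

/// Rethrow a *copy* so whatever the caller does to the in-flight exception
/// cannot affect the exception_ptr kept for later calls.
std::exception_ptr copyException(const std::exception_ptr & stored)
{
    try
    {
        std::rethrow_exception(stored);
    }
    catch (const std::exception & e)
    {
        return std::make_exception_ptr(std::runtime_error(e.what()));
    }
    catch (...)
    {
        return stored; /// Unknown type: fall back to sharing the original.
    }
}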

src/Processors/Formats/Impl/Parquet/Reader.cpp

Lines changed: 34 additions & 17 deletions

@@ -15,6 +15,7 @@
 #include <Storages/SelectQueryInfo.h>

 #include <lz4.h>
+#include <arrow/util/crc32.h>

 #if USE_SNAPPY
 #include <snappy.h>

@@ -28,6 +29,7 @@ namespace DB::ErrorCodes
     extern const int INCORRECT_DATA;
     extern const int LOGICAL_ERROR;
     extern const int NOT_IMPLEMENTED;
+    extern const int CHECKSUM_DOESNT_MATCH;
 }

 namespace DB::Parquet

@@ -176,7 +178,7 @@ parq::FileMetaData Reader::readFileMetaData(Prefetcher & prefetcher)
     prefetcher.readSync(buf.data(), initial_read_size, file_size - initial_read_size);

     if (memcmp(buf.data() + initial_read_size - 4, "PAR1", 4) != 0)
-        throw Exception(ErrorCodes::INCORRECT_DATA, "Not a parquet file (wrong magic bytes at the end of file)");
+        throw Exception(ErrorCodes::INCORRECT_DATA, "Not a Parquet file (wrong magic bytes at the end of file)");

     int32_t metadata_size_i32;
     memcpy(&metadata_size_i32, buf.data() + initial_read_size - 8, 4);

@@ -216,7 +218,7 @@ parq::FileMetaData Reader::readFileMetaData(Prefetcher & prefetcher)
     /// present. Instead, data_page_offset points to the dictionary page.
     /// (2) Old DuckDB versions (<= 0.10.2) wrote incorrect data_page_offset when dictionary is
     /// present.
-    /// We work around (1) in initializePage by allowing dictionary page in place of data page.
+    /// We work around (1) in initializeDataPage by allowing dictionary page in place of data page.
     /// We work around (2) here by converting it to case (1):
     ///     data_page_offset = dictionary_page_offset
     ///     dictionary_page_offset.reset()

@@ -756,8 +758,9 @@ void Reader::processBloomFilterHeader(ColumnChunk & column, const PrimitiveColum
 bool Reader::decodeDictionaryPage(ColumnChunk & column, const PrimitiveColumnInfo & column_info)
 {
     auto data = prefetcher.getRangeData(column.dictionary_page_prefetch);
-    parq::PageHeader header;
-    size_t header_size = deserializeThriftStruct(header, data.data(), data.size());
+    const char * data_ptr = data.data();
+    const char * data_end = data.data() + data.size();
+    auto [header, page_data] = decodeAndCheckPageHeader(data_ptr, data_end);

     if (header.type != parq::PageType::DICTIONARY_PAGE)
     {

@@ -768,15 +771,14 @@ bool Reader::decodeDictionaryPage(ColumnChunk & column, const PrimitiveColumnInf
         return false;
     }

-    decodeDictionaryPageImpl(header, data.subspan(header_size), column, column_info);
+    decodeDictionaryPageImpl(header, page_data, column, column_info);
     return true;
 }

 void Reader::decodeDictionaryPageImpl(const parq::PageHeader & header, std::span<const char> data, ColumnChunk & column, const PrimitiveColumnInfo & column_info)
 {
     chassert(header.type == parq::PageType::DICTIONARY_PAGE);

-    /// TODO [parquet]: Check checksum.
     size_t compressed_page_size = size_t(header.compressed_page_size);
     if (header.compressed_page_size < 0 || compressed_page_size > data.size())
         throw Exception(ErrorCodes::INCORRECT_DATA, "Dictionary page size out of bounds: {} > {}", header.compressed_page_size, data.size());

@@ -1381,7 +1383,7 @@ void Reader::skipToRow(size_t row_idx, ColumnChunk & column, const PrimitiveColu

             auto data = prefetcher.getRangeData(page_info.prefetch);
             const char * ptr = data.data();
-            if (!initializePage(ptr, ptr + data.size(), first_row_idx, page_info.end_row_idx, row_idx, column, column_info))
+            if (!initializeDataPage(ptr, ptr + data.size(), first_row_idx, page_info.end_row_idx, row_idx, column, column_info))
                 throw Exception(ErrorCodes::LOGICAL_ERROR, "Page doesn't contain requested row");
             found_page = true;
         }

@@ -1403,12 +1405,33 @@ void Reader::skipToRow(size_t row_idx, ColumnChunk & column, const PrimitiveColu
         chassert(column.next_page_offset <= all_pages.size());
         const char * ptr = all_pages.data() + column.next_page_offset;
         const char * end = all_pages.data() + all_pages.size();
-        initializePage(ptr, end, page.next_row_idx, /*end_row_idx=*/ std::nullopt, row_idx, column, column_info);
+        initializeDataPage(ptr, end, page.next_row_idx, /*end_row_idx=*/ std::nullopt, row_idx, column, column_info);
         column.next_page_offset = ptr - all_pages.data();
     }
 }

-bool Reader::initializePage(const char * & data_ptr, const char * data_end, size_t next_row_idx, std::optional<size_t> end_row_idx, size_t target_row_idx, ColumnChunk & column, const PrimitiveColumnInfo & column_info)
+std::tuple<parq::PageHeader, std::span<const char>> Reader::decodeAndCheckPageHeader(const char * & data_ptr, const char * data_end) const
+{
+    parq::PageHeader header;
+    data_ptr += deserializeThriftStruct(header, data_ptr, data_end - data_ptr);
+    size_t compressed_page_size = size_t(header.compressed_page_size);
+    if (header.compressed_page_size < 0 || compressed_page_size > size_t(data_end - data_ptr))
+        throw Exception(ErrorCodes::INCORRECT_DATA, "Page size out of bounds: {} > {}", header.compressed_page_size, data_end - data_ptr);
+
+    std::span page_data(data_ptr, compressed_page_size);
+    data_ptr += compressed_page_size;
+
+    if (header.__isset.crc && options.format.parquet.verify_checksums)
+    {
+        uint32_t crc = arrow::internal::crc32(0, page_data.data(), page_data.size());
+        if (crc != uint32_t(header.crc))
+            throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Page CRC checksum verification failed");
+    }
+
+    return {header, page_data};
+}
+
+bool Reader::initializeDataPage(const char * & data_ptr, const char * data_end, size_t next_row_idx, std::optional<size_t> end_row_idx, size_t target_row_idx, ColumnChunk & column, const PrimitiveColumnInfo & column_info)
 {
     PageState & page = column.page;
     /// We reuse PageState instance across pages to reuse memory in buffers like decompressed_buf.

@@ -1425,13 +1448,7 @@ bool Reader::initializePage(const char * & data_ptr, const char * data_end, size
     /// Decode page header.

     parq::PageHeader header;
-    data_ptr += deserializeThriftStruct(header, data_ptr, data_end - data_ptr);
-    /// TODO [parquet]: Check checksum.
-    size_t compressed_page_size = size_t(header.compressed_page_size);
-    if (header.compressed_page_size < 0 || compressed_page_size > size_t(data_end - data_ptr))
-        throw Exception(ErrorCodes::INCORRECT_DATA, "Page size out of bounds: {} > {}", header.compressed_page_size, data_end - data_ptr);
-    page.data = std::span(data_ptr, compressed_page_size);
-    data_ptr += compressed_page_size;
+    std::tie(header, page.data) = decodeAndCheckPageHeader(data_ptr, data_end);

     /// Check if all rows of the page are filtered out, if we have enough information.

@@ -1525,7 +1542,7 @@ bool Reader::initializePage(const char * & data_ptr, const char * data_end, size
         page.codec = parq::CompressionCodec::UNCOMPRESSED;
     }

-    if (encoded_def_size + encoded_rep_size > compressed_page_size)
+    if (encoded_def_size + encoded_rep_size > page.data.size())
         throw Exception(ErrorCodes::INCORRECT_DATA, "Page data is too short (def+rep)");
     encoded_rep = page.data.data();
     encoded_def = page.data.data() + encoded_rep_size;
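
decodeAndCheckPageHeader centralizes page-header parsing for dictionary and data pages and resolves the old "TODO [parquet]: Check checksum" comments: the optional crc field in a Parquet page header is a CRC-32 over the serialized (compressed) page bytes. A standalone sketch of the same check using zlib's crc32 in place of arrow::internal::crc32 (both compute standard CRC-32; the helper name and error type are ours):

#include <cstdint>
#include <span>
#include <stdexcept>
#include <zlib.h> /// link with -lz

/// `header_crc` is the value of the optional `crc` field from the Thrift
/// PageHeader; per the Parquet spec it covers the compressed page bytes,
/// not the header itself.
void verifyPageCrc(std::span<const char> compressed_page, uint32_t header_crc)
{
    auto actual = static_cast<uint32_t>(::crc32(
        0L,
        reinterpret_cast<const Bytef *>(compressed_page.data()),
        static_cast<uInt>(compressed_page.size())));
    if (actual != header_crc)
        throw std::runtime_error("Page CRC checksum verification failed");
}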

src/Processors/Formats/Impl/Parquet/Reader.h

Lines changed: 3 additions & 2 deletions

@@ -61,7 +61,7 @@ namespace DB::Parquet
 // - no columns to read outside prewhere
 // - no columns to read, but not trivial count either
 // - ROW POLICY, with and without prewhere, with old and new reader
-// - prewhere with defaults (it probably doesn't fill them correctly, see MergeTreeRangeReader::executeActionsBeforePrewhere)
+// - prewhere and other skipping with defaults (it probably doesn't fill them correctly, see MergeTreeRangeReader::executeActionsBeforePrewhere)
 // - prewhere on virtual columns (do they end up in additional_columns?)
 // - prewhere with weird filter type (LowCardinality(UInt8), Nullable(UInt8), const UInt8)
 // - prewhere involving arrays and tuples

@@ -523,7 +523,8 @@ struct Reader
     double estimateAverageStringLengthPerRow(const ColumnChunk & column, const RowGroup & row_group) const;
     void decodeDictionaryPageImpl(const parq::PageHeader & header, std::span<const char> data, ColumnChunk & column, const PrimitiveColumnInfo & column_info);
     void skipToRow(size_t row_idx, ColumnChunk & column, const PrimitiveColumnInfo & column_info);
-    bool initializePage(const char * & data_ptr, const char * data_end, size_t next_row_idx, std::optional<size_t> end_row_idx, size_t target_row_idx, ColumnChunk & column, const PrimitiveColumnInfo & column_info);
+    std::tuple<parq::PageHeader, std::span<const char>> decodeAndCheckPageHeader(const char * & data_ptr, const char * data_end) const;
+    bool initializeDataPage(const char * & data_ptr, const char * data_end, size_t next_row_idx, std::optional<size_t> end_row_idx, size_t target_row_idx, ColumnChunk & column, const PrimitiveColumnInfo & column_info);
     void decompressPageIfCompressed(PageState & page);
     void createPageDecoder(PageState & page, ColumnChunk & column, const PrimitiveColumnInfo & column_info);
     bool skipRowsInPage(size_t target_row_idx, PageState & page, ColumnChunk & column, const PrimitiveColumnInfo & column_info);

src/Processors/Formats/Impl/Parquet/SchemaConverter.cpp

Lines changed: 24 additions & 8 deletions

@@ -133,7 +133,12 @@ std::string_view SchemaConverter::useColumnMapperIfNeeded(const parq::SchemaElem
         return element.name;
     const auto & map = column_mapper->getFieldIdToClickHouseName();
     if (!element.__isset.field_id)
-        throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Missing field_id for column {}", element.name);
+    {
+        /// Does iceberg require that parquet files have field ids?
+        /// Our iceberg writer currently doesn't write them.
+        //throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Missing field_id for column {}", element.name);
+        return element.name;
+    }
     auto it = map.find(element.field_id);
     if (it == map.end())
         throw Exception(ErrorCodes::ICEBERG_SPECIFICATION_VIOLATION, "Parquet file has column {} with field_id {} that is not in datalake metadata", element.name, element.field_id);

@@ -270,15 +275,19 @@ void SchemaConverter::processSubtree(TraversalNode & node)
     }
 }

-bool SchemaConverter::processSubtreePrimitive(TraversalNode & node)
+static bool isPrimitiveNode(const parq::SchemaElement & elem)
 {
     /// `parquet.thrift` says "[num_children] is not set when the element is a primitive type".
-    /// If it's set but has value 0, logically it would make sense to interpret it as empty tuple/struct.
+    /// If it's set but has value 0, logically it should be an empty tuple/struct.
     /// But in practice some writers are sloppy about it and set this field to 0 (rather than unset)
     /// for primitive columns. E.g.
     /// tests/queries/0_stateless/data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet
-    bool is_primitive = !node.element->__isset.num_children || (node.element->num_children == 0 && node.element->__isset.type);
-    if (!is_primitive)
+    return !elem.__isset.num_children || (elem.num_children == 0 && elem.__isset.type);
+}
+
+bool SchemaConverter::processSubtreePrimitive(TraversalNode & node)
+{
+    if (!isPrimitiveNode(*node.element))
         return false;

     primitive_column_idx += 1;

@@ -468,13 +477,18 @@ bool SchemaConverter::processSubtreeMap(TraversalNode & node)
 bool SchemaConverter::processSubtreeArrayOuter(TraversalNode & node)
 {
     /// Array:
-    ///   required group `name` (List):
+    ///   required/optional group `name` (List):
     ///     repeated group "list":
     ///       <recurse> "element"
     ///
     /// I.e. it's a double-wrapped burrito. To unwrap it into one Array, we have to coordinate
     /// across two levels of recursion: processSubtreeArrayOuter for the outer wrapper,
     /// processSubtreeArrayInner for the inner wrapper.
+    ///
+    /// But hudi writes arrays differently, without the inner group:
+    ///   required/optional group `name` (List):
+    ///     repeated <recurse> "array"
+    /// This probably makes it indistinguishable from a single-element tuple.

     if (node.element->converted_type != parq::ConvertedType::LIST && !node.element->logicalType.__isset.LIST)
         return false;

@@ -483,10 +497,12 @@ bool SchemaConverter::processSubtreeArrayOuter(TraversalNode & node)
     if (node.element->num_children != 1)
         return false;
     const parq::SchemaElement & child = file_metadata.schema.at(schema_idx);
-    if (child.repetition_type != parq::FieldRepetitionType::REPEATED || child.num_children != 1)
+    if (child.repetition_type != parq::FieldRepetitionType::REPEATED)
         return false;

-    TraversalNode subnode = node.prepareToRecurse(SchemaContext::ListTuple, node.type_hint);
+    bool has_inner_group = child.num_children == 1;
+
+    TraversalNode subnode = node.prepareToRecurse(has_inner_group ? SchemaContext::ListTuple : SchemaContext::ListElement, node.type_hint);
     processSubtree(subnode);

     if (!node.requested || !subnode.output_idx.has_value())
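
The useColumnMapperIfNeeded hunk relaxes strictness: a schema element without a field_id now falls back to the file's own column name instead of failing, since (per the new comment) ClickHouse's own Iceberg writer doesn't emit field ids yet. The control flow, reduced to a standalone sketch with simplified stand-in types:

#include <cstdint>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

/// Simplified stand-in for SchemaConverter::useColumnMapperIfNeeded.
std::string resolveColumnName(
    const std::string & parquet_name,
    std::optional<int32_t> field_id,
    const std::map<int32_t, std::string> & field_id_to_clickhouse_name)
{
    if (!field_id)
        return parquet_name; /// New behavior: fall back instead of throwing.
    auto it = field_id_to_clickhouse_name.find(*field_id);
    if (it == field_id_to_clickhouse_name.end())
        throw std::runtime_error("field_id not present in datalake metadata");
    return it->second;
}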
