Skip to content

Commit c816023

Browse files
authored
Merge pull request #1232 from Altinity/backports/antalya-25.8/79012
Antalya 25.8 Backport of ClickHouse#79012, ClickHouse#87735 and ClickHouse#88827 - Enable parquet reader v3 by default
2 parents 02ac2fb + 0d3285f commit c816023

File tree

79 files changed

+1148
-1221
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

79 files changed

+1148
-1221
lines changed

src/Client/BuzzHouse/Generator/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -559,6 +559,7 @@ std::unordered_map<String, CHSetting> serverSettings = {
559559
{"input_format_parquet_case_insensitive_column_matching", trueOrFalseSettingNoOracle},
560560
{"input_format_parquet_enable_json_parsing", trueOrFalseSettingNoOracle},
561561
{"input_format_parquet_enable_row_group_prefetch", trueOrFalseSettingNoOracle},
562+
{"input_format_parquet_verify_checksums", trueOrFalseSettingNoOracle},
562563
{"input_format_parquet_filter_push_down", trueOrFalseSetting},
563564
{"input_format_parquet_preserve_order", trueOrFalseSettingNoOracle},
564565
{"input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference", trueOrFalseSettingNoOracle},

src/Common/threadPoolCallbackRunner.cpp

Lines changed: 19 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -69,6 +69,7 @@ void ThreadPoolCallbackRunnerFast::operator()(std::function<void()> f)
6969
{
7070
std::unique_lock lock(mutex);
7171
queue.push_back(std::move(f));
72+
7273
startMoreThreadsIfNeeded(active_tasks_, lock);
7374
}
7475

@@ -92,23 +93,35 @@ void ThreadPoolCallbackRunnerFast::bulkSchedule(std::vector<std::function<void()
9293
if (mode == Mode::Disabled)
9394
throw Exception(ErrorCodes::LOGICAL_ERROR, "Thread pool runner is not initialized");
9495

95-
size_t active_tasks_ = fs.size() + active_tasks.fetch_add(fs.size(), std::memory_order_relaxed);
96+
size_t n = fs.size();
97+
size_t active_tasks_ = n + active_tasks.fetch_add(n, std::memory_order_relaxed);
9698

9799
{
98100
std::unique_lock lock(mutex);
99101
queue.insert(queue.end(), std::move_iterator(fs.begin()), std::move_iterator(fs.end()));
100-
startMoreThreadsIfNeeded(active_tasks_, lock);
102+
103+
try
104+
{
105+
startMoreThreadsIfNeeded(active_tasks_, lock);
106+
}
107+
catch (...)
108+
{
109+
/// Keep `queue` consistent with `queue_size`.
110+
queue.erase(queue.end() - n, queue.end());
111+
active_tasks.fetch_sub(n, std::memory_order_relaxed);
112+
throw;
113+
}
101114
}
102115

103116
if (mode == Mode::ThreadPool)
104117
{
105118
#ifdef OS_LINUX
106-
UInt32 prev_size = queue_size.fetch_add(fs.size(), std::memory_order_release);
119+
UInt32 prev_size = queue_size.fetch_add(n, std::memory_order_release);
107120
if (prev_size < max_threads)
108-
futexWake(&queue_size, fs.size());
121+
futexWake(&queue_size, n);
109122
#else
110-
if (fs.size() < 4)
111-
for (size_t i = 0; i < fs.size(); ++i)
123+
if (n < 4)
124+
for (size_t i = 0; i < n; ++i)
112125
queue_cv.notify_one();
113126
else
114127
queue_cv.notify_all();

src/Core/FormatFactorySettings.h

Lines changed: 9 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -182,9 +182,9 @@ When reading Parquet files, parse JSON columns as ClickHouse JSON Column.
182182
DECLARE(Bool, input_format_parquet_use_native_reader, false, R"(
183183
Use native parquet reader v1. It's relatively fast but unfinished. Deprecated.
184184
)", 0) \
185-
DECLARE(Bool, input_format_parquet_use_native_reader_v3, false, R"(
186-
Use Parquet reader v3. Experimental.
187-
)", EXPERIMENTAL) \
185+
DECLARE(Bool, input_format_parquet_use_native_reader_v3, true, R"(
186+
Use Parquet reader v3.
187+
)", 0) \
188188
DECLARE(UInt64, input_format_parquet_memory_low_watermark, 2ul << 20, R"(
189189
Schedule prefetches more aggressively if memory usage is below than threshold. Potentially useful e.g. if there are many small bloom filters to read over network.
190190
)", 0) \
@@ -196,6 +196,9 @@ Skip pages using min/max values from column index.
196196
)", 0) \
197197
DECLARE(Bool, input_format_parquet_use_offset_index, true, R"(
198198
Minor tweak to how pages are read from parquet file when no page filtering is used.
199+
)", 0) \
200+
DECLARE(Bool, input_format_parquet_verify_checksums, true, R"(
201+
Verify page checksums when reading parquet files.
199202
)", 0) \
200203
DECLARE(Bool, input_format_allow_seeks, true, R"(
201204
Allow seeks while reading in ORC/Parquet/Arrow input formats.
@@ -1127,6 +1130,9 @@ If dictionary size grows bigger than this many bytes, switch to encoding without
11271130
)", 0) \
11281131
DECLARE(Bool, output_format_parquet_enum_as_byte_array, true, R"(
11291132
Write enum using parquet physical type: BYTE_ARRAY and logical type: ENUM
1133+
)", 0) \
1134+
DECLARE(Bool, output_format_parquet_write_checksums, true, R"(
1135+
Put crc32 checksums in parquet page headers.
11301136
)", 0) \
11311137
DECLARE(String, output_format_avro_codec, "", R"(
11321138
Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.

src/Core/SettingsChangesHistory.cpp

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -56,6 +56,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5656
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
5757
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
5858
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
59+
{"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
60+
{"input_format_parquet_verify_checksums", true, true, "New setting."},
61+
{"output_format_parquet_write_checksums", false, true, "New setting."},
5962
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
6063
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
6164
});
@@ -82,7 +85,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
8285
{"distributed_cache_connect_max_tries", 20, 5, "Changed setting value"},
8386
{"opentelemetry_trace_cpu_scheduling", false, false, "New setting to trace `cpu_slot_preemption` feature."},
8487
{"output_format_parquet_max_dictionary_size", 1024 * 1024, 1024 * 1024, "New setting"},
85-
{"input_format_parquet_use_native_reader_v3", false, true, "New setting"},
88+
{"input_format_parquet_use_native_reader_v3", false, false, "New setting"},
8689
{"input_format_parquet_memory_low_watermark", 2ul << 20, 2ul << 20, "New setting"},
8790
{"input_format_parquet_memory_high_watermark", 4ul << 30, 4ul << 30, "New setting"},
8891
{"input_format_parquet_page_filter_push_down", true, true, "New setting (no effect when input_format_parquet_use_native_reader_v3 is disabled)"},

src/Formats/FormatFactory.cpp

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -210,6 +210,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
210210
format_settings.parquet.output_date_as_uint16 = settings[Setting::output_format_parquet_date_as_uint16];
211211
format_settings.parquet.max_dictionary_size = settings[Setting::output_format_parquet_max_dictionary_size];
212212
format_settings.parquet.output_enum_as_byte_array = settings[Setting::output_format_parquet_enum_as_byte_array];
213+
format_settings.parquet.write_checksums = settings[Setting::output_format_parquet_write_checksums];
213214
format_settings.parquet.max_block_size = settings[Setting::input_format_parquet_max_block_size];
214215
format_settings.parquet.prefer_block_bytes = settings[Setting::input_format_parquet_prefer_block_bytes];
215216
format_settings.parquet.output_compression_method = settings[Setting::output_format_parquet_compression_method];
@@ -225,6 +226,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
225226
format_settings.parquet.bloom_filter_flush_threshold_bytes = settings[Setting::output_format_parquet_bloom_filter_flush_threshold_bytes];
226227
format_settings.parquet.local_read_min_bytes_for_seek = settings[Setting::input_format_parquet_local_file_min_bytes_for_seek];
227228
format_settings.parquet.enable_row_group_prefetch = settings[Setting::input_format_parquet_enable_row_group_prefetch];
229+
format_settings.parquet.verify_checksums = settings[Setting::input_format_parquet_verify_checksums];
228230
format_settings.parquet.allow_geoparquet_parser = settings[Setting::input_format_parquet_allow_geoparquet_parser];
229231
format_settings.parquet.write_geometadata = settings[Setting::output_format_parquet_geometadata];
230232
format_settings.pretty.charset = settings[Setting::output_format_pretty_grid_charset].toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;

src/Formats/FormatSettings.h

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -292,6 +292,7 @@ struct FormatSettings
292292
bool enable_json_parsing = true;
293293
bool preserve_order = false;
294294
bool enable_row_group_prefetch = true;
295+
bool verify_checksums = true;
295296
std::unordered_set<int> skip_row_groups = {};
296297
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
297298
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
@@ -312,6 +313,7 @@ struct FormatSettings
312313
bool output_compliant_nested_types = true;
313314
bool write_page_index = false;
314315
bool write_bloom_filter = false;
316+
bool write_checksums = true;
315317
ParquetVersion output_version = ParquetVersion::V2_LATEST;
316318
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
317319
uint64_t output_compression_level;

src/Processors/Formats/Impl/Parquet/Decoding.cpp

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -1003,7 +1003,7 @@ void Dictionary::index(const ColumnUInt32 & indexes_col, IColumn & out)
10031003
c.reserve(c.size() + indexes.size());
10041004
for (UInt32 idx : indexes)
10051005
{
1006-
size_t start = offsets[size_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
1006+
size_t start = offsets[ssize_t(idx) - 1] + 4; // offsets[-1] is ok because of padding
10071007
size_t len = offsets[idx] - start;
10081008
/// TODO [parquet]: Try optimizing short memcpy by taking advantage of padding (maybe memcpySmall.h helps). Also in PlainStringDecoder.
10091009
c.insertData(data.data() + start, len);
@@ -1219,7 +1219,7 @@ void TrivialStringConverter::convertColumn(std::span<const char> chars, const UI
12191219
{
12201220
col_str.getChars().reserve(col_str.getChars().size() + (offsets[num_values - 1] - offsets[-1]) - separator_bytes * num_values);
12211221
for (size_t i = 0; i < num_values; ++i)
1222-
col_str.insertData(chars.data() + offsets[i - 1], offsets[i] - offsets[i - 1] - separator_bytes);
1222+
col_str.insertData(chars.data() + offsets[ssize_t(i) - 1], offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes);
12231223
}
12241224
}
12251225

@@ -1345,8 +1345,8 @@ void BigEndianDecimalStringConverter<T>::convertColumn(std::span<const char> cha
13451345

13461346
for (size_t i = 0; i < num_values; ++i)
13471347
{
1348-
const char * data = chars.data() + offsets[i - 1];
1349-
size_t size = offsets[i] - offsets[i - 1] - separator_bytes;
1348+
const char * data = chars.data() + offsets[ssize_t(i) - 1];
1349+
size_t size = offsets[i] - offsets[ssize_t(i) - 1] - separator_bytes;
13501350
if (size > sizeof(T))
13511351
throw Exception(ErrorCodes::CANNOT_PARSE_NUMBER, "Unexpectedly wide Decimal value: {} > {} bytes", size, sizeof(T));
13521352

src/Processors/Formats/Impl/Parquet/Prefetcher.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -74,7 +74,7 @@ void Prefetcher::determineReadModeAndFileSize(ReadBuffer * reader_, const ReadOp
7474
if (!reader_->eof() && reader_->available() >= expected_prefix.size() &&
7575
memcmp(reader_->position(), expected_prefix.data(), expected_prefix.size()) != 0)
7676
{
77-
throw Exception(ErrorCodes::INCORRECT_DATA, "Not a parquet file (wrong magic bytes at the start)");
77+
throw Exception(ErrorCodes::INCORRECT_DATA, "Not a Parquet file (wrong magic bytes at the start)");
7878
}
7979

8080
WriteBufferFromVector<PaddedPODArray<char>> out(entire_file);

src/Processors/Formats/Impl/Parquet/ReadManager.cpp

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -846,7 +846,7 @@ ReadManager::ReadResult ReadManager::read()
846846
bool thread_pool_was_idle = parser_shared_resources->parsing_runner.isIdle();
847847

848848
if (exception)
849-
std::rethrow_exception(exception);
849+
std::rethrow_exception(copyMutableException(exception));
850850

851851
/// If `preserve_order`, only deliver chunks from `first_incomplete_row_group`.
852852
/// This ensures that row groups are delivered in order. Within a row group, row

0 commit comments

Comments
 (0)