Skip to content

Commit c87cef9

Browse files
committed
Merge pull request ClickHouse#79012 from ClickHouse/pqcs
Settings to write and verify parquet checksums
1 parent 49be26b commit c87cef9

File tree

11 files changed

+81
-3
lines changed

11 files changed

+81
-3
lines changed

src/Client/BuzzHouse/Generator/ServerSettings.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -559,6 +559,7 @@ std::unordered_map<String, CHSetting> serverSettings = {
559559
{"input_format_parquet_case_insensitive_column_matching", trueOrFalseSettingNoOracle},
560560
{"input_format_parquet_enable_json_parsing", trueOrFalseSettingNoOracle},
561561
{"input_format_parquet_enable_row_group_prefetch", trueOrFalseSettingNoOracle},
562+
{"input_format_parquet_verify_checksums", trueOrFalseSettingNoOracle},
562563
{"input_format_parquet_filter_push_down", trueOrFalseSetting},
563564
{"input_format_parquet_preserve_order", trueOrFalseSettingNoOracle},
564565
{"input_format_parquet_skip_columns_with_unsupported_types_in_schema_inference", trueOrFalseSettingNoOracle},

src/Core/FormatFactorySettings.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,9 @@ Skip pages using min/max values from column index.
196196
)", 0) \
197197
DECLARE(Bool, input_format_parquet_use_offset_index, true, R"(
198198
Minor tweak to how pages are read from a parquet file when no page filtering is used.
199+
)", 0) \
200+
DECLARE(Bool, input_format_parquet_verify_checksums, true, R"(
201+
Verify page checksums when reading parquet files.
199202
)", 0) \
200203
DECLARE(Bool, input_format_allow_seeks, true, R"(
201204
Allow seeks while reading in ORC/Parquet/Arrow input formats.
@@ -1127,6 +1130,9 @@ If dictionary size grows bigger than this many bytes, switch to encoding without
11271130
)", 0) \
11281131
DECLARE(Bool, output_format_parquet_enum_as_byte_array, true, R"(
11291132
Write enum using parquet physical type: BYTE_ARRAY and logical type: ENUM
1133+
)", 0) \
1134+
DECLARE(Bool, output_format_parquet_write_checksums, true, R"(
1135+
Put crc32 checksums in parquet page headers.
11301136
)", 0) \
11311137
DECLARE(String, output_format_avro_codec, "", R"(
11321138
Compression codec used for output. Possible values: 'null', 'deflate', 'snappy', 'zstd'.

src/Core/SettingsChangesHistory.cpp

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5555
{"export_merge_tree_part_file_already_exists_policy", "skip", "skip", "New setting."},
5656
{"iceberg_timezone_for_timestamptz", "UTC", "UTC", "New setting."},
5757
{"hybrid_table_auto_cast_columns", true, true, "New setting to automatically cast Hybrid table columns when segments disagree on types. Default enabled."},
58-
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
58+
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
59+
{"input_format_parquet_verify_checksums", true, true, "New setting."},
60+
{"output_format_parquet_write_checksums", false, true, "New setting."},
5961
});
6062
addSettingsChanges(settings_changes_history, "25.8",
6163
{

src/Formats/FormatFactory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -210,6 +210,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
210210
format_settings.parquet.output_date_as_uint16 = settings[Setting::output_format_parquet_date_as_uint16];
211211
format_settings.parquet.max_dictionary_size = settings[Setting::output_format_parquet_max_dictionary_size];
212212
format_settings.parquet.output_enum_as_byte_array = settings[Setting::output_format_parquet_enum_as_byte_array];
213+
format_settings.parquet.write_checksums = settings[Setting::output_format_parquet_write_checksums];
213214
format_settings.parquet.max_block_size = settings[Setting::input_format_parquet_max_block_size];
214215
format_settings.parquet.prefer_block_bytes = settings[Setting::input_format_parquet_prefer_block_bytes];
215216
format_settings.parquet.output_compression_method = settings[Setting::output_format_parquet_compression_method];
@@ -225,6 +226,7 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
225226
format_settings.parquet.bloom_filter_flush_threshold_bytes = settings[Setting::output_format_parquet_bloom_filter_flush_threshold_bytes];
226227
format_settings.parquet.local_read_min_bytes_for_seek = settings[Setting::input_format_parquet_local_file_min_bytes_for_seek];
227228
format_settings.parquet.enable_row_group_prefetch = settings[Setting::input_format_parquet_enable_row_group_prefetch];
229+
format_settings.parquet.verify_checksums = settings[Setting::input_format_parquet_verify_checksums];
228230
format_settings.parquet.allow_geoparquet_parser = settings[Setting::input_format_parquet_allow_geoparquet_parser];
229231
format_settings.parquet.write_geometadata = settings[Setting::output_format_parquet_geometadata];
230232
format_settings.pretty.charset = settings[Setting::output_format_pretty_grid_charset].toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;

src/Formats/FormatSettings.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,7 @@ struct FormatSettings
292292
bool enable_json_parsing = true;
293293
bool preserve_order = false;
294294
bool enable_row_group_prefetch = true;
295+
bool verify_checksums = true;
295296
std::unordered_set<int> skip_row_groups = {};
296297
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
297298
size_t prefer_block_bytes = DEFAULT_BLOCK_SIZE * 256;
@@ -312,6 +313,7 @@ struct FormatSettings
312313
bool output_compliant_nested_types = true;
313314
bool write_page_index = false;
314315
bool write_bloom_filter = false;
316+
bool write_checksums = true;
315317
ParquetVersion output_version = ParquetVersion::V2_LATEST;
316318
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
317319
uint64_t output_compression_level;

src/Processors/Formats/Impl/Parquet/Write.cpp

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <parquet/encoding.h>
55
#include <parquet/schema.h>
66
#include <arrow/util/rle_encoding.h>
7+
#include <arrow/util/crc32.h>
78
#include <lz4.h>
89
#include <Poco/JSON/JSON.h>
910
#include <Poco/JSON/Object.h>
@@ -815,8 +816,12 @@ void writeColumnImpl(
815816
d.__set_encoding(use_dictionary ? parq::Encoding::RLE_DICTIONARY : encoding);
816817
d.__set_definition_level_encoding(parq::Encoding::RLE);
817818
d.__set_repetition_level_encoding(parq::Encoding::RLE);
818-
/// We could also put checksum in `header.crc`, but apparently no one uses it:
819-
/// https://issues.apache.org/jira/browse/PARQUET-594
819+
820+
if (options.write_checksums)
821+
{
822+
uint32_t crc = arrow::internal::crc32(0, compressed.data(), compressed.size());
823+
header.__set_crc(crc);
824+
}
820825

821826
parq::Statistics page_stats = page_statistics.get(options);
822827
bool has_null_count = s.max_def == 1 && s.max_rep == 0;
@@ -878,6 +883,12 @@ void writeColumnImpl(
878883
header.dictionary_page_header.__set_num_values(dict_encoder->num_entries());
879884
header.dictionary_page_header.__set_encoding(parq::Encoding::PLAIN);
880885

886+
if (options.write_checksums)
887+
{
888+
uint32_t crc = arrow::internal::crc32(0, compressed.data(), compressed.size());
889+
header.__set_crc(crc);
890+
}
891+
881892
writePage(header, compressed, s, /*add_to_offset_index*/ false, /*first_row_index*/ 0, out);
882893

883894
for (auto & p : dict_encoded_pages)

src/Processors/Formats/Impl/Parquet/Write.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ struct WriteOptions
4646
bool write_page_statistics = true;
4747
bool write_page_index = true;
4848
bool write_bloom_filter = true;
49+
bool write_checksums = true;
4950

5051
size_t max_statistics_size = 4096;
5152

src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -906,6 +906,7 @@ void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_bat
906906
parquet::ReaderProperties reader_properties(ArrowMemoryPool::instance());
907907
arrow_properties.set_use_threads(false);
908908
arrow_properties.set_batch_size(row_group_batch.adaptive_chunk_size);
909+
reader_properties.set_page_checksum_verification(format_settings.parquet.verify_checksums);
909910

910911
// When reading a row group, arrow will:
911912
// 1. Look at `metadata` to get all byte ranges it'll need to read from the file (typically one

src/Processors/Formats/Impl/ParquetBlockOutputFormat.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ ParquetBlockOutputFormat::ParquetBlockOutputFormat(WriteBuffer & out_, SharedHea
111111
options.write_batch_size = format_settings.parquet.write_batch_size;
112112
options.write_page_index = format_settings.parquet.write_page_index;
113113
options.write_bloom_filter = format_settings.parquet.write_bloom_filter;
114+
options.write_checksums = format_settings.parquet.write_checksums;
114115
options.bloom_filter_bits_per_value = format_settings.parquet.bloom_filter_bits_per_value;
115116
options.bloom_filter_flush_threshold_bytes = format_settings.parquet.bloom_filter_flush_threshold_bytes;
116117
options.write_geometadata = format_settings.parquet.write_geometadata;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
1234567890123456
2+
CRC checksum verification failed
3+
no checksum error, as expected
4+
1234567890123456
5+
no checksum error, as expected

0 commit comments

Comments
 (0)