Skip to content

Commit 948b085

Browse files
scanhex12zvonand
authored andcommitted
Merge pull request ClickHouse#87508 from scanhex12/distributed_execution_better_spread
Distributed execution: better split tasks
1 parent 02ac2fb commit 948b085

29 files changed

+521
-40
lines changed

src/Core/ProtocolDefines.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,10 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54
3535

3636
static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
3737
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
38-
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
39-
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
38+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
39+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
40+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5;
41+
static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO;
4042

4143
static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
4244
static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;

src/Core/Settings.cpp

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6789,6 +6789,19 @@ Both database and table names have to be unquoted - only simple identifiers are
67896789
)", 0) \
67906790
DECLARE(Bool, allow_general_join_planning, true, R"(
67916791
Allows a more general join planning algorithm that can handle more complex conditions, but only works with hash join. If hash join is not enabled, then the usual join planning algorithm is used regardless of the value of this setting.
6792+
)", 0) \
6793+
DECLARE(ObjectStorageGranularityLevel, cluster_table_function_split_granularity, ObjectStorageGranularityLevel::FILE, R"(
6794+
Controls how data is split into tasks when executing a CLUSTER TABLE FUNCTION.
6795+
6796+
This setting defines the granularity of work distribution across the cluster:
6797+
- `file` — each task processes an entire file.
6798+
- `bucket` — tasks are created per internal data block within a file (for example, Parquet row groups).
6799+
6800+
Choosing finer granularity (like `bucket`) can improve parallelism when working with a small number of large files.
6801+
For instance, if a Parquet file contains multiple row groups, enabling `bucket` granularity allows each group to be processed independently by different workers.
6802+
)", 0) \
6803+
DECLARE(UInt64, cluster_table_function_buckets_batch_size, 0, R"(
6804+
Defines the approximate size of a batch (in bytes) used in distributed processing of tasks in cluster table functions with `bucket` split granularity. The system accumulates data until at least this amount is reached. The actual size may be slightly larger to align with data boundaries.
67926805
)", 0) \
67936806
DECLARE(UInt64, merge_table_max_tables_to_look_for_schema_inference, 1000, R"(
67946807
When creating a `Merge` table without an explicit schema or when using the `merge` table function, infer schema as a union of not more than the specified number of matching tables.

src/Core/Settings.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,9 @@ class WriteBuffer;
109109
M(CLASS_NAME, UInt64Auto) \
110110
M(CLASS_NAME, URI) \
111111
M(CLASS_NAME, VectorSearchFilterStrategy) \
112-
M(CLASS_NAME, GeoToH3ArgumentOrder)
112+
M(CLASS_NAME, GeoToH3ArgumentOrder) \
113+
M(CLASS_NAME, ObjectStorageGranularityLevel) \
114+
M(CLASS_NAME, DecorrelationJoinKind)
113115

114116

115117
COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)

src/Core/SettingsChangesHistory.cpp

Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,94 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
5858
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
5959
{"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
6060
{"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
61+
{"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."}
62+
{"cluster_table_function_split_granularity", "file", "file", "New setting."},
63+
{"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
64+
{"arrow_flight_request_descriptor_type", "path", "path", "New setting. Type of descriptor to use for Arrow Flight requests: 'path' or 'command'. Dremio requires 'command'."},
65+
{"send_profile_events", true, true, "New setting. Whether to send profile events to the clients."},
66+
{"into_outfile_create_parent_directories", false, false, "New setting"},
67+
{"correlated_subqueries_default_join_kind", "left", "right", "New setting. Default join kind for decorrelated query plan."},
68+
{"use_statistics_cache", 0, 0, "New setting"},
69+
{"input_format_parquet_use_native_reader_v3", false, true, "Seems stable"},
70+
{"max_projection_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"},
71+
{"min_table_rows_to_use_projection_index", 1'000'000, 1'000'000, "New setting"},
72+
{"use_text_index_dictionary_cache", false, false, "New setting"},
73+
{"use_text_index_header_cache", false, false, "New setting"},
74+
{"use_text_index_postings_cache", false, false, "New setting"},
75+
{"s3_retry_attempts", 500, 500, "Changed the value of the obsolete setting"},
76+
{"http_write_exception_in_output_format", true, false, "Changed for consistency across formats"},
77+
{"optimize_const_name_size", -1, 256, "Replace with scalar and use hash as a name for large constants (size is estimated by name length)"},
78+
{"enable_lazy_columns_replication", false, true, "Enable lazy columns replication in JOIN and ARRAY JOIN by default"},
79+
{"allow_special_serialization_kinds_in_output_formats", false, true, "Enable direct output of special columns representations like Sparse/Replicated in some output formats"},
80+
{"allow_experimental_alias_table_engine", false, false, "New setting"},
81+
{"input_format_parquet_local_time_as_utc", false, true, "Use more appropriate type DateTime64(..., 'UTC') for parquet 'local time without timezone' type."},
82+
{"input_format_parquet_verify_checksums", true, true, "New setting."},
83+
{"output_format_parquet_write_checksums", false, true, "New setting."},
84+
{"database_shared_drop_table_delay_seconds", 8 * 60 * 60, 8 * 60 * 60, "New setting."},
85+
{"filesystem_cache_allow_background_download", true, true, "New setting to control background downloads in filesystem cache per query."},
86+
});
87+
addSettingsChanges(settings_changes_history, "25.10",
88+
{
89+
{"allow_special_serialization_kinds_in_output_formats", false, false, "Add a setting to allow output of special columns representations like Sparse/Replicated without converting them to full columns"},
90+
{"enable_lazy_columns_replication", false, false, "Add a setting to enable lazy columns replication in JOIN and ARRAY JOIN"},
91+
{"correlated_subqueries_default_join_kind", "left", "right", "New setting. Default join kind for decorrelated query plan."},
92+
{"show_data_lake_catalogs_in_system_tables", true, false, "Disable catalogs in system tables by default"},
93+
{"optimize_rewrite_like_perfect_affix", false, true, "New setting"},
94+
{"allow_dynamic_type_in_join_keys", true, false, "Disallow using Dynamic type in JOIN keys by default"},
95+
{"s3queue_keeper_fault_injection_probability", 0, 0, "New setting."},
96+
{"enable_join_runtime_filters", false, false, "New setting"},
97+
{"join_runtime_filter_exact_values_limit", 10000, 10000, "New setting"},
98+
{"join_runtime_bloom_filter_bytes", 512_KiB, 512_KiB, "New setting"},
99+
{"join_runtime_bloom_filter_hash_functions", 3, 3, "New setting"},
100+
{"use_join_disjunctions_push_down", false, false, "New setting."},
101+
{"joined_block_split_single_row", false, false, "New setting"},
102+
{"temporary_files_buffer_size", DBMS_DEFAULT_BUFFER_SIZE, DBMS_DEFAULT_BUFFER_SIZE, "New setting"},
103+
{"rewrite_in_to_join", false, false, "New experimental setting"},
104+
{"delta_lake_log_metadata", false, false, "New setting."},
105+
{"distributed_cache_prefer_bigger_buffer_size", false, false, "New setting."},
106+
{"allow_experimental_qbit_type", false, false, "New experimental setting"},
107+
{"optimize_qbit_distance_function_reads", true, true, "New setting"},
108+
{"read_from_distributed_cache_if_exists_otherwise_bypass_cache", false, false, "New setting"},
109+
{"s3_slow_all_threads_after_retryable_error", false, false, "Disable the setting by default"},
110+
{"backup_slow_all_threads_after_retryable_s3_error", false, false, "Disable the setting by default"},
111+
{"enable_http_compression", false, true, "It should be beneficial in general"},
112+
{"inject_random_order_for_select_without_order_by", false, false, "New setting"},
113+
{"exclude_materialize_skip_indexes_on_insert", "", "", "New setting."},
114+
{"optimize_empty_string_comparisons", false, true, "A new setting."},
115+
{"query_plan_use_logical_join_step", true, true, "Added alias"},
116+
{"schema_inference_make_columns_nullable", 1, 3, "Take nullability information from Parquet/ORC/Arrow metadata by default, instead of making everything nullable."},
117+
{"materialized_views_squash_parallel_inserts", false, true, "Added setting to preserve old behavior if needed."},
118+
{"distributed_cache_connect_timeout_ms", 50, 50, "New setting"},
119+
{"distributed_cache_receive_timeout_ms", 3000, 3000, "New setting"},
120+
{"distributed_cache_send_timeout_ms", 3000, 3000, "New setting"},
121+
{"distributed_cache_tcp_keep_alive_timeout_ms", 2900, 2900, "New setting"},
122+
});
123+
addSettingsChanges(settings_changes_history, "25.9",
124+
{
125+
{"input_format_protobuf_oneof_presence", false, false, "New setting"},
126+
{"iceberg_delete_data_on_drop", false, false, "New setting"},
127+
{"use_skip_indexes_on_data_read", false, false, "New setting"},
128+
{"s3_slow_all_threads_after_retryable_error", false, false, "Added an alias for setting `backup_slow_all_threads_after_retryable_s3_error`"},
129+
{"iceberg_metadata_log_level", "none", "none", "New setting."},
130+
{"iceberg_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."},
131+
{"iceberg_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."},
132+
{"query_plan_optimize_join_order_limit", 1, 1, "New setting"},
133+
{"query_plan_display_internal_aliases", false, false, "New setting"},
134+
{"query_plan_max_step_description_length", 1000000000, 500, "New setting"},
135+
{"allow_experimental_delta_lake_writes", false, false, "New setting."},
136+
{"query_plan_convert_any_join_to_semi_or_anti_join", true, true, "New setting."},
137+
{"text_index_use_bloom_filter", true, true, "New setting."},
138+
{"query_plan_direct_read_from_text_index", true, true, "New setting."},
139+
{"enable_producing_buckets_out_of_order_in_aggregation", false, true, "New setting"},
140+
{"jemalloc_enable_profiler", false, false, "New setting"},
141+
{"jemalloc_collect_profile_samples_in_trace_log", false, false, "New setting"},
142+
{"delta_lake_insert_max_bytes_in_data_file", 1_GiB, 1_GiB, "New setting."},
143+
{"delta_lake_insert_max_rows_in_data_file", 1000000, 1000000, "New setting."},
144+
{"promql_evaluation_time", Field{"auto"}, Field{"auto"}, "The setting was renamed. The previous name is `evaluation_time`."},
145+
{"evaluation_time", 0, 0, "Old setting which popped up here being renamed."},
146+
{"os_threads_nice_value_query", 0, 0, "New setting."},
147+
{"os_threads_nice_value_materialized_view", 0, 0, "New setting."},
148+
{"os_thread_priority", 0, 0, "Alias for os_threads_nice_value_query."},
61149
});
62150
addSettingsChanges(settings_changes_history, "25.8",
63151
{

src/Core/SettingsEnums.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -372,4 +372,10 @@ IMPLEMENT_SETTING_ENUM(
372372
{"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}})
373373

374374
IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS);
375+
376+
IMPLEMENT_SETTING_ENUM(
377+
ObjectStorageGranularityLevel,
378+
ErrorCodes::BAD_ARGUMENTS,
379+
{{"file", ObjectStorageGranularityLevel::FILE},
380+
{"bucket", ObjectStorageGranularityLevel::BUCKET}})
375381
}

src/Core/SettingsEnums.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,14 @@ enum class IcebergMetadataLogLevel : uint8_t
481481

482482
DECLARE_SETTING_ENUM(IcebergMetadataLogLevel)
483483

484+
enum class ObjectStorageGranularityLevel : uint8_t
485+
{
486+
FILE = 0,
487+
BUCKET = 1,
488+
};
489+
490+
DECLARE_SETTING_ENUM(ObjectStorageGranularityLevel)
491+
484492
enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t
485493
{
486494
skip,

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include <Disks/WriteMode.h>
3030

3131
#include <Processors/ISimpleTransform.h>
32+
#include <Processors/Formats/IInputFormat.h>
3233
#include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>
3334

3435
#include <Interpreters/Context_fwd.h>

src/Formats/FormatFactory.cpp

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,6 +374,20 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
374374
return format_settings;
375375
}
376376

377+
FileBucketInfoPtr FormatFactory::getFileBucketInfo(const String & format)
378+
{
379+
auto creator = getCreators(format);
380+
return creator.file_bucket_info_creator();
381+
}
382+
383+
void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info)
384+
{
385+
chassert(bucket_info);
386+
auto & creators = getOrCreateCreators(format);
387+
if (creators.file_bucket_info_creator)
388+
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
389+
creators.file_bucket_info_creator = std::move(bucket_info);
390+
}
377391

378392
InputFormatPtr FormatFactory::getInput(
379393
const String & name,
@@ -694,6 +708,21 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_
694708
KnownFormatNames::instance().add(name, /* case_insensitive = */ true);
695709
}
696710

711+
void FormatFactory::registerSplitter(const String & format, BucketSplitterCreator splitter)
712+
{
713+
chassert(splitter);
714+
auto & creators = getOrCreateCreators(format);
715+
if (creators.bucket_splitter_creator)
716+
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
717+
creators.bucket_splitter_creator = std::move(splitter);
718+
}
719+
720+
BucketSplitter FormatFactory::getSplitter(const String & format)
721+
{
722+
auto creator = getCreators(format);
723+
return creator.bucket_splitter_creator();
724+
}
725+
697726
void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator)
698727
{
699728
chassert(input_creator);

src/Formats/FormatFactory.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
#include <base/types.h>
1111
#include <Common/Allocator.h>
1212

13+
#include <Processors/Formats/IInputFormat.h>
14+
1315
#include <boost/noncopyable.hpp>
1416

1517
#include <functional>
@@ -94,6 +96,10 @@ class FormatFactory final : private boost::noncopyable
9496
const RowInputFormatParams & params,
9597
const FormatSettings & settings)>;
9698

99+
using FileBucketInfoCreator = std::function<FileBucketInfoPtr()>;
100+
101+
using BucketSplitterCreator = std::function<BucketSplitter()>;
102+
97103
// Incompatible with FileSegmentationEngine.
98104
using RandomAccessInputCreator = std::function<InputFormatPtr(
99105
ReadBuffer & buf,
@@ -142,6 +148,8 @@ class FormatFactory final : private boost::noncopyable
142148
{
143149
String name;
144150
InputCreator input_creator;
151+
FileBucketInfoCreator file_bucket_info_creator;
152+
BucketSplitterCreator bucket_splitter_creator;
145153
RandomAccessInputCreator random_access_input_creator;
146154
OutputCreator output_creator;
147155
FileSegmentationEngineCreator file_segmentation_engine_creator;
@@ -286,6 +294,11 @@ class FormatFactory final : private boost::noncopyable
286294
void checkFormatName(const String & name) const;
287295
bool exists(const String & name) const;
288296

297+
FileBucketInfoPtr getFileBucketInfo(const String & format);
298+
void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info);
299+
void registerSplitter(const String & format, BucketSplitterCreator splitter);
300+
BucketSplitter getSplitter(const String & format);
301+
289302
private:
290303
FormatsDictionary dict;
291304
FileExtensionFormats file_extension_formats;

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@
77
#include <IO/ReadHelpers.h>
88
#include <Interpreters/ActionsDAG.h>
99
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
10+
#include <Common/Exception.h>
1011
#include <Common/logger_useful.h>
12+
#include <Formats/FormatFactory.h>
13+
#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>
1114

1215
namespace DB
1316
{
@@ -40,6 +43,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
4043
const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
4144
path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
4245
absolute_path = object->getAbsolutePath();
46+
file_bucket_info = object->file_bucket_info;
4347
}
4448
}
4549

@@ -58,7 +62,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
5862
object->file_meta_info = file_meta_info;
5963
if (absolute_path.has_value() && !absolute_path.value().empty())
6064
object->absolute_path = absolute_path;
61-
65+
66+
object->file_bucket_info = file_bucket_info;
67+
6268
return object;
6369
}
6470

@@ -76,6 +82,21 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
7682
ActionsDAG().serialize(out, registry);
7783
}
7884

85+
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
86+
{
87+
if (file_bucket_info)
88+
{
89+
/// Write format name so we can create appropriate file bucket info during deserialization.
90+
writeStringBinary(file_bucket_info->getFormatName(), out);
91+
file_bucket_info->serialize(out);
92+
}
93+
else
94+
{
95+
/// Write empty string as format name if file_bucket_info is not set.
96+
writeStringBinary("", out);
97+
}
98+
}
99+
79100
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
80101
{
81102
/// This info is not used when optimization is disabled, so there is no need to send it.
@@ -111,6 +132,17 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
111132
}
112133
}
113134

135+
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
136+
{
137+
String format;
138+
readStringBinary(format, in);
139+
if (!format.empty())
140+
{
141+
file_bucket_info = FormatFactory::instance().getFileBucketInfo(format);
142+
file_bucket_info->deserialize(in);
143+
}
144+
}
145+
114146
if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
115147
{
116148
auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));

0 commit comments

Comments
 (0)