
Commit 3de0abb

Merge pull request #1237 from Altinity/backports/antalya-25.8.12/87508

Antalya 25.8.12 Backport of ClickHouse#87508: Distributed execution: better split tasks

2 parents: db031f6 + 98c5076

31 files changed: +590 -49 lines

src/Core/ProtocolDefines.h

Lines changed: 4 additions & 2 deletions

Note that this bump renumbers the cluster-processing protocol constants: DATA_LAKE_COLUMNS_METADATA moves from 3 to 5 to make room for the new ICEBERG_METADATA (3) and FILE_BUCKETS_INFO (4) versions, and the current protocol version is now expressed in terms of the highest named constant instead of a bare literal.

@@ -35,8 +35,10 @@ static constexpr auto DBMS_MIN_REVISION_WITH_AGGREGATE_FUNCTIONS_VERSIONING = 54

 static constexpr auto DBMS_CLUSTER_INITIAL_PROCESSING_PROTOCOL_VERSION = 1;
 static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_METADATA = 2;
-static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 3;
-static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = 3;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_ICEBERG_METADATA = 3;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO = 4;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA = 5;
+static constexpr auto DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION = DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA;

 static constexpr auto DBMS_MIN_SUPPORTED_PARALLEL_REPLICAS_PROTOCOL_VERSION = 3;
 static constexpr auto DBMS_PARALLEL_REPLICAS_MIN_VERSION_WITH_MARK_SEGMENT_SIZE_FIELD = 4;

src/Core/Settings.cpp

Lines changed: 13 additions & 0 deletions

@@ -6789,6 +6789,19 @@ Both database and table names have to be unquoted - only simple identifiers are
 )", 0) \
     DECLARE(Bool, allow_general_join_planning, true, R"(
 Allows a more general join planning algorithm that can handle more complex conditions, but only works with hash join. If hash join is not enabled, then the usual join planning algorithm is used regardless of the value of this setting.
+)", 0) \
+    DECLARE(ObjectStorageGranularityLevel, cluster_table_function_split_granularity, ObjectStorageGranularityLevel::FILE, R"(
+Controls how data is split into tasks when executing a CLUSTER TABLE FUNCTION.
+
+This setting defines the granularity of work distribution across the cluster:
+- `file` — each task processes an entire file.
+- `bucket` — tasks are created per internal data block within a file (for example, Parquet row groups).
+
+Choosing finer granularity (like `bucket`) can improve parallelism when working with a small number of large files.
+For instance, if a Parquet file contains multiple row groups, enabling `bucket` granularity allows each group to be processed independently by different workers.
+)", 0) \
+    DECLARE(UInt64, cluster_table_function_buckets_batch_size, 0, R"(
+Defines the approximate size of a batch (in bytes) used in distributed processing of tasks in cluster table functions with `bucket` split granularity. The system accumulates data until at least this amount is reached. The actual size may be slightly larger to align with data boundaries.
 )", 0) \
     DECLARE(UInt64, merge_table_max_tables_to_look_for_schema_inference, 1000, R"(
 When creating a `Merge` table without an explicit schema or when using the `merge` table function, infer schema as a union of not more than the specified number of matching tables.
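
To make the batch-size rule concrete: with `bucket` granularity and a nonzero batch size, buckets are packed into a task until the byte threshold is crossed. A minimal sketch of that rule (illustrative only, not this commit's implementation; the RowGroup/Task types and splitIntoTasks are hypothetical names):

#include <cstddef>
#include <cstdint>
#include <vector>

struct RowGroup { uint64_t bytes; };             /// hypothetical: one Parquet row group
struct Task { std::vector<size_t> row_groups; }; /// hypothetical: bucket indices for one worker task

/// Greedily coalesce row groups into tasks of at least `batch_size` bytes.
/// A task may end up slightly larger than the threshold, because it is only
/// closed once the accumulated size reaches the limit (sizes align to
/// row-group boundaries, as the setting description above states).
std::vector<Task> splitIntoTasks(const std::vector<RowGroup> & groups, uint64_t batch_size)
{
    std::vector<Task> tasks;
    Task current;
    uint64_t accumulated = 0;
    for (size_t i = 0; i < groups.size(); ++i)
    {
        current.row_groups.push_back(i);
        accumulated += groups[i].bytes;
        if (accumulated >= batch_size)
        {
            tasks.push_back(std::move(current));
            current = {};
            accumulated = 0;
        }
    }
    if (!current.row_groups.empty())
        tasks.push_back(std::move(current));
    return tasks;
}

With the default batch size of 0 the threshold is met immediately, so each bucket becomes its own task.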

src/Core/Settings.h

Lines changed: 2 additions & 1 deletion

@@ -109,7 +109,8 @@ class WriteBuffer;
     M(CLASS_NAME, UInt64Auto) \
     M(CLASS_NAME, URI) \
     M(CLASS_NAME, VectorSearchFilterStrategy) \
-    M(CLASS_NAME, GeoToH3ArgumentOrder)
+    M(CLASS_NAME, GeoToH3ArgumentOrder) \
+    M(CLASS_NAME, ObjectStorageGranularityLevel)


 COMMON_SETTINGS_SUPPORTED_TYPES(Settings, DECLARE_SETTING_TRAIT)

src/Core/SettingsChangesHistory.cpp

Lines changed: 3 additions & 0 deletions

@@ -61,6 +61,9 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
             {"output_format_parquet_write_checksums", false, true, "New setting."},
             {"export_merge_tree_part_max_bytes_per_file", 0, 0, "New setting."},
             {"export_merge_tree_part_max_rows_per_file", 0, 0, "New setting."},
+            {"allow_experimental_hybrid_table", false, false, "Added new setting to allow the Hybrid table engine."},
+            {"cluster_table_function_split_granularity", "file", "file", "New setting."},
+            {"cluster_table_function_buckets_batch_size", 0, 0, "New setting."},
         });
     addSettingsChanges(settings_changes_history, "25.8",
     {

src/Core/SettingsEnums.cpp

Lines changed: 6 additions & 0 deletions

@@ -372,4 +372,10 @@ IMPLEMENT_SETTING_ENUM(
      {"manifest_file_entry", IcebergMetadataLogLevel::ManifestFileEntry}})

 IMPLEMENT_SETTING_AUTO_ENUM(MergeTreePartExportFileAlreadyExistsPolicy, ErrorCodes::BAD_ARGUMENTS);
+
+IMPLEMENT_SETTING_ENUM(
+    ObjectStorageGranularityLevel,
+    ErrorCodes::BAD_ARGUMENTS,
+    {{"file", ObjectStorageGranularityLevel::FILE},
+     {"bucket", ObjectStorageGranularityLevel::BUCKET}})
 }

src/Core/SettingsEnums.h

Lines changed: 8 additions & 0 deletions

@@ -481,6 +481,14 @@ enum class IcebergMetadataLogLevel : uint8_t

 DECLARE_SETTING_ENUM(IcebergMetadataLogLevel)

+enum class ObjectStorageGranularityLevel : uint8_t
+{
+    FILE = 0,
+    BUCKET = 1,
+};
+
+DECLARE_SETTING_ENUM(ObjectStorageGranularityLevel)
+
 enum class MergeTreePartExportFileAlreadyExistsPolicy : uint8_t
 {
     skip,

src/Disks/ObjectStorages/IObjectStorage.h

Lines changed: 11 additions & 0 deletions

@@ -29,6 +29,7 @@
 #include <Disks/WriteMode.h>

 #include <Processors/ISimpleTransform.h>
+#include <Processors/Formats/IInputFormat.h>
 #include <Storages/ObjectStorage/DataLakes/DataLakeObjectMetadata.h>

 #include <Interpreters/Context_fwd.h>
@@ -149,6 +150,8 @@ struct PathWithMetadata
     std::optional<String> absolute_path;
     ObjectStoragePtr object_storage_to_use = nullptr;

+    FileBucketInfoPtr file_bucket_info;
+
     PathWithMetadata() = default;

     explicit PathWithMetadata(
@@ -188,6 +191,14 @@
     void loadMetadata(ObjectStoragePtr object_storage, bool ignore_non_existent_file = true);

     ObjectStoragePtr getObjectStorage() const { return object_storage_to_use; }
+
+    String getIdentifier() const
+    {
+        String result = getAbsolutePath().value_or(getPath());
+        if (file_bucket_info)
+            result += file_bucket_info->getIdentifier();
+        return result;
+    }
 };

 struct ObjectKeyWithMetadata
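
The FileBucketInfo class itself is not part of this diff; judging from the call sites here and in ClusterFunctionReadTask.cpp below, it exposes at least getFormatName(), getIdentifier(), serialize() and deserialize(), and FileBucketInfoPtr appears to come from the newly included Processors/Formats/IInputFormat.h. Purely as a sketch of that inferred interface (a hypothetical Parquet-flavoured class, not the commit's actual one):

#include <base/types.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IInputFormat.h>

namespace DB
{

/// Hypothetical sketch: identifies a contiguous range of Parquet row groups
/// within one file, so two tasks over the same path stay distinguishable.
/// Assumed to derive from FileBucketInfo; the base's exact virtual
/// signatures are inferred, not taken from this diff.
class ParquetRowGroupBucketInfo : public FileBucketInfo
{
public:
    String getFormatName() const { return "Parquet"; }

    /// Appended to the path by PathWithMetadata::getIdentifier(), giving e.g.
    /// "s3://bucket/data.parquet#rg-3-7".
    String getIdentifier() const { return "#rg-" + toString(first_row_group) + "-" + toString(last_row_group); }

    void serialize(WriteBuffer & out) const
    {
        writeVarUInt(first_row_group, out);
        writeVarUInt(last_row_group, out);
    }

    void deserialize(ReadBuffer & in)
    {
        readVarUInt(first_row_group, in);
        readVarUInt(last_row_group, in);
    }

private:
    UInt64 first_row_group = 0;
    UInt64 last_row_group = 0;
};

}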

src/Formats/FormatFactory.cpp

Lines changed: 29 additions & 0 deletions

@@ -376,6 +376,20 @@ FormatSettings getFormatSettings(const ContextPtr & context, const Settings & se
     return format_settings;
 }

+FileBucketInfoPtr FormatFactory::getFileBucketInfo(const String & format)
+{
+    auto creator = getCreators(format);
+    return creator.file_bucket_info_creator();
+}
+
+void FormatFactory::registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info)
+{
+    chassert(bucket_info);
+    auto & creators = getOrCreateCreators(format);
+    if (creators.file_bucket_info_creator)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
+    creators.file_bucket_info_creator = std::move(bucket_info);
+}

 InputFormatPtr FormatFactory::getInput(
     const String & name,
@@ -696,6 +710,21 @@ void FormatFactory::registerInputFormat(const String & name, InputCreator input_
     KnownFormatNames::instance().add(name, /* case_insensitive = */ true);
 }

+void FormatFactory::registerSplitter(const String & format, BucketSplitterCreator splitter)
+{
+    chassert(splitter);
+    auto & creators = getOrCreateCreators(format);
+    if (creators.bucket_splitter_creator)
+        throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: Bucket splitter for format {} is already registered", format);
+    creators.bucket_splitter_creator = std::move(splitter);
+}
+
+BucketSplitter FormatFactory::getSplitter(const String & format)
+{
+    auto creator = getCreators(format);
+    return creator.bucket_splitter_creator();
+}
+
 void FormatFactory::registerRandomAccessInputFormat(const String & name, RandomAccessInputCreator input_creator)
 {
     chassert(input_creator);
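
Other creators on FormatFactory are registered by per-format functions at startup, so the two new hooks would presumably be wired up the same way. A hedged sketch (the function name, lambda bodies, and ParquetRowGroupBucketInfo are illustrative assumptions, not code from this commit):

namespace DB
{

/// Hypothetical wiring; reuses the ParquetRowGroupBucketInfo sketch above.
void registerParquetBucketSupport(FormatFactory & factory)
{
    /// An empty bucket-info object that deserialization code can fill in
    /// after reading the format name from the wire.
    factory.registerFileBucketInfo("Parquet", []
    {
        return std::make_shared<ParquetRowGroupBucketInfo>();
    });

    /// The splitter that turns one file into per-row-group buckets.
    factory.registerSplitter("Parquet", []() -> BucketSplitter
    {
        /// Placeholder: BucketSplitter's definition is not in this diff;
        /// the real creator would return the format-specific splitting logic.
        return {};
    });
}

}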

src/Formats/FormatFactory.h

Lines changed: 13 additions & 0 deletions

@@ -10,6 +10,8 @@
 #include <base/types.h>
 #include <Common/Allocator.h>

+#include <Processors/Formats/IInputFormat.h>
+
 #include <boost/noncopyable.hpp>

 #include <functional>
@@ -94,6 +96,10 @@ class FormatFactory final : private boost::noncopyable
         const RowInputFormatParams & params,
         const FormatSettings & settings)>;

+    using FileBucketInfoCreator = std::function<FileBucketInfoPtr()>;
+
+    using BucketSplitterCreator = std::function<BucketSplitter()>;
+
     // Incompatible with FileSegmentationEngine.
     using RandomAccessInputCreator = std::function<InputFormatPtr(
         ReadBuffer & buf,
@@ -142,6 +148,8 @@ class FormatFactory final : private boost::noncopyable
     {
         String name;
         InputCreator input_creator;
+        FileBucketInfoCreator file_bucket_info_creator;
+        BucketSplitterCreator bucket_splitter_creator;
         RandomAccessInputCreator random_access_input_creator;
         OutputCreator output_creator;
         FileSegmentationEngineCreator file_segmentation_engine_creator;
@@ -286,6 +294,11 @@ class FormatFactory final : private boost::noncopyable
     void checkFormatName(const String & name) const;
     bool exists(const String & name) const;

+    FileBucketInfoPtr getFileBucketInfo(const String & format);
+    void registerFileBucketInfo(const String & format, FileBucketInfoCreator bucket_info);
+    void registerSplitter(const String & format, BucketSplitterCreator splitter);
+    BucketSplitter getSplitter(const String & format);
+
 private:
     FormatsDictionary dict;
     FileExtensionFormats file_extension_formats;

src/Interpreters/ClusterFunctionReadTask.cpp

Lines changed: 33 additions & 1 deletion

@@ -7,7 +7,10 @@
 #include <IO/ReadHelpers.h>
 #include <Interpreters/ActionsDAG.h>
 #include <Storages/ObjectStorage/StorageObjectStorageSource.h>
+#include <Common/Exception.h>
 #include <Common/logger_useful.h>
+#include <Formats/FormatFactory.h>
+#include <Processors/Formats/Impl/ParquetBlockInputFormat.h>

 namespace DB
 {
@@ -40,6 +43,7 @@ ClusterFunctionReadTaskResponse::ClusterFunctionReadTaskResponse(ObjectInfoPtr o
         const bool send_over_whole_archive = !context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes];
         path = send_over_whole_archive ? object->getPathOrPathToArchiveIfArchive() : object->getPath();
         absolute_path = object->getAbsolutePath();
+        file_bucket_info = object->file_bucket_info;
     }
 }

@@ -58,7 +62,9 @@ ObjectInfoPtr ClusterFunctionReadTaskResponse::getObjectInfo() const
     object->file_meta_info = file_meta_info;
     if (absolute_path.has_value() && !absolute_path.value().empty())
         object->absolute_path = absolute_path;
-
+
+    object->file_bucket_info = file_bucket_info;
+
     return object;
 }

@@ -76,6 +82,21 @@ void ClusterFunctionReadTaskResponse::serialize(WriteBuffer & out, size_t protoc
         ActionsDAG().serialize(out, registry);
     }

+    if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
+    {
+        if (file_bucket_info)
+        {
+            /// Write format name so we can create appropriate file bucket info during deserialization.
+            writeStringBinary(file_bucket_info->getFormatName(), out);
+            file_bucket_info->serialize(out);
+        }
+        else
+        {
+            /// Write empty string as format name if file_bucket_info is not set.
+            writeStringBinary("", out);
+        }
+    }
+
     if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
     {
         /// This info is not used when optimization is disabled, so there is no need to send it.
@@ -111,6 +132,17 @@ void ClusterFunctionReadTaskResponse::deserialize(ReadBuffer & in)
         }
     }

+    if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_FILE_BUCKETS_INFO)
+    {
+        String format;
+        readStringBinary(format, in);
+        if (!format.empty())
+        {
+            file_bucket_info = FormatFactory::instance().getFileBucketInfo(format);
+            file_bucket_info->deserialize(in);
+        }
+    }
+
     if (protocol_version >= DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION_WITH_DATA_LAKE_COLUMNS_METADATA)
     {
         auto info = std::make_shared<DataFileMetaInfo>(DataFileMetaInfo::deserialize(in));
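
Taken together, serialize() and deserialize() agree on a small self-describing frame for protocol version 4 and above: a format-name string, then, only when the name is non-empty, the format-specific bucket payload. A minimal sketch of just that framing, using the same IO helpers (the varint payload stands in for the hypothetical row-group range from the earlier sketch, not the real FileBucketInfo encoding):

#include <base/types.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>

using namespace DB;

/// Demonstrates the wire framing only; real code goes through
/// ClusterFunctionReadTaskResponse and FormatFactory::getFileBucketInfo().
void demoBucketInfoFraming()
{
    WriteBufferFromOwnString out;
    writeStringBinary("Parquet", out);  /// format name announces the payload type
    writeVarUInt(3, out);               /// hypothetical payload: first row group
    writeVarUInt(7, out);               /// hypothetical payload: last row group

    ReadBufferFromString in(out.str());
    String format;
    readStringBinary(format, in);       /// an empty string means "no bucket info"
    if (!format.empty())
    {
        UInt64 first = 0;
        UInt64 last = 0;
        readVarUInt(first, in);
        readVarUInt(last, in);
    }
}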
