diff --git a/src/Client/Connection.h b/src/Client/Connection.h index e149822f2d7a..4a7ffb967a86 100644 --- a/src/Client/Connection.h +++ b/src/Client/Connection.h @@ -176,6 +176,9 @@ class Connection : public IServerConnection format_settings = settings; } + String getHost() { return host; } + UInt16 getPort() { return port; } + private: String host; UInt16 port; diff --git a/src/Client/HedgedConnections.cpp b/src/Client/HedgedConnections.cpp index d8201f07b0ef..6386454f9ad5 100644 --- a/src/Client/HedgedConnections.cpp +++ b/src/Client/HedgedConnections.cpp @@ -457,6 +457,8 @@ Packet HedgedConnections::receivePacketFromReplica(const ReplicaLocation & repli { ReplicaState & replica = offset_states[replica_location.offset].replicas[replica_location.index]; Packet packet = std::move(last_received_packet); + last_packet_connection = replica.connection + switch (packet.type) { case Protocol::Server::Data: diff --git a/src/Client/HedgedConnections.h b/src/Client/HedgedConnections.h index 4d30d5134ebe..c9e320df5438 100644 --- a/src/Client/HedgedConnections.h +++ b/src/Client/HedgedConnections.h @@ -109,6 +109,8 @@ class HedgedConnections : public IConnections UInt64 receivePacketTypeUnlocked(AsyncCallback async_callback) override; + Connection * getLastPacketConnection() override { return last_packet_connection; } + void disconnect() override; void sendCancel() override; @@ -199,6 +201,9 @@ class HedgedConnections : public IConnections /// to resume it's packet receiver when new packet is needed. std::optional replica_with_last_received_packet; + /// Connection that received last packet. + Connection * last_packet_connection = nullptr; + Packet last_received_packet; Epoll epoll; diff --git a/src/Client/IConnections.h b/src/Client/IConnections.h index b0559c4b0014..9cce08ec4321 100644 --- a/src/Client/IConnections.h +++ b/src/Client/IConnections.h @@ -37,6 +37,8 @@ class IConnections : boost::noncopyable virtual UInt64 receivePacketTypeUnlocked(AsyncCallback async_callback) = 0; + virtual Connection * getLastPacketConnection() = 0; + /// Break all active connections. virtual void disconnect() = 0; diff --git a/src/Client/MultiplexedConnections.h b/src/Client/MultiplexedConnections.h index 7dab6e3b5fdb..db96eeeebc37 100644 --- a/src/Client/MultiplexedConnections.h +++ b/src/Client/MultiplexedConnections.h @@ -44,6 +44,8 @@ class MultiplexedConnections final : public IConnections Packet receivePacket() override; + Connection * getLastPacketConnection() override { return current_connection; } + void disconnect() override; void sendCancel() override; diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index 376505392ff6..2ebbda36cca8 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -495,6 +495,9 @@ Ignore absence of file if it does not exist when reading certain keys. Possible values: - 1 — `SELECT` returns empty result. - 0 — `SELECT` throws an exception. +)", 0) \ + DECLARE(Bool, object_storage_stable_cluster_task_distribution, false, R"( +Use stable task distribution for object storage cluster table functions in order to better utilize filesystem cache. )", 0) \ DECLARE(Bool, hdfs_ignore_file_doesnt_exist, false, R"( Ignore absence of file if it does not exist when reading certain keys. diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 80ee8d86040a..586b8430ef61 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -739,7 +739,8 @@ void RemoteQueryExecutor::processReadTaskRequest() throw Exception(ErrorCodes::LOGICAL_ERROR, "Distributed task iterator is not initialized"); ProfileEvents::increment(ProfileEvents::ReadTaskRequestsReceived); - auto response = (*extension->task_iterator)(); + + auto response = (*extension->task_iterator)(connections->getLastPacketConnection()); connections->sendReadTaskResponse(response); } diff --git a/src/QueryPipeline/RemoteQueryExecutor.h b/src/QueryPipeline/RemoteQueryExecutor.h index 2077990da946..7fdbdc01e27c 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.h +++ b/src/QueryPipeline/RemoteQueryExecutor.h @@ -28,7 +28,7 @@ class RemoteQueryExecutorReadContext; class ParallelReplicasReadingCoordinator; /// This is the same type as StorageS3Source::IteratorWrapper -using TaskIterator = std::function; +using TaskIterator = std::function; /// This class allows one to launch queries on remote replicas of one shard and get results class RemoteQueryExecutor diff --git a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp index 07eecc655998..66e3d8dd01db 100644 --- a/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp +++ b/src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp @@ -11,6 +11,7 @@ #include #include #include +#include namespace DB @@ -18,6 +19,7 @@ namespace DB namespace Setting { extern const SettingsBool use_hive_partitioning; + extern const SettingsBool object_storage_stable_cluster_task_distribution; } namespace ErrorCodes @@ -118,14 +120,30 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten configuration, configuration->getQuerySettings(local_context), object_storage, /* distributed_processing */false, local_context, predicate, virtual_columns, nullptr, local_context->getFileProgressCallback()); - auto callback = std::make_shared>([iterator]() mutable -> String + auto cluster = getCluster(local_context); + + if (local_context->getSettingsRef()[Setting::object_storage_stable_cluster_task_distribution]) { - auto object_info = iterator->next(0); - if (object_info) - return object_info->getPath(); - return ""; - }); - return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; + auto task_distributor = std::make_shared(iterator, cluster); + + auto callback = std::make_shared( + [task_distributor](Connection * connection) mutable -> String { + return task_distributor->getNextTask(connection); + }); + + return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; + } + else + { + auto callback = std::make_shared( + [iterator](Connection *) mutable -> String { + if (auto object_info = iterator->next(0)) + return object_info->getPath(); + return ""; + }); + + return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) }; + } } } diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp new file mode 100644 index 000000000000..df6f69f05cd7 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp @@ -0,0 +1,223 @@ +#include "StorageObjectStorageStableTaskDistributor.h" +#include + +namespace DB +{ + +StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + const ClusterPtr & cluster) + : iterator(std::move(iterator_)) + , iterator_exhausted(false) + , total_files_processed(0) +{ + initializeConnectionMapping(cluster); + + LOG_INFO( + log, + "Initialized StorageObjectStorageStableTaskDistributor to distribute files across {} unique replicas", + total_replicas + ); +} + +void StorageObjectStorageStableTaskDistributor::initializeConnectionMapping(const ClusterPtr & cluster) +{ + connection_key_to_replica.clear(); + replica_to_connection_key.clear(); + + const auto & addresses_with_failover = cluster->getShardsAddresses(); + + for (size_t shard_idx = 0; shard_idx < addresses_with_failover.size(); ++shard_idx) + { + const auto & addresses = addresses_with_failover[shard_idx]; + + for (const auto & address : addresses) + { + String connection_key = address.host_name + ":" + std::to_string(address.port); + + if (connection_key_to_replica.contains(connection_key)) + continue; + + Int32 replica_idx = static_cast(replica_to_connection_key.size()); + connection_key_to_replica[connection_key] = replica_idx; + replica_to_connection_key.push_back(connection_key); + + LOG_TRACE( + log, + "Discovered shard {} replica with connection key {} (replica_idx: {})", + shard_idx, + connection_key, + replica_idx + ); + } + } + + total_replicas = static_cast(replica_to_connection_key.size()); + + LOG_INFO( + log, + "Mapping connections to {} unique replicas", + total_replicas + ); +} + +String StorageObjectStorageStableTaskDistributor::getNextTask(Connection * connection) +{ + String connection_key = "default"; + Int32 replica_idx = -1; + + if (connection) + { + connection_key = connection->getHost() + ":" + std::to_string(connection->getPort()); + auto it = connection_key_to_replica.find(connection_key); + if (it != connection_key_to_replica.end()) + { + replica_idx = it->second; + } + } + + LOG_TRACE( + log, + "Received a new connection ({}, replica_idx: {}) looking for a file", + connection_key, + replica_idx + ); + + // 1. Check pre-queued files first + String file = getPreQueuedFile(connection_key); + if (!file.empty()) + return file; + + // 2. Try to find a matching file from the iterator + file = getMatchingFileFromIterator(connection_key, replica_idx); + if (!file.empty()) + return file; + + if (!unprocessed_files.empty()) { + // Prevent initiator from stealing jobs from other replicas + sleepForMilliseconds(50); + } + + // 3. Process unprocessed files if iterator is exhausted + return getAnyUnprocessedFile(connection_key); +} + +Int32 StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path, Int32 total_replicas) +{ + if (total_replicas <= 0) + return 0; + + UInt64 hash_value = sipHash64(file_path); + return static_cast(hash_value % total_replicas); +} + +String StorageObjectStorageStableTaskDistributor::getPreQueuedFile(const String & connection_key) +{ + std::lock_guard lock(mutex); + auto & files_for_connection = connection_to_files[connection_key]; + + // Find the first file that's still unprocessed + while (!files_for_connection.empty()) + { + String next_file = files_for_connection.back(); + files_for_connection.pop_back(); + + // Skip if this file was already processed + if (!unprocessed_files.contains(next_file)) + continue; + + unprocessed_files.erase(next_file); + total_files_processed++; + + LOG_TRACE( + log, + "Assigning pre-queued file {} to connection {} (processed: {})", + next_file, + connection_key, + total_files_processed + ); + + return next_file; + } + + return ""; +} + +String StorageObjectStorageStableTaskDistributor::getMatchingFileFromIterator(const String & connection_key, Int32 replica_idx) +{ + while (!iterator_exhausted) + { + ObjectInfoPtr object_info; + + { + std::lock_guard lock(mutex); + object_info = iterator->next(0); + + if (!object_info) + { + iterator_exhausted = true; + break; + } + } + + String file_path = object_info->getPath(); + Int32 file_replica_idx = getReplicaForFile(file_path, total_replicas); + + if (file_replica_idx == replica_idx) + { + std::lock_guard lock(mutex); + total_files_processed++; + + LOG_TRACE( + log, + "Found file {} to connection {} (processed: {})", + file_path, + connection_key, + total_files_processed + ); + + return file_path; + } + + // Queue file for its assigned replica + { + std::lock_guard lock(mutex); + + unprocessed_files.insert(file_path); + if (file_replica_idx < total_replicas) + { + String target_connection_key = replica_to_connection_key[file_replica_idx]; + connection_to_files[target_connection_key].push_back(file_path); + } + } + } + + return ""; +} + +String StorageObjectStorageStableTaskDistributor::getAnyUnprocessedFile(const String & connection_key) +{ + std::lock_guard lock(mutex); + + if (!unprocessed_files.empty()) + { + auto it = unprocessed_files.begin(); + String next_file = *it; + unprocessed_files.erase(it); + total_files_processed++; + + LOG_TRACE( + log, + "Iterator exhausted. Assigning unprocessed file {} to connection {} (processed: {})", + next_file, + connection_key, + total_files_processed + ); + + return next_file; + } + + return ""; +} + +} diff --git a/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h new file mode 100644 index 000000000000..7d77f87c6d68 --- /dev/null +++ b/src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ + +class StorageObjectStorageStableTaskDistributor +{ +public: + using ConnectionKeyToReplicaMap = std::unordered_map; + using ReplicaToConnectionKeyMap = std::vector; + + StorageObjectStorageStableTaskDistributor( + std::shared_ptr iterator_, + const ClusterPtr & cluster); + + String getNextTask(Connection * connection); + +private: + void initializeConnectionMapping(const ClusterPtr & cluster); + + static Int32 getReplicaForFile(const String & file_path, Int32 total_replicas); + String getPreQueuedFile(const String & connection_key); + String getMatchingFileFromIterator(const String & connection_key, Int32 replica_idx); + String getAnyUnprocessedFile(const String & connection_key); + + std::shared_ptr iterator; + ConnectionKeyToReplicaMap connection_key_to_replica; + ReplicaToConnectionKeyMap replica_to_connection_key; + Int32 total_replicas; + + std::mutex mutex; + std::unordered_map> connection_to_files; + std::unordered_set unprocessed_files; + bool iterator_exhausted = false; + size_t total_files_processed = 0; + + LoggerPtr log = getLogger("StorageObjectStorageStableTaskDistributor"); +}; + +} diff --git a/src/Storages/StorageFileCluster.cpp b/src/Storages/StorageFileCluster.cpp index c01738067c40..0f7e5d3af436 100644 --- a/src/Storages/StorageFileCluster.cpp +++ b/src/Storages/StorageFileCluster.cpp @@ -77,7 +77,7 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(const ActionsDAG::Node * predicate, const ContextPtr & context) const { auto iterator = std::make_shared(paths, std::nullopt, predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](Connection *) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; } diff --git a/src/Storages/StorageURLCluster.cpp b/src/Storages/StorageURLCluster.cpp index 7beb73d2047f..f744d01f1604 100644 --- a/src/Storages/StorageURLCluster.cpp +++ b/src/Storages/StorageURLCluster.cpp @@ -97,7 +97,7 @@ RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(const { auto iterator = std::make_shared( uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context); - auto callback = std::make_shared([iter = std::move(iterator)]() mutable -> String { return iter->next(); }); + auto callback = std::make_shared([iter = std::move(iterator)](Connection *) mutable -> String { return iter->next(); }); return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)}; }