Skip to content

Commit 89d7ee8

Browse files
authored
Merge pull request #780 from Altinity/feature/retries_in_cluster_functions
Restart cluster tasks on connection lost
2 parents abb3c0b + f5d732e commit 89d7ee8

12 files changed

+240
-38
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6883,6 +6883,9 @@ Possible values:
68836883
- '' - do not force any kind of Exchange operators, let the optimizer choose,
68846884
- 'Persisted' - use temporary files in object storage,
68856885
- 'Streaming' - stream exchange data over network.
6886+
)", EXPERIMENTAL) \
6887+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
6888+
Allow retries in cluster request, when one node goes offline
68866889
)", EXPERIMENTAL) \
68876890
DECLARE(Bool, object_storage_remote_initiator, false, R"(
68886891
Execute request to object storage as remote on one of object_storage_cluster nodes.

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
196196
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
197197
{"parallel_hash_join_threshold", 0, 0, "New setting"},
198198
/// Release closed. Please use 25.4
199+
{"use_object_storage_list_objects_cache", true, false, "New setting."},
200+
{"allow_retries_in_cluster_requests", false, false, "New setting."},
199201
});
200202
addSettingsChanges(settings_changes_history, "24.12.2.20000",
201203
{

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55+
extern const SettingsBool allow_retries_in_cluster_requests;
5556
}
5657

5758
namespace ErrorCodes
@@ -82,6 +83,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8283
, extension(extension_)
8384
, priority_func(priority_func_)
8485
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86+
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8587
{
8688
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8789
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -484,7 +486,8 @@ int RemoteQueryExecutor::sendQueryAsync()
484486
read_context = std::make_unique<ReadContext>(
485487
*this,
486488
/*suspend_when_query_sent*/ true,
487-
read_packet_type_separately);
489+
read_packet_type_separately,
490+
allow_retries_in_cluster_requests);
488491

489492
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
490493
/// because we can still be in process of sending scalars or external tables.
@@ -557,7 +560,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
557560
read_context = std::make_unique<ReadContext>(
558561
*this,
559562
/*suspend_when_query_sent*/ false,
560-
read_packet_type_separately);
563+
read_packet_type_separately,
564+
allow_retries_in_cluster_requests);
561565
recreate_read_context = false;
562566
}
563567

@@ -681,7 +685,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
681685
/// We can actually return it, and the first call to RemoteQueryExecutor::read
682686
/// will return earlier. We should consider doing it.
683687
if (packet.block && (packet.block.rows() > 0))
688+
{
689+
if (extension && extension->replica_info)
690+
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
684691
return ReadResult(adaptBlockStructure(packet.block, header));
692+
}
685693
break; /// If the block is empty - we will receive other packets before EndOfStream.
686694

687695
case Protocol::Server::Exception:
@@ -743,6 +751,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
743751
case Protocol::Server::TimezoneUpdate:
744752
break;
745753

754+
case Protocol::Server::ConnectionLost:
755+
if (allow_retries_in_cluster_requests)
756+
{
757+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
758+
{
759+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
760+
{
761+
finished = true;
762+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
763+
return ReadResult(Block{});
764+
}
765+
}
766+
}
767+
packet.exception->rethrow();
768+
break;
769+
746770
default:
747771
got_unknown_packet_from_replica = true;
748772
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,22 @@ class RemoteQueryExecutorReadContext;
2828

2929
class ParallelReplicasReadingCoordinator;
3030

31-
/// This is the same type as StorageS3Source::IteratorWrapper
32-
using TaskIterator = std::function<String(size_t)>;
31+
namespace ErrorCodes
32+
{
33+
extern const int NOT_IMPLEMENTED;
34+
};
35+
36+
class TaskIterator
37+
{
38+
public:
39+
virtual ~TaskIterator() = default;
40+
virtual bool supportRerunTask() const { return false; }
41+
virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
42+
{
43+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
44+
}
45+
virtual std::string operator()(size_t number_of_current_replica) const = 0;
46+
};
3347

3448
/// This class allows one to launch queries on remote replicas of one shard and get results
3549
class RemoteQueryExecutor
@@ -331,6 +345,10 @@ class RemoteQueryExecutor
331345

332346
const bool read_packet_type_separately = false;
333347

348+
const bool allow_retries_in_cluster_requests = false;
349+
350+
std::unordered_set<size_t> replica_has_processed_data;
351+
334352
/// Send all scalars to remote servers
335353
void sendScalars();
336354

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ namespace ErrorCodes
1919
}
2020

2121
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
22-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
22+
RemoteQueryExecutor & executor_,
23+
bool suspend_when_query_sent_,
24+
bool read_packet_type_separately_,
25+
bool allow_retries_in_cluster_requests_)
2326
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2427
, executor(executor_)
2528
, suspend_when_query_sent(suspend_when_query_sent_)
2629
, read_packet_type_separately(read_packet_type_separately_)
30+
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2731
{
2832
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
2933
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5458
if (read_context.executor.needToSkipUnavailableShard())
5559
return;
5660

57-
while (true)
61+
try
5862
{
59-
read_context.has_read_packet_part = PacketPart::None;
60-
61-
if (read_context.read_packet_type_separately)
63+
while (true)
6264
{
63-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
64-
read_context.has_read_packet_part = PacketPart::Type;
65+
read_context.has_read_packet_part = PacketPart::None;
66+
67+
if (read_context.read_packet_type_separately)
68+
{
69+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
70+
read_context.has_read_packet_part = PacketPart::Type;
71+
suspend_callback();
72+
}
73+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
74+
read_context.has_read_packet_part = PacketPart::Body;
6575
suspend_callback();
6676
}
67-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
68-
read_context.has_read_packet_part = PacketPart::Body;
77+
}
78+
catch (const Exception &)
79+
{
80+
if (!read_context.allow_retries_in_cluster_requests)
81+
throw;
82+
read_context.packet.type = Protocol::Server::ConnectionLost;
83+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
6984
suspend_callback();
7085
}
7186
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2626
{
2727
public:
2828
explicit RemoteQueryExecutorReadContext(
29-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
29+
RemoteQueryExecutor & executor_,
30+
bool suspend_when_query_sent_,
31+
bool read_packet_type_separately_,
32+
bool allow_retries_in_cluster_requests_);
3033

3134
~RemoteQueryExecutorReadContext() override;
3235

@@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
108111
bool suspend_when_query_sent = false;
109112
bool is_query_sent = false;
110113
const bool read_packet_type_separately = false;
114+
const bool allow_retries_in_cluster_requests = false;
111115
};
112116

113117
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -439,6 +439,30 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
439439
args.insert(args.end(), object_storage_type_arg);
440440
}
441441

442+
class TaskDistributor : public TaskIterator
443+
{
444+
public:
445+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
446+
const std::vector<std::string> & ids_of_hosts,
447+
uint64_t lock_object_storage_task_distribution_ms
448+
)
449+
: task_distributor(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms) {}
450+
~TaskDistributor() override = default;
451+
bool supportRerunTask() const override { return true; }
452+
void rescheduleTasksFromReplica(size_t number_of_current_replica) override
453+
{
454+
task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
455+
}
456+
457+
std::string operator()(size_t number_of_current_replica) const override
458+
{
459+
return task_distributor.getNextTask(number_of_current_replica).value_or("");
460+
}
461+
462+
private:
463+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
464+
};
465+
442466
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
443467
const ActionsDAG::Node * predicate,
444468
const std::optional<ActionsDAG> & filter_actions_dag,
@@ -474,14 +498,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
474498
lock_object_storage_task_distribution_ms_max
475499
);
476500

477-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
478-
iterator,
479-
ids_of_hosts,
480-
lock_object_storage_task_distribution_ms);
481-
482-
auto callback = std::make_shared<TaskIterator>(
483-
[task_distributor](size_t number_of_current_replica) mutable -> String
484-
{ return task_distributor->getNextTask(number_of_current_replica).value_or(""); });
501+
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts, lock_object_storage_task_distribution_ms);
485502

486503
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
487504
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ namespace DB
99
namespace ErrorCodes
1010
{
1111
extern const int LOGICAL_ERROR;
12-
}
12+
extern const int CANNOT_READ_ALL_DATA;
13+
};
1314

1415
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1516
std::shared_ptr<IObjectIterator> iterator_,
@@ -21,6 +22,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
2122
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
2223
, iterator_exhausted(false)
2324
{
25+
size_t nodes = ids_of_nodes.size();
26+
for (size_t i = 0; i < nodes; ++i)
27+
replica_to_files_to_be_processed[i] = std::list<String>{};
2428
}
2529

2630
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -36,16 +40,27 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
3640

3741
saveLastNodeActivity(number_of_current_replica);
3842

39-
// 1. Check pre-queued files first
40-
if (auto file = getPreQueuedFile(number_of_current_replica))
41-
return file;
43+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
44+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
45+
throw Exception(
46+
ErrorCodes::LOGICAL_ERROR,
47+
"Replica number {} was marked as lost, can't set task for it anymore",
48+
number_of_current_replica
49+
);
4250

51+
// 1. Check pre-queued files first
52+
std::optional<String> file = getPreQueuedFile(number_of_current_replica);
4353
// 2. Try to find a matching file from the iterator
44-
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
45-
return file;
46-
54+
if (!file.has_value())
55+
file = getMatchingFileFromIterator(number_of_current_replica);
4756
// 3. Process unprocessed files if iterator is exhausted
48-
return getAnyUnprocessedFile(number_of_current_replica);
57+
if (!file.has_value())
58+
file = getAnyUnprocessedFile(number_of_current_replica);
59+
60+
if (file.has_value())
61+
processed_file_list_ptr->second.push_back(*file);
62+
63+
return file;
4964
}
5065

5166
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -57,16 +72,27 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String
5772
return 0;
5873

5974
/// Rendezvous hashing
60-
size_t best_id = 0;
61-
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
62-
for (size_t id = 1; id < nodes_count; ++id)
75+
auto replica = replica_to_files_to_be_processed.begin();
76+
if (replica == replica_to_files_to_be_processed.end())
77+
throw Exception(
78+
ErrorCodes::LOGICAL_ERROR,
79+
"No active replicas, can't find best replica for file {}",
80+
file_path
81+
);
82+
83+
size_t best_id = replica->first;
84+
UInt64 best_weight = sipHash64(ids_of_nodes[best_id] + file_path);
85+
++replica;
86+
while (replica != replica_to_files_to_be_processed.end())
6387
{
88+
size_t id = replica->first;
6489
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
6590
if (weight > best_weight)
6691
{
6792
best_weight = weight;
6893
best_id = id;
6994
}
95+
++replica;
7096
}
7197
return best_id;
7298
}
@@ -230,4 +256,28 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb
230256
last_node_activity[number_of_current_replica] = now;
231257
}
232258

259+
void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica)
260+
{
261+
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
262+
std::lock_guard lock(mutex);
263+
264+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
265+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
266+
throw Exception(
267+
ErrorCodes::LOGICAL_ERROR,
268+
"Replica number {} was marked as lost already",
269+
number_of_current_replica
270+
);
271+
272+
if (replica_to_files_to_be_processed.size() < 2)
273+
throw Exception(
274+
ErrorCodes::CANNOT_READ_ALL_DATA,
275+
"All replicas were marked as lost"
276+
);
277+
278+
replica_to_files_to_be_processed.erase(number_of_current_replica);
279+
for (const auto & file_path : processed_file_list_ptr->second)
280+
unprocessed_files[file_path] = getReplicaForFile(file_path);
281+
}
282+
233283
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
#include <unordered_set>
1212
#include <unordered_map>
13+
#include <list>
1314
#include <vector>
1415
#include <mutex>
1516
#include <memory>
@@ -27,6 +28,9 @@ class StorageObjectStorageStableTaskDistributor
2728

2829
std::optional<String> getNextTask(size_t number_of_current_replica);
2930

31+
/// Insert objects back to unprocessed files
32+
void rescheduleTasksFromReplica(size_t number_of_current_replica);
33+
3034
private:
3135
size_t getReplicaForFile(const String & file_path);
3236
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
@@ -44,6 +48,7 @@ class StorageObjectStorageStableTaskDistributor
4448
std::vector<std::string> ids_of_nodes;
4549
std::unordered_map<size_t, Poco::Timestamp> last_node_activity;
4650
Poco::Timestamp::TimeDiff lock_object_storage_task_distribution_us;
51+
std::unordered_map<size_t, std::list<String>> replica_to_files_to_be_processed;
4752

4853
std::mutex mutex;
4954
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)