Skip to content

Commit c3afad3

Browse files
committed
Restart cluster tasks on connection lost
1 parent 482b406 commit c3afad3

12 files changed

+207
-31
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6130,6 +6130,9 @@ Limit for hosts used for request in object storage cluster table functions - azu
61306130
Possible values:
61316131
- Positive integer.
61326132
- 0 — All hosts in cluster.
6133+
)", EXPERIMENTAL) \
6134+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
6135+
Allow retries in cluster request, when one node goes offline
61336136
)", EXPERIMENTAL) \
61346137
\
61356138
/** Experimental tsToGrid aggregate function. */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
9494
{"iceberg_snapshot_id", 0, 0, "New setting."},
9595
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
9696
/// Release closed. Please use 25.4
97+
{"use_object_storage_list_objects_cache", true, false, "New setting."},
98+
{"allow_retries_in_cluster_requests", false, false, "New setting."},
9799
});
98100
addSettingsChanges(settings_changes_history, "24.12.2.20000",
99101
{

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ namespace Setting
4949
extern const SettingsOverflowMode timeout_overflow_mode;
5050
extern const SettingsBool use_hedged_requests;
5151
extern const SettingsBool push_external_roles_in_interserver_queries;
52+
extern const SettingsBool allow_retries_in_cluster_requests;
5253
}
5354

5455
namespace ErrorCodes
@@ -77,6 +78,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
7778
, extension(extension_)
7879
, priority_func(priority_func_)
7980
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
81+
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8082
{
8183
}
8284

@@ -458,7 +460,8 @@ int RemoteQueryExecutor::sendQueryAsync()
458460
read_context = std::make_unique<ReadContext>(
459461
*this,
460462
/*suspend_when_query_sent*/ true,
461-
read_packet_type_separately);
463+
read_packet_type_separately,
464+
allow_retries_in_cluster_requests);
462465

463466
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
464467
/// because we can still be in process of sending scalars or external tables.
@@ -531,7 +534,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
531534
read_context = std::make_unique<ReadContext>(
532535
*this,
533536
/*suspend_when_query_sent*/ false,
534-
read_packet_type_separately);
537+
read_packet_type_separately,
538+
allow_retries_in_cluster_requests);
535539
recreate_read_context = false;
536540
}
537541

@@ -655,7 +659,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
655659
/// We can actually return it, and the first call to RemoteQueryExecutor::read
656660
/// will return earlier. We should consider doing it.
657661
if (packet.block && (packet.block.rows() > 0))
662+
{
663+
if (extension && extension->replica_info)
664+
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
658665
return ReadResult(adaptBlockStructure(packet.block, header));
666+
}
659667
break; /// If the block is empty - we will receive other packets before EndOfStream.
660668

661669
case Protocol::Server::Exception:
@@ -717,6 +725,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
717725
case Protocol::Server::TimezoneUpdate:
718726
break;
719727

728+
case Protocol::Server::ConnectionLost:
729+
if (allow_retries_in_cluster_requests)
730+
{
731+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
732+
{
733+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
734+
{
735+
finished = true;
736+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
737+
return ReadResult(Block{});
738+
}
739+
}
740+
}
741+
packet.exception->rethrow();
742+
break;
743+
720744
default:
721745
got_unknown_packet_from_replica = true;
722746
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,22 @@ class RemoteQueryExecutorReadContext;
2727

2828
class ParallelReplicasReadingCoordinator;
2929

30-
/// This is the same type as StorageS3Source::IteratorWrapper
31-
using TaskIterator = std::function<String(size_t)>;
30+
namespace ErrorCodes
31+
{
32+
extern const int NOT_IMPLEMENTED;
33+
};
34+
35+
class TaskIterator
36+
{
37+
public:
38+
virtual ~TaskIterator() = default;
39+
virtual bool supportRerunTask() const { return false; }
40+
virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
41+
{
42+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
43+
}
44+
virtual std::string operator()(size_t number_of_current_replica) const = 0;
45+
};
3246

3347
/// This class allows one to launch queries on remote replicas of one shard and get results
3448
class RemoteQueryExecutor
@@ -320,6 +334,10 @@ class RemoteQueryExecutor
320334

321335
const bool read_packet_type_separately = false;
322336

337+
const bool allow_retries_in_cluster_requests = false;
338+
339+
std::unordered_set<size_t> replica_has_processed_data;
340+
323341
/// Send all scalars to remote servers
324342
void sendScalars();
325343

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ namespace ErrorCodes
1919
}
2020

2121
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
22-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
22+
RemoteQueryExecutor & executor_,
23+
bool suspend_when_query_sent_,
24+
bool read_packet_type_separately_,
25+
bool allow_retries_in_cluster_requests_)
2326
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2427
, executor(executor_)
2528
, suspend_when_query_sent(suspend_when_query_sent_)
2629
, read_packet_type_separately(read_packet_type_separately_)
30+
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2731
{
2832
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
2933
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5458
if (read_context.executor.needToSkipUnavailableShard())
5559
return;
5660

57-
while (true)
61+
try
5862
{
59-
read_context.has_read_packet_part = PacketPart::None;
60-
61-
if (read_context.read_packet_type_separately)
63+
while (true)
6264
{
63-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
64-
read_context.has_read_packet_part = PacketPart::Type;
65+
read_context.has_read_packet_part = PacketPart::None;
66+
67+
if (read_context.read_packet_type_separately)
68+
{
69+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
70+
read_context.has_read_packet_part = PacketPart::Type;
71+
suspend_callback();
72+
}
73+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
74+
read_context.has_read_packet_part = PacketPart::Body;
6575
suspend_callback();
6676
}
67-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
68-
read_context.has_read_packet_part = PacketPart::Body;
77+
}
78+
catch (const Exception &)
79+
{
80+
if (!read_context.allow_retries_in_cluster_requests)
81+
throw;
82+
read_context.packet.type = Protocol::Server::ConnectionLost;
83+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
6984
suspend_callback();
7085
}
7186
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2626
{
2727
public:
2828
explicit RemoteQueryExecutorReadContext(
29-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
29+
RemoteQueryExecutor & executor_,
30+
bool suspend_when_query_sent_,
31+
bool read_packet_type_separately_,
32+
bool allow_retries_in_cluster_requests_);
3033

3134
~RemoteQueryExecutorReadContext() override;
3235

@@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
108111
bool suspend_when_query_sent = false;
109112
bool is_query_sent = false;
110113
const bool read_packet_type_separately = false;
114+
const bool allow_retries_in_cluster_requests = false;
111115
};
112116

113117
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -363,6 +363,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
363363
args.insert(args.end(), object_storage_type_arg);
364364
}
365365

366+
class TaskDistributor : public TaskIterator
367+
{
368+
public:
369+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
370+
const std::vector<std::string> & ids_of_hosts)
371+
: task_distributor(iterator, ids_of_hosts) {}
372+
~TaskDistributor() override = default;
373+
bool supportRerunTask() const override { return true; }
374+
void rescheduleTasksFromReplica(size_t number_of_current_replica) override
375+
{
376+
task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
377+
}
378+
379+
std::string operator()(size_t number_of_current_replica) const override
380+
{
381+
return task_distributor.getNextTask(number_of_current_replica).value_or("");
382+
}
383+
384+
private:
385+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
386+
};
387+
366388
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
367389
const ActionsDAG::Node * predicate,
368390
const std::optional<ActionsDAG> & filter_actions_dag,
@@ -386,12 +408,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
386408
}
387409
}
388410

389-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
390-
391-
auto callback = std::make_shared<TaskIterator>(
392-
[task_distributor](size_t number_of_current_replica) mutable -> String {
393-
return task_distributor->getNextTask(number_of_current_replica).value_or("");
394-
});
411+
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts);
395412

396413
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
397414
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
namespace DB
77
{
88

9+
namespace ErrorCodes
10+
{
11+
extern const int LOGICAL_ERROR;
12+
extern const int CANNOT_READ_ALL_DATA;
13+
};
14+
915
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1016
std::shared_ptr<IObjectIterator> iterator_,
1117
std::vector<std::string> ids_of_nodes_)
@@ -14,6 +20,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
1420
, ids_of_nodes(ids_of_nodes_)
1521
, iterator_exhausted(false)
1622
{
23+
size_t nodes = ids_of_nodes.size();
24+
for (size_t i = 0; i < nodes; ++i)
25+
replica_to_files_to_be_processed[i] = std::list<String>{};
1726
}
1827

1928
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -24,16 +33,27 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
2433
number_of_current_replica
2534
);
2635

27-
// 1. Check pre-queued files first
28-
if (auto file = getPreQueuedFile(number_of_current_replica))
29-
return file;
36+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
37+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
38+
throw Exception(
39+
ErrorCodes::LOGICAL_ERROR,
40+
"Replica number {} was marked as lost, can't set task for it anymore",
41+
number_of_current_replica
42+
);
3043

44+
// 1. Check pre-queued files first
45+
std::optional<String> file = getPreQueuedFile(number_of_current_replica);
3146
// 2. Try to find a matching file from the iterator
32-
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
33-
return file;
34-
47+
if (!file.has_value())
48+
file = getMatchingFileFromIterator(number_of_current_replica);
3549
// 3. Process unprocessed files if iterator is exhausted
36-
return getAnyUnprocessedFile(number_of_current_replica);
50+
if (!file.has_value())
51+
file = getAnyUnprocessedFile(number_of_current_replica);
52+
53+
if (file.has_value())
54+
processed_file_list_ptr->second.push_back(*file);
55+
56+
return file;
3757
}
3858

3959
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
179199
return std::nullopt;
180200
}
181201

202+
void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica)
203+
{
204+
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
205+
std::lock_guard lock(mutex);
206+
207+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
208+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
209+
throw Exception(
210+
ErrorCodes::LOGICAL_ERROR,
211+
"Replica number {} was marked as lost already",
212+
number_of_current_replica
213+
);
214+
215+
if (replica_to_files_to_be_processed.size() < 2)
216+
throw Exception(
217+
ErrorCodes::CANNOT_READ_ALL_DATA,
218+
"All replicas were marked as lost"
219+
);
220+
221+
for (const auto & file_path : processed_file_list_ptr->second)
222+
unprocessed_files.insert(file_path);
223+
replica_to_files_to_be_processed.erase(number_of_current_replica);
224+
}
225+
182226
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
88
#include <unordered_set>
9+
#include <unordered_map>
10+
#include <list>
911
#include <vector>
1012
#include <mutex>
1113
#include <memory>
@@ -22,6 +24,9 @@ class StorageObjectStorageStableTaskDistributor
2224

2325
std::optional<String> getNextTask(size_t number_of_current_replica);
2426

27+
/// Insert objects back to unprocessed files
28+
void rescheduleTasksFromReplica(size_t number_of_current_replica);
29+
2530
private:
2631
size_t getReplicaForFile(const String & file_path);
2732
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
@@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor
3439
std::unordered_set<String> unprocessed_files;
3540

3641
std::vector<std::string> ids_of_nodes;
42+
std::unordered_map<size_t, std::list<String>> replica_to_files_to_be_processed;
3743

3844
std::mutex mutex;
3945
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)