Skip to content

Commit fe4eee1

Browse files
committed
Restart cluster tasks on connection lost
1 parent c25e57f commit fe4eee1

12 files changed

+208
-31
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Pseudo-packet set on the client side when receiving from a replica throws; carries the caught exception. Lies outside the [0, MAX] range.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6111,6 +6111,9 @@ Possible values:
61116111
)", EXPERIMENTAL) \
61126112
DECLARE(Bool, use_object_storage_list_objects_cache, false, R"(
61136113
Cache the list of objects returned by list objects calls in object storage
6114+
)", EXPERIMENTAL) \
6115+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
6116+
Allow retries in cluster request, when one node goes offline
61146117
)", EXPERIMENTAL) \
61156118
\
61166119
/* ####################################################### */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
7373
{"iceberg_timestamp_ms", 0, 0, "New setting."},
7474
{"iceberg_snapshot_id", 0, 0, "New setting."},
7575
{"use_object_storage_list_objects_cache", true, false, "New setting."},
76+
{"allow_retries_in_cluster_requests", false, false, "New setting."},
7677
});
7778
addSettingsChanges(settings_changes_history, "24.12.2.20000",
7879
{

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ namespace Setting
4848
extern const SettingsOverflowMode timeout_overflow_mode;
4949
extern const SettingsBool use_hedged_requests;
5050
extern const SettingsBool push_external_roles_in_interserver_queries;
51+
extern const SettingsBool allow_retries_in_cluster_requests;
5152
}
5253

5354
namespace ErrorCodes
@@ -76,6 +77,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
7677
, extension(extension_)
7778
, priority_func(priority_func_)
7879
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
80+
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
7981
{
8082
}
8183

@@ -457,7 +459,8 @@ int RemoteQueryExecutor::sendQueryAsync()
457459
read_context = std::make_unique<ReadContext>(
458460
*this,
459461
/*suspend_when_query_sent*/ true,
460-
read_packet_type_separately);
462+
read_packet_type_separately,
463+
allow_retries_in_cluster_requests);
461464

462465
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
463466
/// because we can still be in process of sending scalars or external tables.
@@ -530,7 +533,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
530533
read_context = std::make_unique<ReadContext>(
531534
*this,
532535
/*suspend_when_query_sent*/ false,
533-
read_packet_type_separately);
536+
read_packet_type_separately,
537+
allow_retries_in_cluster_requests);
534538
recreate_read_context = false;
535539
}
536540

@@ -654,7 +658,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
654658
/// We can actually return it, and the first call to RemoteQueryExecutor::read
655659
/// will return earlier. We should consider doing it.
656660
if (packet.block && (packet.block.rows() > 0))
661+
{
662+
if (extension && extension->replica_info)
663+
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
657664
return ReadResult(adaptBlockStructure(packet.block, header));
665+
}
658666
break; /// If the block is empty - we will receive other packets before EndOfStream.
659667

660668
case Protocol::Server::Exception:
@@ -716,6 +724,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
716724
case Protocol::Server::TimezoneUpdate:
717725
break;
718726

727+
case Protocol::Server::ConnectionLost:
728+
if (allow_retries_in_cluster_requests)
729+
{
730+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
731+
{
732+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
733+
{
734+
finished = true;
735+
extension->task_iterator->rerunTasksForReplica(extension->replica_info->number_of_current_replica);
736+
return ReadResult(Block{});
737+
}
738+
}
739+
}
740+
packet.exception->rethrow();
741+
break;
742+
719743
default:
720744
got_unknown_packet_from_replica = true;
721745
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <Interpreters/StorageID.h>
99
#include <sys/types.h>
1010

11+
#include <list>
12+
1113

1214
namespace DB
1315
{
@@ -27,8 +29,22 @@ class RemoteQueryExecutorReadContext;
2729

2830
class ParallelReplicasReadingCoordinator;
2931

30-
/// This is the same type as StorageS3Source::IteratorWrapper
31-
using TaskIterator = std::function<String(size_t)>;
32+
/// Error codes referenced by the inline definitions in this header.
/// Only declared here (extern); the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}
36+
37+
class TaskIterator
38+
{
39+
public:
40+
virtual ~TaskIterator() = default;
41+
virtual bool supportRerunTask() const { return false; }
42+
virtual void rerunTasksForReplica(size_t /* number_of_current_replica */)
43+
{
44+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rerunTasksForReplica is not implemented");
45+
}
46+
virtual std::string operator()(size_t number_of_current_replica) const = 0;
47+
};
3248

3349
/// This class allows one to launch queries on remote replicas of one shard and get results
3450
class RemoteQueryExecutor
@@ -320,6 +336,10 @@ class RemoteQueryExecutor
320336

321337
const bool read_packet_type_separately = false;
322338

339+
const bool allow_retries_in_cluster_requests = false;
340+
341+
std::unordered_set<size_t> replica_has_processed_data;
342+
323343
/// Send all scalars to remote servers
324344
void sendScalars();
325345

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ namespace ErrorCodes
1919
}
2020

2121
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
22-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
22+
RemoteQueryExecutor & executor_,
23+
bool suspend_when_query_sent_,
24+
bool read_packet_type_separately_,
25+
bool allow_retries_in_cluster_requests_)
2326
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2427
, executor(executor_)
2528
, suspend_when_query_sent(suspend_when_query_sent_)
2629
, read_packet_type_separately(read_packet_type_separately_)
30+
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2731
{
2832
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
2933
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5458
if (read_context.executor.needToSkipUnavailableShard())
5559
return;
5660

57-
while (true)
61+
try
5862
{
59-
read_context.has_read_packet_part = PacketPart::None;
60-
61-
if (read_context.read_packet_type_separately)
63+
while (true)
6264
{
63-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
64-
read_context.has_read_packet_part = PacketPart::Type;
65+
read_context.has_read_packet_part = PacketPart::None;
66+
67+
if (read_context.read_packet_type_separately)
68+
{
69+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
70+
read_context.has_read_packet_part = PacketPart::Type;
71+
suspend_callback();
72+
}
73+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
74+
read_context.has_read_packet_part = PacketPart::Body;
6575
suspend_callback();
6676
}
67-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
68-
read_context.has_read_packet_part = PacketPart::Body;
77+
}
78+
catch (const Exception &)
79+
{
80+
if (!read_context.allow_retries_in_cluster_requests)
81+
throw;
82+
read_context.packet.type = Protocol::Server::ConnectionLost;
83+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
6984
suspend_callback();
7085
}
7186
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2626
{
2727
public:
2828
explicit RemoteQueryExecutorReadContext(
29-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
29+
RemoteQueryExecutor & executor_,
30+
bool suspend_when_query_sent_,
31+
bool read_packet_type_separately_,
32+
bool allow_retries_in_cluster_requests_);
3033

3134
~RemoteQueryExecutorReadContext() override;
3235

@@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
108111
bool suspend_when_query_sent = false;
109112
bool is_query_sent = false;
110113
const bool read_packet_type_separately = false;
114+
const bool allow_retries_in_cluster_requests = false;
111115
};
112116

113117
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
337337
args.insert(args.end(), object_storage_type_arg);
338338
}
339339

340+
class TaskDistributor : public TaskIterator
341+
{
342+
public:
343+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
344+
const std::vector<std::string> & ids_of_hosts)
345+
: task_distributor(iterator, ids_of_hosts) {}
346+
~TaskDistributor() override = default;
347+
bool supportRerunTask() const override { return true; }
348+
void rerunTasksForReplica(size_t number_of_current_replica) override
349+
{
350+
task_distributor.rerunTasksForReplica(number_of_current_replica);
351+
}
352+
353+
std::string operator()(size_t number_of_current_replica) const override
354+
{
355+
return task_distributor.getNextTask(number_of_current_replica).value_or("");
356+
}
357+
358+
private:
359+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
360+
};
361+
340362
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
341363
const ActionsDAG::Node * predicate,
342364
const std::optional<ActionsDAG> & filter_actions_dag,
@@ -360,12 +382,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
360382
}
361383
}
362384

363-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
364-
365-
auto callback = std::make_shared<TaskIterator>(
366-
[task_distributor](size_t number_of_current_replica) mutable -> String {
367-
return task_distributor->getNextTask(number_of_current_replica).value_or("");
368-
});
385+
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts);
369386

370387
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
371388
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
namespace DB
77
{
88

9+
/// Error codes thrown from this file.
/// Only declared here (extern); the definitions live in another translation unit.
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_READ_ALL_DATA;
}
14+
915
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1016
std::shared_ptr<IObjectIterator> iterator_,
1117
std::vector<std::string> ids_of_nodes_)
@@ -14,6 +20,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
1420
, ids_of_nodes(ids_of_nodes_)
1521
, iterator_exhausted(false)
1622
{
23+
size_t nodes = ids_of_nodes.size();
24+
for (size_t i = 0; i < nodes; ++i)
25+
processed_files[i] = std::list<String>{};
1726
}
1827

1928
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -24,16 +33,27 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
2433
number_of_current_replica
2534
);
2635

27-
// 1. Check pre-queued files first
28-
if (auto file = getPreQueuedFile(number_of_current_replica))
29-
return file;
36+
auto processed_file_list_ptr = processed_files.find(number_of_current_replica);
37+
if (processed_file_list_ptr == processed_files.end())
38+
throw Exception(
39+
ErrorCodes::LOGICAL_ERROR,
40+
"Replica number {} was marked as lost, can't set task for it anymore",
41+
number_of_current_replica
42+
);
3043

44+
// 1. Check pre-queued files first
45+
std::optional<String> file = getPreQueuedFile(number_of_current_replica);
3146
// 2. Try to find a matching file from the iterator
32-
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
33-
return file;
34-
47+
if (!file.has_value())
48+
file = getMatchingFileFromIterator(number_of_current_replica);
3549
// 3. Process unprocessed files if iterator is exhausted
36-
return getAnyUnprocessedFile(number_of_current_replica);
50+
if (!file.has_value())
51+
file = getAnyUnprocessedFile(number_of_current_replica);
52+
53+
if (file.has_value())
54+
processed_file_list_ptr->second.push_back(*file);
55+
56+
return file;
3757
}
3858

3959
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
179199
return std::nullopt;
180200
}
181201

202+
void StorageObjectStorageStableTaskDistributor::rerunTasksForReplica(size_t number_of_current_replica)
203+
{
204+
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
205+
std::lock_guard lock(mutex);
206+
207+
auto processed_file_list_ptr = processed_files.find(number_of_current_replica);
208+
if (processed_file_list_ptr == processed_files.end())
209+
throw Exception(
210+
ErrorCodes::LOGICAL_ERROR,
211+
"Replica number {} was marked as lost already",
212+
number_of_current_replica
213+
);
214+
215+
if (processed_files.size() < 2)
216+
throw Exception(
217+
ErrorCodes::CANNOT_READ_ALL_DATA,
218+
"All replicas were marked as lost"
219+
);
220+
221+
for (const auto & file_path : processed_file_list_ptr->second)
222+
unprocessed_files.insert(file_path);
223+
processed_files.erase(number_of_current_replica);
224+
}
225+
182226
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
88
#include <unordered_set>
9+
#include <unordered_map>
10+
#include <list>
911
#include <vector>
1012
#include <mutex>
1113
#include <memory>
@@ -22,6 +24,9 @@ class StorageObjectStorageStableTaskDistributor
2224

2325
std::optional<String> getNextTask(size_t number_of_current_replica);
2426

27+
/// Insert objects back to unprocessed files
28+
void rerunTasksForReplica(size_t number_of_current_replica);
29+
2530
private:
2631
size_t getReplicaForFile(const String & file_path);
2732
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
@@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor
3439
std::unordered_set<String> unprocessed_files;
3540

3641
std::vector<std::string> ids_of_nodes;
42+
std::unordered_map<size_t, std::list<String>> processed_files;
3743

3844
std::mutex mutex;
3945
bool iterator_exhausted = false;

0 commit comments

Comments
 (0)