Skip to content

Commit 3be47a7

Browse files
Enmk authored and ianton-ru committed
Merge pull request #780 from Altinity/feature/retries_in_cluster_functions
Restart cluster tasks on connection lost
1 parent 0693e71 commit 3be47a7

12 files changed

+254
-57
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Exception that occurred on the client side.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/Core/Settings.cpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7027,6 +7027,9 @@ DECLARE(Bool, allow_experimental_ytsaurus_dictionary_source, false, R"(
70277027
)", EXPERIMENTAL) \
70287028
DECLARE(Bool, distributed_plan_force_shuffle_aggregation, false, R"(
70297029
Use Shuffle aggregation strategy instead of PartialAggregation + Merge in distributed query plan.
7030+
)", EXPERIMENTAL) \
7031+
DECLARE(Bool, allow_retries_in_cluster_requests, false, R"(
7032+
Allow retries in cluster request, when one node goes offline
70307033
)", EXPERIMENTAL) \
70317034
\
70327035
/** Experimental timeSeries* aggregate functions. */ \

src/Core/SettingsChangesHistory.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,8 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory()
260260
{"parallel_replicas_for_cluster_engines", false, true, "New setting."},
261261
{"parallel_hash_join_threshold", 0, 0, "New setting"},
262262
/// Release closed. Please use 25.4
263+
{"use_object_storage_list_objects_cache", true, false, "New setting."},
264+
{"allow_retries_in_cluster_requests", false, false, "New setting."},
263265
});
264266
addSettingsChanges(settings_changes_history, "25.2",
265267
{

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ namespace Setting
5252
extern const SettingsBool use_hedged_requests;
5353
extern const SettingsBool push_external_roles_in_interserver_queries;
5454
extern const SettingsMilliseconds parallel_replicas_connect_timeout_ms;
55+
extern const SettingsBool allow_retries_in_cluster_requests;
5556
}
5657

5758
namespace ErrorCodes
@@ -82,6 +83,7 @@ RemoteQueryExecutor::RemoteQueryExecutor(
8283
, extension(extension_)
8384
, priority_func(priority_func_)
8485
, read_packet_type_separately(context->canUseParallelReplicasOnInitiator() && !context->getSettingsRef()[Setting::use_hedged_requests])
86+
, allow_retries_in_cluster_requests(context->getSettingsRef()[Setting::allow_retries_in_cluster_requests])
8587
{
8688
if (stage == QueryProcessingStage::QueryPlan && !query_plan)
8789
throw Exception(ErrorCodes::LOGICAL_ERROR, "Query plan is not passed for QueryPlan processing stage");
@@ -457,7 +459,8 @@ int RemoteQueryExecutor::sendQueryAsync()
457459
read_context = std::make_unique<ReadContext>(
458460
*this,
459461
/*suspend_when_query_sent*/ true,
460-
read_packet_type_separately);
462+
read_packet_type_separately,
463+
allow_retries_in_cluster_requests);
461464

462465
/// If query already sent, do nothing. Note that we cannot use sent_query flag here,
463466
/// because we can still be in process of sending scalars or external tables.
@@ -530,7 +533,8 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::readAsync()
530533
read_context = std::make_unique<ReadContext>(
531534
*this,
532535
/*suspend_when_query_sent*/ false,
533-
read_packet_type_separately);
536+
read_packet_type_separately,
537+
allow_retries_in_cluster_requests);
534538
recreate_read_context = false;
535539
}
536540

@@ -654,7 +658,11 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
654658
/// We can actually return it, and the first call to RemoteQueryExecutor::read
655659
/// will return earlier. We should consider doing it.
656660
if (!packet.block.empty() && (packet.block.rows() > 0))
661+
{
662+
if (extension && extension->replica_info)
663+
replica_has_processed_data.insert(extension->replica_info->number_of_current_replica);
657664
return ReadResult(adaptBlockStructure(packet.block, *header));
665+
}
658666
break; /// If the block is empty - we will receive other packets before EndOfStream.
659667

660668
case Protocol::Server::Exception:
@@ -716,6 +724,22 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
716724
case Protocol::Server::TimezoneUpdate:
717725
break;
718726

727+
case Protocol::Server::ConnectionLost:
728+
if (allow_retries_in_cluster_requests)
729+
{
730+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
731+
{
732+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
733+
{
734+
finished = true;
735+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
736+
return ReadResult(Block{});
737+
}
738+
}
739+
}
740+
packet.exception->rethrow();
741+
break;
742+
719743
default:
720744
got_unknown_packet_from_replica = true;
721745
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,22 @@ class RemoteQueryExecutorReadContext;
3131

3232
class ParallelReplicasReadingCoordinator;
3333

34-
/// This is the same type as StorageS3Source::IteratorWrapper
35-
using TaskIterator = std::function<ClusterFunctionReadTaskResponsePtr(size_t)>;
34+
namespace ErrorCodes
35+
{
36+
extern const int NOT_IMPLEMENTED;
37+
};
38+
39+
class TaskIterator
40+
{
41+
public:
42+
virtual ~TaskIterator() = default;
43+
virtual bool supportRerunTask() const { return false; }
44+
virtual void rescheduleTasksFromReplica(size_t /* number_of_current_replica */)
45+
{
46+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rescheduleTasksFromReplica is not implemented");
47+
}
48+
virtual ClusterFunctionReadTaskResponsePtr operator()(size_t number_of_current_replica) const = 0;
49+
};
3650

3751
/// This class allows one to launch queries on remote replicas of one shard and get results
3852
class RemoteQueryExecutor
@@ -316,6 +330,10 @@ class RemoteQueryExecutor
316330

317331
const bool read_packet_type_separately = false;
318332

333+
const bool allow_retries_in_cluster_requests = false;
334+
335+
std::unordered_set<size_t> replica_has_processed_data;
336+
319337
/// Send all scalars to remote servers
320338
void sendScalars();
321339

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 24 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,15 @@ namespace ErrorCodes
1919
}
2020

2121
RemoteQueryExecutorReadContext::RemoteQueryExecutorReadContext(
22-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_)
22+
RemoteQueryExecutor & executor_,
23+
bool suspend_when_query_sent_,
24+
bool read_packet_type_separately_,
25+
bool allow_retries_in_cluster_requests_)
2326
: AsyncTaskExecutor(std::make_unique<Task>(*this))
2427
, executor(executor_)
2528
, suspend_when_query_sent(suspend_when_query_sent_)
2629
, read_packet_type_separately(read_packet_type_separately_)
30+
, allow_retries_in_cluster_requests(allow_retries_in_cluster_requests_)
2731
{
2832
if (-1 == pipe2(pipe_fd, O_NONBLOCK))
2933
throw ErrnoException(ErrorCodes::CANNOT_OPEN_FILE, "Cannot create pipe");
@@ -54,18 +58,29 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5458
if (read_context.executor.needToSkipUnavailableShard())
5559
return;
5660

57-
while (true)
61+
try
5862
{
59-
read_context.has_read_packet_part = PacketPart::None;
60-
61-
if (read_context.read_packet_type_separately)
63+
while (true)
6264
{
63-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
64-
read_context.has_read_packet_part = PacketPart::Type;
65+
read_context.has_read_packet_part = PacketPart::None;
66+
67+
if (read_context.read_packet_type_separately)
68+
{
69+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
70+
read_context.has_read_packet_part = PacketPart::Type;
71+
suspend_callback();
72+
}
73+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
74+
read_context.has_read_packet_part = PacketPart::Body;
6575
suspend_callback();
6676
}
67-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
68-
read_context.has_read_packet_part = PacketPart::Body;
77+
}
78+
catch (const Exception &)
79+
{
80+
if (!read_context.allow_retries_in_cluster_requests)
81+
throw;
82+
read_context.packet.type = Protocol::Server::ConnectionLost;
83+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
6984
suspend_callback();
7085
}
7186
}

src/QueryPipeline/RemoteQueryExecutorReadContext.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
2626
{
2727
public:
2828
explicit RemoteQueryExecutorReadContext(
29-
RemoteQueryExecutor & executor_, bool suspend_when_query_sent_, bool read_packet_type_separately_);
29+
RemoteQueryExecutor & executor_,
30+
bool suspend_when_query_sent_,
31+
bool read_packet_type_separately_,
32+
bool allow_retries_in_cluster_requests_);
3033

3134
~RemoteQueryExecutorReadContext() override;
3235

@@ -108,6 +111,7 @@ class RemoteQueryExecutorReadContext : public AsyncTaskExecutor
108111
bool suspend_when_query_sent = false;
109112
bool is_query_sent = false;
110113
const bool read_packet_type_separately = false;
114+
const bool allow_retries_in_cluster_requests = false;
111115
};
112116

113117
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 33 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,36 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
191191
}
192192
}
193193

194+
class TaskDistributor : public TaskIterator
195+
{
196+
public:
197+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
198+
std::vector<std::string> && ids_of_hosts,
199+
bool send_over_whole_archive,
200+
uint64_t lock_object_storage_task_distribution_ms,
201+
ContextPtr context_
202+
)
203+
: task_distributor(iterator, std::move(ids_of_hosts), send_over_whole_archive, lock_object_storage_task_distribution_ms)
204+
, context(context_) {}
205+
~TaskDistributor() override = default;
206+
bool supportRerunTask() const override { return true; }
207+
void rescheduleTasksFromReplica(size_t number_of_current_replica) override
208+
{
209+
task_distributor.rescheduleTasksFromReplica(number_of_current_replica);
210+
}
211+
212+
ClusterFunctionReadTaskResponsePtr operator()(size_t number_of_current_replica) const override
213+
{
214+
auto task = task_distributor.getNextTask(number_of_current_replica);
215+
if (task)
216+
return std::make_shared<ClusterFunctionReadTaskResponse>(std::move(task), context);
217+
return std::make_shared<ClusterFunctionReadTaskResponse>();
218+
}
219+
220+
private:
221+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
222+
ContextPtr context;
223+
};
194224

195225
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
196226
const ActionsDAG::Node * predicate,
@@ -238,20 +268,11 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
238268
lock_object_storage_task_distribution_ms_max
239269
);
240270

241-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(
242-
iterator,
271+
auto callback = std::make_shared<TaskDistributor>(iterator,
243272
std::move(ids_of_hosts),
244273
/* send_over_whole_archive */!local_context->getSettingsRef()[Setting::cluster_function_process_archive_on_multiple_nodes],
245-
lock_object_storage_task_distribution_ms);
246-
247-
auto callback = std::make_shared<TaskIterator>(
248-
[task_distributor, local_context](size_t number_of_current_replica) mutable -> ClusterFunctionReadTaskResponsePtr
249-
{
250-
auto task = task_distributor->getNextTask(number_of_current_replica);
251-
if (task)
252-
return std::make_shared<ClusterFunctionReadTaskResponse>(std::move(task), local_context);
253-
return std::make_shared<ClusterFunctionReadTaskResponse>();
254-
});
274+
lock_object_storage_task_distribution_ms,
275+
local_context);
255276

256277
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
257278
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@ namespace DB
99
namespace ErrorCodes
1010
{
1111
extern const int LOGICAL_ERROR;
12-
}
12+
extern const int CANNOT_READ_ALL_DATA;
13+
};
1314

1415
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1516
std::shared_ptr<IObjectIterator> iterator_,
@@ -23,6 +24,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
2324
, lock_object_storage_task_distribution_us(lock_object_storage_task_distribution_ms_ * 1000)
2425
, iterator_exhausted(false)
2526
{
27+
size_t nodes = ids_of_nodes.size();
28+
for (size_t i = 0; i < nodes; ++i)
29+
replica_to_files_to_be_processed[i] = std::list<ObjectInfoPtr>{};
2630
}
2731

2832
ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -31,16 +35,27 @@ ObjectInfoPtr StorageObjectStorageStableTaskDistributor::getNextTask(size_t numb
3135

3236
saveLastNodeActivity(number_of_current_replica);
3337

34-
// 1. Check pre-queued files first
35-
if (auto file = getPreQueuedFile(number_of_current_replica))
36-
return file;
38+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
39+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
40+
throw Exception(
41+
ErrorCodes::LOGICAL_ERROR,
42+
"Replica number {} was marked as lost, can't set task for it anymore",
43+
number_of_current_replica
44+
);
3745

46+
// 1. Check pre-queued files first
47+
auto file = getPreQueuedFile(number_of_current_replica);
3848
// 2. Try to find a matching file from the iterator
39-
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
40-
return file;
41-
49+
if (!file)
50+
file = getMatchingFileFromIterator(number_of_current_replica);
4251
// 3. Process unprocessed files if iterator is exhausted
43-
return getAnyUnprocessedFile(number_of_current_replica);
52+
if (!file)
53+
file = getAnyUnprocessedFile(number_of_current_replica);
54+
55+
if (file)
56+
processed_file_list_ptr->second.push_back(file);
57+
58+
return file;
4459
}
4560

4661
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -52,16 +67,27 @@ size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String
5267
return 0;
5368

5469
/// Rendezvous hashing
55-
size_t best_id = 0;
56-
UInt64 best_weight = sipHash64(ids_of_nodes[0] + file_path);
57-
for (size_t id = 1; id < nodes_count; ++id)
70+
auto replica = replica_to_files_to_be_processed.begin();
71+
if (replica == replica_to_files_to_be_processed.end())
72+
throw Exception(
73+
ErrorCodes::LOGICAL_ERROR,
74+
"No active replicas, can't find best replica for file {}",
75+
file_path
76+
);
77+
78+
size_t best_id = replica->first;
79+
UInt64 best_weight = sipHash64(ids_of_nodes[best_id] + file_path);
80+
++replica;
81+
while (replica != replica_to_files_to_be_processed.end())
5882
{
83+
size_t id = replica->first;
5984
UInt64 weight = sipHash64(ids_of_nodes[id] + file_path);
6085
if (weight > best_weight)
6186
{
6287
best_weight = weight;
6388
best_id = id;
6489
}
90+
++replica;
6591
}
6692
return best_id;
6793
}
@@ -234,4 +260,28 @@ void StorageObjectStorageStableTaskDistributor::saveLastNodeActivity(size_t numb
234260
last_node_activity[number_of_current_replica] = now;
235261
}
236262

263+
void StorageObjectStorageStableTaskDistributor::rescheduleTasksFromReplica(size_t number_of_current_replica)
264+
{
265+
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
266+
std::lock_guard lock(mutex);
267+
268+
auto processed_file_list_ptr = replica_to_files_to_be_processed.find(number_of_current_replica);
269+
if (processed_file_list_ptr == replica_to_files_to_be_processed.end())
270+
throw Exception(
271+
ErrorCodes::LOGICAL_ERROR,
272+
"Replica number {} was marked as lost already",
273+
number_of_current_replica
274+
);
275+
276+
if (replica_to_files_to_be_processed.size() < 2)
277+
throw Exception(
278+
ErrorCodes::CANNOT_READ_ALL_DATA,
279+
"All replicas were marked as lost"
280+
);
281+
282+
replica_to_files_to_be_processed.erase(number_of_current_replica);
283+
for (const auto & file : processed_file_list_ptr->second)
284+
unprocessed_files.emplace(file->getPath(), std::make_pair(file, getReplicaForFile(file->getPath())));
285+
}
286+
237287
}

0 commit comments

Comments
 (0)