Skip to content

Commit 69cce89

Browse files
committed
Restart cluster tasks on connection lost
1 parent c25e57f commit 69cce89

9 files changed

+172
-27
lines changed

src/Core/Protocol.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,10 @@ namespace Protocol
9696
MergeTreeReadTaskRequest = 16, /// Request from a MergeTree replica to a coordinator
9797
TimezoneUpdate = 17, /// Receive server's (session-wide) default timezone
9898
SSHChallenge = 18, /// Return challenge for SSH signature signing
99+
99100
MAX = SSHChallenge,
100101

102+
ConnectionLost = 255, /// Set on the receiving side when reading a packet throws; the packet then carries the message and code of the caught exception.
101103
};
102104

103105
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,6 +716,16 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
716716
case Protocol::Server::TimezoneUpdate:
717717
break;
718718

719+
case Protocol::Server::ConnectionLost:
720+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
721+
{
722+
finished = true;
723+
extension->task_iterator->rerunTasksForReplica(extension->replica_info->number_of_current_replica);
724+
return ReadResult(Block{});
725+
}
726+
packet.exception->rethrow();
727+
break;
728+
719729
default:
720730
got_unknown_packet_from_replica = true;
721731
throw Exception(

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
#include <Interpreters/StorageID.h>
99
#include <sys/types.h>
1010

11+
#include <list>
12+
1113

1214
namespace DB
1315
{
@@ -27,8 +29,22 @@ class RemoteQueryExecutorReadContext;
2729

2830
class ParallelReplicasReadingCoordinator;
2931

30-
/// This is the same type as StorageS3Source::IteratorWrapper
31-
using TaskIterator = std::function<String(size_t)>;
32+
/// Error codes referenced from this header: NOT_IMPLEMENTED is thrown by the
/// default implementation of TaskIterator::rerunTasksForReplica.
/// Note: a namespace definition is not followed by ';' (a trailing one is an
/// empty declaration and triggers -Wextra-semi).
namespace ErrorCodes
{
    extern const int NOT_IMPLEMENTED;
}
36+
37+
class TaskIterator
38+
{
39+
public:
40+
virtual ~TaskIterator() = default;
41+
virtual bool supportRerunTask() const { return false; }
42+
virtual void rerunTasksForReplica(size_t /* number_of_current_replica */)
43+
{
44+
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Method rerunTasksForReplica is not implemented");
45+
}
46+
virtual std::string operator()(size_t number_of_current_replica) const = 0;
47+
};
3248

3349
/// This class allows one to launch queries on remote replicas of one shard and get results
3450
class RemoteQueryExecutor

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -54,18 +54,27 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5454
if (read_context.executor.needToSkipUnavailableShard())
5555
return;
5656

57-
while (true)
57+
try
5858
{
59-
read_context.has_read_packet_part = PacketPart::None;
60-
61-
if (read_context.read_packet_type_separately)
59+
while (true)
6260
{
63-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
64-
read_context.has_read_packet_part = PacketPart::Type;
61+
read_context.has_read_packet_part = PacketPart::None;
62+
63+
if (read_context.read_packet_type_separately)
64+
{
65+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
66+
read_context.has_read_packet_part = PacketPart::Type;
67+
suspend_callback();
68+
}
69+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
70+
read_context.has_read_packet_part = PacketPart::Body;
6571
suspend_callback();
6672
}
67-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
68-
read_context.has_read_packet_part = PacketPart::Body;
73+
}
74+
catch (const Exception &)
75+
{
76+
read_context.packet.type = Protocol::Server::ConnectionLost;
77+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
6978
suspend_callback();
7079
}
7180
}

src/Storages/ObjectStorage/StorageObjectStorageCluster.cpp

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,28 @@ void StorageObjectStorageCluster::updateQueryToSendIfNeeded(
337337
args.insert(args.end(), object_storage_type_arg);
338338
}
339339

340+
class TaskDistributor : public TaskIterator
341+
{
342+
public:
343+
TaskDistributor(std::shared_ptr<IObjectIterator> iterator,
344+
const std::vector<std::string> & ids_of_hosts)
345+
: task_distributor(iterator, ids_of_hosts) {}
346+
~TaskDistributor() override = default;
347+
bool supportRerunTask() const override { return true; }
348+
void rerunTasksForReplica(size_t number_of_current_replica) override
349+
{
350+
task_distributor.rerunTasksForReplica(number_of_current_replica);
351+
}
352+
353+
std::string operator()(size_t number_of_current_replica) const override
354+
{
355+
return task_distributor.getNextTask(number_of_current_replica).value_or("");
356+
}
357+
358+
private:
359+
mutable StorageObjectStorageStableTaskDistributor task_distributor;
360+
};
361+
340362
RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExtension(
341363
const ActionsDAG::Node * predicate,
342364
const std::optional<ActionsDAG> & filter_actions_dag,
@@ -360,12 +382,7 @@ RemoteQueryExecutor::Extension StorageObjectStorageCluster::getTaskIteratorExten
360382
}
361383
}
362384

363-
auto task_distributor = std::make_shared<StorageObjectStorageStableTaskDistributor>(iterator, ids_of_hosts);
364-
365-
auto callback = std::make_shared<TaskIterator>(
366-
[task_distributor](size_t number_of_current_replica) mutable -> String {
367-
return task_distributor->getNextTask(number_of_current_replica).value_or("");
368-
});
385+
auto callback = std::make_shared<TaskDistributor>(iterator, ids_of_hosts);
369386

370387
return RemoteQueryExecutor::Extension{ .task_iterator = std::move(callback) };
371388
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.cpp

Lines changed: 51 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
namespace DB
77
{
88

9+
/// Error codes thrown in this file: LOGICAL_ERROR for a replica that is no longer tracked,
/// CANNOT_READ_ALL_DATA when the last tracked replica is reported as lost.
/// Note: a namespace definition is not followed by ';' (a trailing one is an
/// empty declaration and triggers -Wextra-semi).
namespace ErrorCodes
{
    extern const int LOGICAL_ERROR;
    extern const int CANNOT_READ_ALL_DATA;
}
14+
915
StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistributor(
1016
std::shared_ptr<IObjectIterator> iterator_,
1117
std::vector<std::string> ids_of_nodes_)
@@ -14,6 +20,9 @@ StorageObjectStorageStableTaskDistributor::StorageObjectStorageStableTaskDistrib
1420
, ids_of_nodes(ids_of_nodes_)
1521
, iterator_exhausted(false)
1622
{
23+
size_t nodes = ids_of_nodes.size();
24+
for (size_t i = 0; i < nodes; ++i)
25+
processed_files[i] = std::list<String>{};
1726
}
1827

1928
std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(size_t number_of_current_replica)
@@ -24,16 +33,27 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getNextTask(siz
2433
number_of_current_replica
2534
);
2635

27-
// 1. Check pre-queued files first
28-
if (auto file = getPreQueuedFile(number_of_current_replica))
29-
return file;
36+
auto processed_file_list_ptr = processed_files.find(number_of_current_replica);
37+
if (processed_file_list_ptr == processed_files.end())
38+
throw Exception(
39+
ErrorCodes::LOGICAL_ERROR,
40+
"Replica number {} was marked as lost, can't set task for it anymore",
41+
number_of_current_replica
42+
);
3043

44+
// 1. Check pre-queued files first
45+
std::optional<String> file = getPreQueuedFile(number_of_current_replica);
3146
// 2. Try to find a matching file from the iterator
32-
if (auto file = getMatchingFileFromIterator(number_of_current_replica))
33-
return file;
34-
47+
if (!file.has_value())
48+
file = getMatchingFileFromIterator(number_of_current_replica);
3549
// 3. Process unprocessed files if iterator is exhausted
36-
return getAnyUnprocessedFile(number_of_current_replica);
50+
if (!file.has_value())
51+
file = getAnyUnprocessedFile(number_of_current_replica);
52+
53+
if (file.has_value())
54+
processed_file_list_ptr->second.push_back(*file);
55+
56+
return file;
3757
}
3858

3959
size_t StorageObjectStorageStableTaskDistributor::getReplicaForFile(const String & file_path)
@@ -179,4 +199,28 @@ std::optional<String> StorageObjectStorageStableTaskDistributor::getAnyUnprocess
179199
return std::nullopt;
180200
}
181201

202+
void StorageObjectStorageStableTaskDistributor::rerunTasksForReplica(size_t number_of_current_replica)
203+
{
204+
LOG_INFO(log, "Replica {} is marked as lost, tasks are returned to queue", number_of_current_replica);
205+
std::lock_guard lock(mutex);
206+
207+
auto processed_file_list_ptr = processed_files.find(number_of_current_replica);
208+
if (processed_file_list_ptr == processed_files.end())
209+
throw Exception(
210+
ErrorCodes::LOGICAL_ERROR,
211+
"Replica number {} was marked as lost already",
212+
number_of_current_replica
213+
);
214+
215+
if (processed_files.size() < 2)
216+
throw Exception(
217+
ErrorCodes::CANNOT_READ_ALL_DATA,
218+
"All replicas were marked as lost"
219+
);
220+
221+
for (const auto & file_path : processed_file_list_ptr->second)
222+
unprocessed_files.insert(file_path);
223+
processed_files.erase(number_of_current_replica);
224+
}
225+
182226
}

src/Storages/ObjectStorage/StorageObjectStorageStableTaskDistributor.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@
66
#include <Storages/ObjectStorage/StorageObjectStorageSource.h>
77
#include <Storages/ObjectStorageQueue/ObjectStorageQueueSource.h>
88
#include <unordered_set>
9+
#include <unordered_map>
10+
#include <list>
911
#include <vector>
1012
#include <mutex>
1113
#include <memory>
@@ -22,6 +24,9 @@ class StorageObjectStorageStableTaskDistributor
2224

2325
std::optional<String> getNextTask(size_t number_of_current_replica);
2426

27+
/// Insert objects back to unprocessed files
28+
void rerunTasksForReplica(size_t number_of_current_replica);
29+
2530
private:
2631
size_t getReplicaForFile(const String & file_path);
2732
std::optional<String> getPreQueuedFile(size_t number_of_current_replica);
@@ -34,6 +39,7 @@ class StorageObjectStorageStableTaskDistributor
3439
std::unordered_set<String> unprocessed_files;
3540

3641
std::vector<std::string> ids_of_nodes;
42+
std::unordered_map<size_t, std::list<String>> processed_files;
3743

3844
std::mutex mutex;
3945
bool iterator_exhausted = false;

src/Storages/StorageFileCluster.cpp

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -74,14 +74,35 @@ void StorageFileCluster::updateQueryToSendIfNeeded(DB::ASTPtr & query, const Sto
7474
expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context);
7575
}
7676

77+
class FileTaskIterator : public TaskIterator
78+
{
79+
public:
80+
FileTaskIterator(const Strings & files,
81+
std::optional<StorageFile::ArchiveInfo> archive_info,
82+
const ActionsDAG::Node * predicate,
83+
const NamesAndTypesList & virtual_columns,
84+
const ContextPtr & context,
85+
bool distributed_processing = false)
86+
: iterator(files, archive_info, predicate, virtual_columns, context, distributed_processing) {}
87+
88+
~FileTaskIterator() override = default;
89+
90+
std::string operator()(size_t /* number_of_current_replica */) const override
91+
{
92+
return iterator.next();
93+
}
94+
95+
private:
96+
mutable StorageFileSource::FilesIterator iterator;
97+
};
98+
7799
RemoteQueryExecutor::Extension StorageFileCluster::getTaskIteratorExtension(
78100
const ActionsDAG::Node * predicate,
79101
const std::optional<ActionsDAG> & /* filter_actions_dag */,
80102
const ContextPtr & context,
81103
ClusterPtr) const
82104
{
83-
auto iterator = std::make_shared<StorageFileSource::FilesIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
84-
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
105+
auto callback = std::make_shared<FileTaskIterator>(paths, std::nullopt, predicate, getVirtualsList(), context);
85106
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
86107
}
87108

src/Storages/StorageURLCluster.cpp

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,35 @@ void StorageURLCluster::updateQueryToSendIfNeeded(ASTPtr & query, const StorageS
9393
expression_list->children, storage_snapshot->metadata->getColumns().getAll().toNamesAndTypesDescription(), format_name, context);
9494
}
9595

96+
class UrlTaskIterator : public TaskIterator
97+
{
98+
public:
99+
UrlTaskIterator(const String & uri,
100+
size_t max_addresses,
101+
const ActionsDAG::Node * predicate,
102+
const NamesAndTypesList & virtual_columns,
103+
const ContextPtr & context)
104+
: iterator(uri, max_addresses, predicate, virtual_columns, context) {}
105+
106+
~UrlTaskIterator() override = default;
107+
108+
std::string operator()(size_t /* number_of_current_replica */) const override
109+
{
110+
return iterator.next();
111+
}
112+
113+
private:
114+
mutable StorageURLSource::DisclosedGlobIterator iterator;
115+
};
116+
96117
RemoteQueryExecutor::Extension StorageURLCluster::getTaskIteratorExtension(
97118
const ActionsDAG::Node * predicate,
98119
const std::optional<ActionsDAG> & /* filter_actions_dag */,
99120
const ContextPtr & context,
100121
ClusterPtr) const
101122
{
102-
auto iterator = std::make_shared<StorageURLSource::DisclosedGlobIterator>(
123+
auto callback = std::make_shared<UrlTaskIterator>(
103124
uri, context->getSettingsRef()[Setting::glob_expansion_max_elements], predicate, getVirtualsList(), context);
104-
auto callback = std::make_shared<TaskIterator>([iter = std::move(iterator)](size_t) mutable -> String { return iter->next(); });
105125
return RemoteQueryExecutor::Extension{.task_iterator = std::move(callback)};
106126
}
107127

0 commit comments

Comments
 (0)