Skip to content

Commit be61867

Browse files
committed
Return try-catch for unexpected EOF
1 parent 628904a commit be61867

File tree

3 files changed

+47
-11
lines changed

3 files changed

+47
-11
lines changed

src/QueryPipeline/RemoteQueryExecutor.cpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -730,6 +730,15 @@ RemoteQueryExecutor::ReadResult RemoteQueryExecutor::processPacket(Packet packet
730730
break;
731731

732732
case Protocol::Server::ConnectionLost:
733+
if (extension && extension->task_iterator && extension->task_iterator->supportRerunTask() && extension->replica_info)
734+
{
735+
if (!replica_has_processed_data.contains(extension->replica_info->number_of_current_replica))
736+
{
737+
finished = true;
738+
extension->task_iterator->rescheduleTasksFromReplica(extension->replica_info->number_of_current_replica);
739+
return ReadResult(Block{});
740+
}
741+
}
733742
packet.exception->rethrow();
734743
break;
735744

@@ -1000,6 +1009,11 @@ void RemoteQueryExecutor::setProfileInfoCallback(ProfileInfoCallback callback)
10001009
profile_info_callback = std::move(callback);
10011010
}
10021011

1012+
bool RemoteQueryExecutor::skipUnavailableShards() const
1013+
{
1014+
return context->getSettingsRef()[Setting::skip_unavailable_shards];
1015+
}
1016+
10031017
bool RemoteQueryExecutor::needToSkipUnavailableShard() const
10041018
{
10051019
return context->getSettingsRef()[Setting::skip_unavailable_shards] && (0 == connections->size());

src/QueryPipeline/RemoteQueryExecutor.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,8 @@ class RemoteQueryExecutor
233233

234234
IConnections & getConnections() { return *connections; }
235235

236+
bool skipUnavailableShards() const;
237+
236238
bool needToSkipUnavailableShard() const;
237239

238240
bool isReplicaUnavailable() const { return extension && extension->parallel_reading_coordinator && connections->size() == 0; }

src/QueryPipeline/RemoteQueryExecutorReadContext.cpp

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,42 @@ void RemoteQueryExecutorReadContext::Task::run(AsyncCallback async_callback, Sus
5757
if (read_context.executor.needToSkipUnavailableShard())
5858
return;
5959

60-
while (true)
60+
try
6161
{
62-
read_context.has_read_packet_part = PacketPart::None;
62+
while (true)
63+
{
64+
read_context.has_read_packet_part = PacketPart::None;
65+
66+
if (read_context.read_packet_type_separately)
67+
{
68+
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
69+
read_context.has_read_packet_part = PacketPart::Type;
70+
suspend_callback();
71+
}
72+
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
73+
read_context.has_read_packet_part = PacketPart::Body;
74+
if (read_context.packet.type == Protocol::Server::Data)
75+
read_context.has_data_packets = true;
6376

64-
if (read_context.read_packet_type_separately)
77+
suspend_callback();
78+
}
79+
}
80+
catch (const Exception & e)
81+
{
82+
/// If cluster node unxepectedly shutted down (kill/segfault/power off/etc.) socket just closes.
83+
/// If initiator did not process any data packets before, this fact can be ignored.
84+
/// Unprocessed tasks will be executed on other nodes.
85+
if (e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
86+
&& !read_context.has_data_packets.load()
87+
&& read_context.executor.skipUnavailableShards())
6588
{
66-
read_context.packet.type = read_context.executor.getConnections().receivePacketTypeUnlocked(async_callback);
67-
read_context.has_read_packet_part = PacketPart::Type;
89+
read_context.packet.type = Protocol::Server::ConnectionLost;
90+
read_context.packet.exception = std::make_unique<Exception>(getCurrentExceptionMessageAndPattern(true), getCurrentExceptionCode());
91+
read_context.has_read_packet_part = PacketPart::Body;
6892
suspend_callback();
6993
}
70-
read_context.packet = read_context.executor.getConnections().receivePacketUnlocked(async_callback);
71-
read_context.has_read_packet_part = PacketPart::Body;
72-
if (read_context.packet.type == Protocol::Server::Data)
73-
read_context.has_data_packets = true;
74-
75-
suspend_callback();
94+
else
95+
throw;
7696
}
7797
}
7898

0 commit comments

Comments
 (0)