Skip to content

Commit 7f30902

Browse files
authored
CORE: Communication thread exception handler. (#999)
Signed-off-by: Raul Akhmetshin <[email protected]>
1 parent 4d6cdcd commit 7f30902

File tree

3 files changed: +37 −10 lines changed

src/core/agent_data.h

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -99,7 +99,12 @@ class nixlAgentData {
9999
bool commThreadStop;
100100
bool useEtcd;
101101
std::unique_ptr<nixlTelemetry> telemetry_;
102-
void commWorker(nixlAgent* myAgent);
102+
// Exception captured from the communication thread. commWorker() stores
// std::current_exception() here from its catch-all handler; the nixlAgent
// destructor checks it after joining commThread and logs it as a warning.
std::exception_ptr commThreadException_;
103+
104+
// Entry point passed to std::thread for the communication thread. Declared
// noexcept: it wraps commWorkerInternal() in a try/catch and records any
// exception in commThreadException_ instead of letting it escape.
void
105+
commWorker(nixlAgent &myAgent) noexcept;
106+
// Body of the communication thread, invoked by commWorker(). May throw.
// NOTE(review): appears to be the former commWorker(nixlAgent*) body, renamed — confirm.
void
107+
commWorkerInternal(nixlAgent *myAgent);
103108
void enqueueCommWork(nixl_comm_req_t request);
104109
void getCommWork(std::vector<nixl_comm_req_t> &req_list);
105110
nixl_status_t

src/core/nixl_agent.cpp

Lines changed: 10 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -194,8 +194,7 @@ nixlAgent::nixlAgent(const std::string &name, const nixlAgentConfig &cfg) :
194194

195195
if (data->useEtcd || cfg.useListenThread) {
196196
data->commThreadStop = false;
197-
data->commThread =
198-
std::thread(&nixlAgentData::commWorker, data.get(), this);
197+
data->commThread = std::thread(&nixlAgentData::commWorker, data.get(), std::ref(*this));
199198
}
200199
}
201200

@@ -204,6 +203,15 @@ nixlAgent::~nixlAgent() {
204203
data->commThreadStop = true;
205204
if(data->commThread.joinable()) data->commThread.join();
206205

206+
try {
207+
if (data->commThreadException_) {
208+
std::rethrow_exception(data->commThreadException_);
209+
}
210+
}
211+
catch (const std::exception &e) {
212+
NIXL_WARN << "Communication thread has thrown an exception: " << e.what();
213+
}
214+
207215
// Close remaining connections from comm thread
208216
for (auto &[remote, fd] : data->remoteSockets) {
209217
shutdown(fd, SHUT_RDWR);

src/core/nixl_listener.cpp

Lines changed: 21 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -106,18 +106,22 @@ sendCommMessage(int fd, const std::string& msg) {
106106
};
107107

108108
for (size_t i = 0, offset = 0, sent = 0; i < iov_size;) {
109-
auto bytes = send(fd, static_cast<char *>(iov[i].iov_base) + offset, iov[i].iov_len - offset, 0);
109+
auto bytes = send(fd,
110+
static_cast<char *>(iov[i].iov_base) + offset,
111+
iov[i].iov_len - offset,
112+
MSG_NOSIGNAL);
110113
if (bytes < 0) {
111114
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
112115
continue;
113116
}
114117

115118
throw std::runtime_error(
116-
absl::StrFormat("sendCommMessage(fd=%d) %zu/%zu bytes failed, errno=%d",
117-
fd,
118-
sent,
119-
size + sizeof(size),
120-
errno));
119+
absl::StrFormat("sendCommMessage(fd=%d, msg=%s) %zu/%zu bytes failed, errno=%d",
120+
fd,
121+
msg.c_str(),
122+
sent,
123+
size + sizeof(size),
124+
errno));
121125
}
122126

123127
offset += bytes;
@@ -435,8 +439,18 @@ class nixlEtcdClient {
435439

436440
} // unnamed namespace
437441

438-
void nixlAgentData::commWorker(nixlAgent* myAgent){
442+
// Communication-thread entry point. Runs commWorkerInternal() and, because
// this function is noexcept, catches everything it throws so that nothing
// escapes the function; the exception is kept in commThreadException_.
void
443+
nixlAgentData::commWorker(nixlAgent &myAgent) noexcept {
444+
try {
445+
commWorkerInternal(&myAgent);
446+
}
447+
catch (...) {
448+
// Save the in-flight exception (of any type) for later inspection.
commThreadException_ = std::current_exception();
449+
}
450+
}
439451

452+
void
453+
nixlAgentData::commWorkerInternal(nixlAgent *myAgent) {
440454
#if HAVE_ETCD
441455
std::unique_ptr<nixlEtcdClient> etcdClient = nullptr;
442456
// useEtcd is set in nixlAgent constructor and is true if NIXL_ETCD_ENDPOINTS is set

0 commit comments

Comments
 (0)