Skip to content

Commit 9b19467

Browse files
2dmaranadive
andauthored
Flush out commQueue before stopping listener (#830)
* Flush out commQueue before stopping listener Co-authored-by: Adit Ranadive <[email protected]> Signed-off-by: Micha Dery <[email protected]>
1 parent 7f30902 commit 9b19467

File tree

3 files changed

+18
-7
lines changed

3 files changed

+18
-7
lines changed

src/core/agent_data.h

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -91,13 +91,14 @@ class nixlAgentData {
9191
std::hash<std::string>, strEqual> remoteSections;
9292

9393
// State/methods for listener thread
94-
nixlMDStreamListener *listener;
95-
std::map<nixl_socket_peer_t, int> remoteSockets;
96-
std::thread commThread;
97-
std::vector<nixl_comm_req_t> commQueue;
98-
std::mutex commLock;
99-
bool commThreadStop;
100-
bool useEtcd;
94+
nixlMDStreamListener *listener;
95+
std::map<nixl_socket_peer_t, int> remoteSockets;
96+
std::thread commThread;
97+
std::vector<nixl_comm_req_t> commQueue;
98+
std::mutex commLock;
99+
std::atomic<bool> commThreadStop;
100+
std::atomic<bool> agentShutdown;
101+
bool useEtcd;
101102
std::unique_ptr<nixlTelemetry> telemetry_;
102103
std::exception_ptr commThreadException_;
103104

src/core/nixl_agent.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,12 +194,18 @@ nixlAgent::nixlAgent(const std::string &name, const nixlAgentConfig &cfg) :
194194

195195
if (data->useEtcd || cfg.useListenThread) {
196196
data->commThreadStop = false;
197+
data->agentShutdown = false;
197198
data->commThread = std::thread(&nixlAgentData::commWorker, data.get(), std::ref(*this));
198199
}
199200
}
200201

201202
nixlAgent::~nixlAgent() {
202203
if (data && (data->useEtcd || data->config.useListenThread)) {
204+
data->agentShutdown = true;
205+
while (!data->commQueue.empty()) {
206+
std::this_thread::sleep_for(std::chrono::milliseconds(10));
207+
}
208+
203209
data->commThreadStop = true;
204210
if(data->commThread.joinable()) data->commThread.join();
205211

src/core/nixl_listener.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -676,6 +676,10 @@ nixlAgentData::commWorkerInternal(nixlAgent *myAgent) {
676676
}
677677

678678
void nixlAgentData::enqueueCommWork(nixl_comm_req_t request){
679+
if (agentShutdown) {
680+
NIXL_WARN << "Agent shutting down, unable to accept new requests";
681+
return;
682+
}
679683
std::lock_guard<std::mutex> lock(commLock);
680684
commQueue.push_back(std::move(request));
681685
}

0 commit comments

Comments
 (0)