From dfbfa4052b548b46d10f5b7f0cf478b781d6e74a Mon Sep 17 00:00:00 2001 From: Timothy Stamler Date: Mon, 10 Nov 2025 13:30:16 -0600 Subject: [PATCH 1/8] ETCD: initial heartbeat thread --- examples/python/heartbeat_example.py | 92 ++++++++++++++++++++++++++++ src/api/cpp/nixl_params.h | 6 ++ src/core/nixl_listener.cpp | 61 +++++++++++++----- 3 files changed, 144 insertions(+), 15 deletions(-) create mode 100644 examples/python/heartbeat_example.py diff --git a/examples/python/heartbeat_example.py b/examples/python/heartbeat_example.py new file mode 100644 index 000000000..bb75487b3 --- /dev/null +++ b/examples/python/heartbeat_example.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3 + +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import multiprocessing +import time + +import torch + +from nixl._api import nixl_agent, nixl_agent_config +from nixl.logging import get_logger + +logger = get_logger(__name__) + +def run_target(): + """ + Target mode function that runs in a subprocess. + This posts metadata to etcd and then is killed. + """ + logger.info("Target subprocess started") + + config = nixl_agent_config(True, True) + + # Allocate memory and register with NIXL + agent = nixl_agent("target", config) + tensors = [torch.ones(10, dtype=torch.float32) for _ in range(2)] + + logger.info("Target running with tensors: %s", tensors) + + reg_descs = agent.register_memory(tensors) + if not reg_descs: + logger.error("Target: Memory registration failed.") + return + + agent.send_local_metadata("target") + + logger.info("Waiting to die") + + time.sleep(100) + + agent.deregister_memory(reg_descs) + + logger.info("Target subprocess complete successfully (should have died by now).") + +if __name__ == "__main__": + # Start the target process + target_process = multiprocessing.Process( + target=run_target + ) + target_process.start() + + logger.info("Subprocess started") + + config = nixl_agent_config(True, True) + + agent = nixl_agent("initiator", config) + + # Fetch remote metadata when its ready + agent.fetch_remote_metadata("target") + + # Ensure remote metadata has arrived from fetch + ready = False + while not ready: + ready = agent.check_remote_metadata("target") + + # SIGKILL the target process to test heartbeat failure + target_process.kill() + + logger.info("Target process killed, waiting for metadata to be invalidated") + + # Wait for metadata to be invalidated + ready = True + while ready: + ready = agent.check_remote_metadata("target") + + agent.invalidate_local_metadata() + + logger.info("Test Complete.") + diff --git a/src/api/cpp/nixl_params.h b/src/api/cpp/nixl_params.h index d5869b6bb..0809eb37b 100644 --- a/src/api/cpp/nixl_params.h +++ b/src/api/cpp/nixl_params.h @@ -61,6 +61,12 @@ class nixlAgentConfig { */ std::chrono::microseconds etcdWatchTimeout; + /** + * @var ETCD heartbeat interval in seconds + * Interval for renewing etcd lease after metadata is stored. + */ + std::chrono::seconds etcdHeartbeatInterval; + /** * @brief Agent configuration constructor for enabling various features. * @param use_prog_thread flag to determine use of progress thread diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 0dc5e76f3..2d1f4a11f 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -189,6 +189,11 @@ class nixlEtcdClient { std::hash, strEqual> agentWatchers; std::chrono::microseconds watchTimeout_; + std::thread heartbeat_thread; + std::atomic heartbeat_thread_start = false; + std::atomic heartbeat_thread_stop = false; + std::chrono::seconds heartbeat_interval; + // Helper function to create etcd key std::string makeKey(const std::string& agent_name, const std::string& metadata_type) { @@ -199,7 +204,8 @@ class nixlEtcdClient { public: nixlEtcdClient(const std::string &my_agent_name, - const std::chrono::microseconds &timeout = std::chrono::microseconds(5000000)) + const std::chrono::microseconds &timeout = std::chrono::microseconds(5000000), + const std::chrono::seconds &heartbeat_interval = std::chrono::seconds(2)) : watchTimeout_(timeout) { const char* etcd_endpoints = std::getenv("NIXL_ETCD_ENDPOINTS"); if (!etcd_endpoints || strlen(etcd_endpoints) == 0) { @@ -228,6 +234,25 @@ class nixlEtcdClient { } } + ~nixlEtcdClient() { + heartbeat_thread_stop = true; + if (heartbeat_thread.joinable()) { + heartbeat_thread.join(); + } + } + + void startHeartbeatThread(std::chrono::seconds interval = std::chrono::seconds(10)) { + auto lease = etcd->leasekeepalive(interval.count()); + while (!heartbeat_thread_stop) { + if (!lease.is_ok()) { + NIXL_ERROR << "Failed to renew lease: " << lease.error_message(); + return; + } + lease = etcd->leasekeepalive(interval.count()); + std::this_thread::sleep_for(interval); + } + } + // Store metadata in etcd nixl_status_t storeMetadataInEtcd(const std::string& agent_name, const std::string& metadata_type, @@ -250,13 +275,19 @@ class nixlEtcdClient { NIXL_ERROR << "Failed to store " << metadata_type << " in etcd: " << response.error_message(); return NIXL_ERR_BACKEND; } + + if (!heartbeat_thread_start) { + heartbeat_thread_start = true; + heartbeat_thread = std::thread([this]() { + startHeartbeatThread(heartbeat_interval); + }); + } } catch (const std::exception &e) { NIXL_ERROR << "Error sending " << metadata_type << " to etcd: " << e.what(); return NIXL_ERR_BACKEND; } } - // Remove all agent's metadata from etcd nixl_status_t removeMetadataFromEtcd(const std::string& agent_name) { if (!etcd) { @@ -441,7 +472,7 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ std::unique_ptr etcdClient = nullptr; // useEtcd is set in nixlAgent constructor and is true if NIXL_ETCD_ENDPOINTS is set if(useEtcd) { - etcdClient = std::make_unique(name, config.etcdWatchTimeout); + etcdClient = std::make_unique(name, config.etcdWatchTimeout, config.etcdHeartbeatInterval); } #endif // HAVE_ETCD @@ -514,18 +545,18 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ } switch(req_command) { - case SOCK_SEND: { - sendCommMessage(client_fd, "NIXLCOMM:LOAD" + my_MD); - break; - } - case SOCK_FETCH: { - sendCommMessage(client_fd, "NIXLCOMM:SEND"); - break; - } - case SOCK_INVAL: { - sendCommMessage(client_fd, "NIXLCOMM:INVL" + name); - break; - } + case SOCK_SEND: { + sendCommMessage(client_fd, "NIXLCOMM:LOAD" + my_MD); + break; + } + case SOCK_FETCH: { + sendCommMessage(client_fd, "NIXLCOMM:SEND"); + break; + } + case SOCK_INVAL: { + sendCommMessage(client_fd, "NIXLCOMM:INVL" + name); + break; + } #if HAVE_ETCD // ETCD operations using existing methods case ETCD_SEND: From 864d63a4e97b1584c3c4a754c64392a4c276eddf Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Mon, 10 Nov 2025 12:45:31 -0800 Subject: [PATCH 2/8] refactor --- src/core/nixl_listener.cpp | 39 ++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 2d1f4a11f..6613f1e56 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -23,6 +23,7 @@ #include "common/nixl_log.h" #if HAVE_ETCD #include +#include #include #include #endif // HAVE_ETCD @@ -205,8 +206,8 @@ class nixlEtcdClient { public: nixlEtcdClient(const std::string &my_agent_name, const std::chrono::microseconds &timeout = std::chrono::microseconds(5000000), - const std::chrono::seconds &heartbeat_interval = std::chrono::seconds(2)) - : watchTimeout_(timeout) { + const std::chrono::seconds &heartbeat = std::chrono::seconds(2)) + : watchTimeout_(timeout), heartbeat_interval(heartbeat) { const char* etcd_endpoints = std::getenv("NIXL_ETCD_ENDPOINTS"); if (!etcd_endpoints || strlen(etcd_endpoints) == 0) { throw std::runtime_error("No etcd endpoints provided"); @@ -241,15 +242,12 @@ class nixlEtcdClient { } } - void startHeartbeatThread(std::chrono::seconds interval = std::chrono::seconds(10)) { - auto lease = etcd->leasekeepalive(interval.count()); - while (!heartbeat_thread_stop) { - if (!lease.is_ok()) { - NIXL_ERROR << "Failed to renew lease: " << lease.error_message(); - return; - } - lease = etcd->leasekeepalive(interval.count()); - std::this_thread::sleep_for(interval); + void startHeartbeatThread(uint64_t lease_id) { + while (!heartbeat_thread_stop) { + //keep alive for twice the heartbeat interval + etcd::KeepAlive keepalive(*etcd, (heartbeat_interval.count())*2, lease_id); + keepalive.Check(); + std::this_thread::sleep_for(heartbeat_interval); } } @@ -264,13 +262,23 @@ class nixlEtcdClient { try { std::string metadata_key = makeKey(agent_name, metadata_type); - etcd::Response response = etcd->put(metadata_key, metadata); + etcd::Response response = etcd->leasegrant((heartbeat_interval.count())*2); + uint64_t lease_id = response.value().lease(); + + if (response.is_ok()) { + + NIXL_DEBUG << "Successfully leased " << lease_id; + } else { + NIXL_ERROR << "Failed to get lease"; + return NIXL_ERR_BACKEND; + } + + response = etcd->put(metadata_key, metadata, lease_id); if (response.is_ok()) { NIXL_DEBUG << "Successfully stored " << metadata_type << " in etcd with key: " << metadata_key << " (rev " << response.value().modified_index() << ")"; - return NIXL_SUCCESS; } else { NIXL_ERROR << "Failed to store " << metadata_type << " in etcd: " << response.error_message(); return NIXL_ERR_BACKEND; @@ -278,10 +286,9 @@ class nixlEtcdClient { if (!heartbeat_thread_start) { heartbeat_thread_start = true; - heartbeat_thread = std::thread([this]() { - startHeartbeatThread(heartbeat_interval); - }); + heartbeat_thread = std::thread(&nixlEtcdClient::startHeartbeatThread, this, lease_id); } + return NIXL_SUCCESS; } catch (const std::exception &e) { NIXL_ERROR << "Error sending " << metadata_type << " to etcd: " << e.what(); From 2d39fa6001a2cbaba7f4e1da34c35e212267ad97 Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Wed, 12 Nov 2025 08:02:33 -0800 Subject: [PATCH 3/8] bug fixes --- examples/python/heartbeat_example.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/examples/python/heartbeat_example.py b/examples/python/heartbeat_example.py index bb75487b3..11c848e6b 100644 --- a/examples/python/heartbeat_example.py +++ b/examples/python/heartbeat_example.py @@ -19,23 +19,23 @@ import time import torch - from nixl._api import nixl_agent, nixl_agent_config from nixl.logging import get_logger logger = get_logger(__name__) + def run_target(): """ Target mode function that runs in a subprocess. This posts metadata to etcd and then is killed. """ logger.info("Target subprocess started") - - config = nixl_agent_config(True, True) + + config = nixl_agent_config(True, True, 5555) # Allocate memory and register with NIXL - agent = nixl_agent("target", config) + agent = nixl_agent("target", config, ) tensors = [torch.ones(10, dtype=torch.float32) for _ in range(2)] logger.info("Target running with tensors: %s", tensors) @@ -45,7 +45,7 @@ def run_target(): logger.error("Target: Memory registration failed.") return - agent.send_local_metadata("target") + agent.send_local_metadata() logger.info("Waiting to die") @@ -55,15 +55,14 @@ def run_target(): logger.info("Target subprocess complete successfully (should have died by now).") + if __name__ == "__main__": # Start the target process - target_process = multiprocessing.Process( - target=run_target - ) + target_process = multiprocessing.Process(target=run_target) target_process.start() - + logger.info("Subprocess started") - + config = nixl_agent_config(True, True) agent = nixl_agent("initiator", config) @@ -78,15 +77,14 @@ def run_target(): # SIGKILL the target process to test heartbeat failure target_process.kill() - + logger.info("Target process killed, waiting for metadata to be invalidated") - + # Wait for metadata to be invalidated ready = True while ready: ready = agent.check_remote_metadata("target") - + agent.invalidate_local_metadata() - - logger.info("Test Complete.") + logger.info("Test Complete.") From c1cd6b7dc62f3b9b2f154e00827a798863a1e0cb Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Wed, 12 Nov 2025 09:24:23 -0800 Subject: [PATCH 4/8] API update --- examples/python/heartbeat_example.py | 7 ++++++- src/api/cpp/nixl_params.h | 14 +++++++++----- src/core/nixl_listener.cpp | 2 +- 3 files changed, 16 insertions(+), 7 deletions(-) diff --git a/examples/python/heartbeat_example.py b/examples/python/heartbeat_example.py index 11c848e6b..69b208751 100644 --- a/examples/python/heartbeat_example.py +++ b/examples/python/heartbeat_example.py @@ -61,7 +61,9 @@ def run_target(): target_process = multiprocessing.Process(target=run_target) target_process.start() - logger.info("Subprocess started") + logger.info("Subprocess started, pausing...") + + time.sleep(5) config = nixl_agent_config(True, True) @@ -75,6 +77,9 @@ def run_target(): while not ready: ready = agent.check_remote_metadata("target") + logger.info("Ready to kill, pausing...") + + time.sleep(5) # SIGKILL the target process to test heartbeat failure target_process.kill() diff --git a/src/api/cpp/nixl_params.h b/src/api/cpp/nixl_params.h index 0809eb37b..1491a66fc 100644 --- a/src/api/cpp/nixl_params.h +++ b/src/api/cpp/nixl_params.h @@ -62,10 +62,10 @@ class nixlAgentConfig { std::chrono::microseconds etcdWatchTimeout; /** - * @var ETCD heartbeat interval in seconds - * Interval for renewing etcd lease after metadata is stored. + * @var Heartbeat interval in seconds + * Interval for heartbeat that keeps remote metadata valid. */ - std::chrono::seconds etcdHeartbeatInterval; + std::chrono::seconds heartbeatInterval; /** * @brief Agent configuration constructor for enabling various features. @@ -78,6 +78,7 @@ class nixlAgentConfig { * @param lthr_delay_us Optional delay for listener thread in us * @param capture_telemetry Optional flag to enable telemetry capture * @param etcd_watch_timeout Optional timeout for etcd watch operations in microseconds + * @param heartbeat_interval Optional timeout for how often an agent should send a keepalive heartbeat. Only supported in ETCD for now. */ nixlAgentConfig(const bool use_prog_thread, const bool use_listen_thread = false, @@ -88,7 +89,9 @@ class nixlAgentConfig { const uint64_t lthr_delay_us = 100000, const bool capture_telemetry = false, const std::chrono::microseconds &etcd_watch_timeout = - std::chrono::microseconds(5000000)) + std::chrono::microseconds(5000000), + const std::chrono::seconds &heartbeat_interval = + std::chrono::seconds(2)) : useProgThread(use_prog_thread), useListenThread(use_listen_thread), listenPort(port), @@ -96,7 +99,8 @@ class nixlAgentConfig { captureTelemetry(capture_telemetry), pthrDelay(pthr_delay_us), lthrDelay(lthr_delay_us), - etcdWatchTimeout(etcd_watch_timeout) {} + etcdWatchTimeout(etcd_watch_timeout), + heartbeatInterval(heartbeat_interval) {} /** * @brief Copy constructor for nixlAgentConfig object diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 6613f1e56..919b6d35f 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -479,7 +479,7 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ std::unique_ptr etcdClient = nullptr; // useEtcd is set in nixlAgent constructor and is true if NIXL_ETCD_ENDPOINTS is set if(useEtcd) { - etcdClient = std::make_unique(name, config.etcdWatchTimeout, config.etcdHeartbeatInterval); + etcdClient = std::make_unique(name, config.etcdWatchTimeout, config.heartbeatInterval); } #endif // HAVE_ETCD From ea70af4d92499791d97134ef0114d61245b5b593 Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Wed, 12 Nov 2025 14:23:34 -0800 Subject: [PATCH 5/8] functional --- examples/python/heartbeat_example.py | 7 +++++-- src/core/nixl_listener.cpp | 8 +++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/examples/python/heartbeat_example.py b/examples/python/heartbeat_example.py index 69b208751..5d983aeb5 100644 --- a/examples/python/heartbeat_example.py +++ b/examples/python/heartbeat_example.py @@ -35,7 +35,10 @@ def run_target(): config = nixl_agent_config(True, True, 5555) # Allocate memory and register with NIXL - agent = nixl_agent("target", config, ) + agent = nixl_agent( + "target", + config, + ) tensors = [torch.ones(10, dtype=torch.float32) for _ in range(2)] logger.info("Target running with tensors: %s", tensors) @@ -63,7 +66,7 @@ def run_target(): logger.info("Subprocess started, pausing...") - time.sleep(5) + time.sleep(5) config = nixl_agent_config(True, True) diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 919b6d35f..d8079a9c6 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -417,7 +417,7 @@ class nixlEtcdClient { } // Setup a watcher for an agent's metadata invalidation if it doesn't already exist - void setupAgentWatcher(const std::string &agent_name) { + void setupAgentWatcher(const std::string &agent_name, const std::string metadata_label) { if (agentWatchers.find(agent_name) != agentWatchers.end()) { return; } @@ -447,7 +447,9 @@ class nixlEtcdClient { } }; - std::string agent_prefix = makeKey(agent_name, ""); + std::string agent_prefix = makeKey(agent_name, metadata_label); + NIXL_DEBUG << "Create watcher for metadata " << agent_prefix; + agentWatchers[agent_name] = std::make_unique(*etcd, agent_prefix, process_response); } @@ -612,7 +614,7 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ } NIXL_DEBUG << "Successfully loaded metadata for agent: " << remote_agent; - etcdClient->setupAgentWatcher(remote_agent); + etcdClient->setupAgentWatcher(remote_agent, metadata_label); break; } case ETCD_INVAL: From 17c1fcd91809a270ef53ab5e2f065a25e39b9e5a Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Wed, 12 Nov 2025 14:26:23 -0800 Subject: [PATCH 6/8] precommit --- examples/python/heartbeat_example.py | 3 +-- src/core/nixl_listener.cpp | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/examples/python/heartbeat_example.py b/examples/python/heartbeat_example.py index 5d983aeb5..35096f350 100644 --- a/examples/python/heartbeat_example.py +++ b/examples/python/heartbeat_example.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - # SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. # SPDX-License-Identifier: Apache-2.0 # @@ -19,6 +17,7 @@ import time import torch + from nixl._api import nixl_agent, nixl_agent_config from nixl.logging import get_logger diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index d8079a9c6..95b68f2a9 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -243,7 +243,7 @@ class nixlEtcdClient { } void startHeartbeatThread(uint64_t lease_id) { - while (!heartbeat_thread_stop) { + while (!heartbeat_thread_stop) { //keep alive for twice the heartbeat interval etcd::KeepAlive keepalive(*etcd, (heartbeat_interval.count())*2, lease_id); keepalive.Check(); From 7985b71a8fa87769376ca44f9d755170d0cd8b16 Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Wed, 12 Nov 2025 14:28:09 -0800 Subject: [PATCH 7/8] clang format --- src/api/cpp/nixl_params.h | 6 ++--- src/core/nixl_listener.cpp | 55 +++++++++++++++++++++----------------- 2 files changed, 33 insertions(+), 28 deletions(-) diff --git a/src/api/cpp/nixl_params.h b/src/api/cpp/nixl_params.h index 1491a66fc..66dd15567 100644 --- a/src/api/cpp/nixl_params.h +++ b/src/api/cpp/nixl_params.h @@ -78,7 +78,8 @@ class nixlAgentConfig { * @param lthr_delay_us Optional delay for listener thread in us * @param capture_telemetry Optional flag to enable telemetry capture * @param etcd_watch_timeout Optional timeout for etcd watch operations in microseconds - * @param heartbeat_interval Optional timeout for how often an agent should send a keepalive heartbeat. Only supported in ETCD for now. + * @param heartbeat_interval Optional timeout for how often an agent should send a + * keepalive heartbeat. Only supported in ETCD for now. */ nixlAgentConfig(const bool use_prog_thread, const bool use_listen_thread = false, @@ -90,8 +91,7 @@ class nixlAgentConfig { const bool capture_telemetry = false, const std::chrono::microseconds &etcd_watch_timeout = std::chrono::microseconds(5000000), - const std::chrono::seconds &heartbeat_interval = - std::chrono::seconds(2)) + const std::chrono::seconds &heartbeat_interval = std::chrono::seconds(2)) : useProgThread(use_prog_thread), useListenThread(use_listen_thread), listenPort(port), diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index 95b68f2a9..54262bd80 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -207,7 +207,8 @@ class nixlEtcdClient { nixlEtcdClient(const std::string &my_agent_name, const std::chrono::microseconds &timeout = std::chrono::microseconds(5000000), const std::chrono::seconds &heartbeat = std::chrono::seconds(2)) - : watchTimeout_(timeout), heartbeat_interval(heartbeat) { + : watchTimeout_(timeout), + heartbeat_interval(heartbeat) { const char* etcd_endpoints = std::getenv("NIXL_ETCD_ENDPOINTS"); if (!etcd_endpoints || strlen(etcd_endpoints) == 0) { throw std::runtime_error("No etcd endpoints provided"); @@ -242,11 +243,12 @@ class nixlEtcdClient { } } - void startHeartbeatThread(uint64_t lease_id) { + void + startHeartbeatThread(uint64_t lease_id) { while (!heartbeat_thread_stop) { - //keep alive for twice the heartbeat interval - etcd::KeepAlive keepalive(*etcd, (heartbeat_interval.count())*2, lease_id); - keepalive.Check(); + // keep alive for twice the heartbeat interval + etcd::KeepAlive keepalive(*etcd, (heartbeat_interval.count()) * 2, lease_id); + keepalive.Check(); std::this_thread::sleep_for(heartbeat_interval); } } @@ -262,10 +264,10 @@ class nixlEtcdClient { try { std::string metadata_key = makeKey(agent_name, metadata_type); - etcd::Response response = etcd->leasegrant((heartbeat_interval.count())*2); - uint64_t lease_id = response.value().lease(); + etcd::Response response = etcd->leasegrant((heartbeat_interval.count()) * 2); + uint64_t lease_id = response.value().lease(); - if (response.is_ok()) { + if (response.is_ok()) { NIXL_DEBUG << "Successfully leased " << lease_id; } else { @@ -273,7 +275,7 @@ class nixlEtcdClient { return NIXL_ERR_BACKEND; } - response = etcd->put(metadata_key, metadata, lease_id); + response = etcd->put(metadata_key, metadata, lease_id); if (response.is_ok()) { NIXL_DEBUG << "Successfully stored " << metadata_type @@ -286,7 +288,8 @@ class nixlEtcdClient { if (!heartbeat_thread_start) { heartbeat_thread_start = true; - heartbeat_thread = std::thread(&nixlEtcdClient::startHeartbeatThread, this, lease_id); + heartbeat_thread = + std::thread(&nixlEtcdClient::startHeartbeatThread, this, lease_id); } return NIXL_SUCCESS; } @@ -417,7 +420,8 @@ class nixlEtcdClient { } // Setup a watcher for an agent's metadata invalidation if it doesn't already exist - void setupAgentWatcher(const std::string &agent_name, const std::string metadata_label) { + void + setupAgentWatcher(const std::string &agent_name, const std::string metadata_label) { if (agentWatchers.find(agent_name) != agentWatchers.end()) { return; } @@ -448,7 +452,7 @@ class nixlEtcdClient { }; std::string agent_prefix = makeKey(agent_name, metadata_label); - NIXL_DEBUG << "Create watcher for metadata " << agent_prefix; + NIXL_DEBUG << "Create watcher for metadata " << agent_prefix; agentWatchers[agent_name] = std::make_unique(*etcd, agent_prefix, process_response); } @@ -481,7 +485,8 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ std::unique_ptr etcdClient = nullptr; // useEtcd is set in nixlAgent constructor and is true if NIXL_ETCD_ENDPOINTS is set if(useEtcd) { - etcdClient = std::make_unique(name, config.etcdWatchTimeout, config.heartbeatInterval); + etcdClient = std::make_unique( + name, config.etcdWatchTimeout, config.heartbeatInterval); } #endif // HAVE_ETCD @@ -554,18 +559,18 @@ void nixlAgentData::commWorker(nixlAgent* myAgent){ } switch(req_command) { - case SOCK_SEND: { - sendCommMessage(client_fd, "NIXLCOMM:LOAD" + my_MD); - break; - } - case SOCK_FETCH: { - sendCommMessage(client_fd, "NIXLCOMM:SEND"); - break; - } - case SOCK_INVAL: { - sendCommMessage(client_fd, "NIXLCOMM:INVL" + name); - break; - } + case SOCK_SEND: { + sendCommMessage(client_fd, "NIXLCOMM:LOAD" + my_MD); + break; + } + case SOCK_FETCH: { + sendCommMessage(client_fd, "NIXLCOMM:SEND"); + break; + } + case SOCK_INVAL: { + sendCommMessage(client_fd, "NIXLCOMM:INVL" + name); + break; + } #if HAVE_ETCD // ETCD operations using existing methods case ETCD_SEND: From b5065dc2830f32446440528a619e5728d2748189 Mon Sep 17 00:00:00 2001 From: Tim Stamler Date: Fri, 14 Nov 2025 07:54:09 -0800 Subject: [PATCH 8/8] heartbeat for whole agent --- src/core/nixl_listener.cpp | 39 +++++++++++++++++--------------------- 1 file changed, 17 insertions(+), 22 deletions(-) diff --git a/src/core/nixl_listener.cpp b/src/core/nixl_listener.cpp index a2e119147..e317343a1 100644 --- a/src/core/nixl_listener.cpp +++ b/src/core/nixl_listener.cpp @@ -195,7 +195,6 @@ class nixlEtcdClient { std::chrono::microseconds watchTimeout_; std::thread heartbeat_thread; - std::atomic heartbeat_thread_start = false; std::atomic heartbeat_thread_stop = false; std::chrono::seconds heartbeat_interval; @@ -232,8 +231,20 @@ class nixlEtcdClient { NIXL_DEBUG << "Using etcd namespace for agents: " << namespace_prefix; + etcd::Response response = etcd->leasegrant((heartbeat.count()) * 2); + uint64_t lease_id = response.value().lease(); + + if (response.is_ok()) { + + NIXL_DEBUG << "Successfully leased " << lease_id; + } else { + throw std::runtime_error("Failed to get least for agent " + my_agent_name + + " in etcd: " + response.error_message()); + } + + heartbeat_thread = std::thread(&nixlEtcdClient::startHeartbeatThread, this, lease_id); std::string agent_prefix = makeKey(my_agent_name, ""); - etcd::Response response = etcd->put(agent_prefix, ""); + response = etcd->put(agent_prefix, "", lease_id); if (!response.is_ok()) { throw std::runtime_error("Failed to store agent " + my_agent_name + " prefix key in etcd: " + response.error_message()); @@ -268,18 +279,7 @@ class nixlEtcdClient { try { std::string metadata_key = makeKey(agent_name, metadata_type); - etcd::Response response = etcd->leasegrant((heartbeat_interval.count()) * 2); - uint64_t lease_id = response.value().lease(); - - if (response.is_ok()) { - - NIXL_DEBUG << "Successfully leased " << lease_id; - } else { - NIXL_ERROR << "Failed to get lease"; - return NIXL_ERR_BACKEND; - } - - response = etcd->put(metadata_key, metadata, lease_id); + etcd::Response response = etcd->put(metadata_key, metadata); if (response.is_ok()) { NIXL_DEBUG << "Successfully stored " << metadata_type @@ -290,11 +290,6 @@ class nixlEtcdClient { return NIXL_ERR_BACKEND; } - if (!heartbeat_thread_start) { - heartbeat_thread_start = true; - heartbeat_thread = - std::thread(&nixlEtcdClient::startHeartbeatThread, this, lease_id); - } return NIXL_SUCCESS; } catch (const std::exception &e) { @@ -425,7 +420,7 @@ class nixlEtcdClient { // Setup a watcher for an agent's metadata invalidation if it doesn't already exist void - setupAgentWatcher(const std::string &agent_name, const std::string metadata_label) { + setupAgentWatcher(const std::string &agent_name) { if (agentWatchers.find(agent_name) != agentWatchers.end()) { return; } @@ -455,7 +450,7 @@ class nixlEtcdClient { } }; - std::string agent_prefix = makeKey(agent_name, metadata_label); + std::string agent_prefix = makeKey(agent_name, ""); NIXL_DEBUG << "Create watcher for metadata " << agent_prefix; agentWatchers[agent_name] = std::make_unique(*etcd, agent_prefix, process_response); @@ -633,7 +628,7 @@ nixlAgentData::commWorkerInternal(nixlAgent *myAgent) { } NIXL_DEBUG << "Successfully loaded metadata for agent: " << remote_agent; - etcdClient->setupAgentWatcher(remote_agent, metadata_label); + etcdClient->setupAgentWatcher(remote_agent); break; } case ETCD_INVAL: