From ade7dda7d4159a3bec83e6de5fb93aeab5577529 Mon Sep 17 00:00:00 2001 From: Aleksandr Bilkovskii Date: Sun, 19 Oct 2025 17:55:11 +0300 Subject: [PATCH] Telemetry exporter plug-in infra DRAFT: Prometheus exporter implemetation Signed-off-by: Aleksandr Bilkovskii --- src/api/cpp/telemetry/telemetry_exporter.h | 75 ++++ src/api/cpp/telemetry/telemetry_plugin.h | 105 ++++++ src/core/meson.build | 1 + src/core/nixl_agent.cpp | 13 +- src/core/telemetry.cpp | 80 ++++- src/core/telemetry.h | 4 + src/core/telemetry_plugin_manager.cpp | 339 ++++++++++++++++++ src/core/telemetry_plugin_manager.h | 153 ++++++++ src/plugins/meson.build | 3 + src/plugins/telemetry/README.md | 157 ++++++++ src/plugins/telemetry/meson.build | 16 + src/plugins/telemetry/prometheus/README.md | 76 ++++ src/plugins/telemetry/prometheus/meson.build | 59 +++ .../prometheus/prometheus_exporter.cpp | 196 ++++++++++ .../prometheus/prometheus_exporter.h | 75 ++++ .../prometheus/prometheus_plugin.cpp | 36 ++ subprojects/prometheus-cpp.wrap | 6 + 17 files changed, 1388 insertions(+), 6 deletions(-) create mode 100644 src/api/cpp/telemetry/telemetry_exporter.h create mode 100644 src/api/cpp/telemetry/telemetry_plugin.h create mode 100644 src/core/telemetry_plugin_manager.cpp create mode 100644 src/core/telemetry_plugin_manager.h create mode 100644 src/plugins/telemetry/README.md create mode 100644 src/plugins/telemetry/meson.build create mode 100644 src/plugins/telemetry/prometheus/README.md create mode 100644 src/plugins/telemetry/prometheus/meson.build create mode 100644 src/plugins/telemetry/prometheus/prometheus_exporter.cpp create mode 100644 src/plugins/telemetry/prometheus/prometheus_exporter.h create mode 100644 src/plugins/telemetry/prometheus/prometheus_plugin.cpp create mode 100644 subprojects/prometheus-cpp.wrap diff --git a/src/api/cpp/telemetry/telemetry_exporter.h b/src/api/cpp/telemetry/telemetry_exporter.h new file mode 100644 index 000000000..fc88e1eaf --- /dev/null +++ 
b/src/api/cpp/telemetry/telemetry_exporter.h @@ -0,0 +1,75 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H +#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H + +#include "nixl_types.h" +#include "telemetry_event.h" +#include "common/cyclic_buffer.h" + +#include +#include +#include + +inline constexpr char TELEMETRY_EXPORTER_VAR[] = "NIXL_TELEMETRY_EXPORTER"; +inline constexpr char TELEMETRY_EXPORTER_PLUGIN_DIR_VAR[] = "NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR"; +inline constexpr char TELEMETRY_EXPORTER_OUTPUT_PATH_VAR[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH"; + +/** + * @struct nixlTelemetryExporterInitParams + * @brief Initialization parameters for telemetry exporters + */ +struct nixlTelemetryExporterInitParams { + std::string outputPath; // Output path (file path, URL, etc.) + const size_t eventLimit; // Maximum number of events to buffer +}; + +/** + * @class nixlTelemetryExporter + * @brief Abstract base class for telemetry exporters + * + * This class defines the interface that all telemetry exporters must implement. + * It provides the core functionality for reading telemetry events and exporting + * them to various destinations. 
+ */ +class nixlTelemetryExporter { +protected: + const size_t eventLimit_; + +public: + explicit nixlTelemetryExporter(const nixlTelemetryExporterInitParams &init_params) + : eventLimit_(init_params.eventLimit) {}; + nixlTelemetryExporter(nixlTelemetryExporter &&) = delete; + nixlTelemetryExporter(const nixlTelemetryExporter &) = delete; + + void + operator=(nixlTelemetryExporter &&) = delete; + void + operator=(const nixlTelemetryExporter &) = delete; + + virtual ~nixlTelemetryExporter() = default; + + [[nodiscard]] size_t + getEventLimit() const noexcept { + return eventLimit_; + } + + virtual nixl_status_t + exportEvent(const nixlTelemetryEvent &event) = 0; +}; + +#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H diff --git a/src/api/cpp/telemetry/telemetry_plugin.h b/src/api/cpp/telemetry/telemetry_plugin.h new file mode 100644 index 000000000..b53b9d09d --- /dev/null +++ b/src/api/cpp/telemetry/telemetry_plugin.h @@ -0,0 +1,105 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H +#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H + +#include "telemetry/telemetry_exporter.h" +#include +#include + +enum class NixlTelemetryPluginApiVersion : unsigned int { V1 = 1 }; + +inline constexpr NixlTelemetryPluginApiVersion NIXL_TELEMETRY_PLUGIN_API_VERSION = + NixlTelemetryPluginApiVersion::V1; + +// Type alias for exporter creation function +using ExporterCreatorFn = + std::unique_ptr (*)(const nixlTelemetryExporterInitParams &init_params); + +class nixlTelemetryPlugin { +private: + std::string_view name_; + std::string_view version_; + +public: + NixlTelemetryPluginApiVersion api_version; + ExporterCreatorFn create_exporter; + + nixlTelemetryPlugin(NixlTelemetryPluginApiVersion version, + std::string_view name, + std::string_view ver, + ExporterCreatorFn create) noexcept + : name_(name), + version_(ver), + api_version(version), + create_exporter(create) {} + + [[nodiscard]] std::string_view + getName() const noexcept { + return name_; + } + + [[nodiscard]] std::string_view + getVersion() const noexcept { + return version_; + } +}; + +// Macro to define exported C functions for the plugin +#define NIXL_TELEMETRY_PLUGIN_EXPORT __attribute__((visibility("default"))) + +// Template for creating backend plugins with minimal boilerplate +template class nixlTelemetryPluginCreator { +public: + static nixlTelemetryPlugin * + create(NixlTelemetryPluginApiVersion api_version, + std::string_view name, + std::string_view version) { + static nixlTelemetryPlugin plugin_instance(api_version, name, version, createExporter); + + return &plugin_instance; + } + +private: + static std::unique_ptr + createExporter(const nixlTelemetryExporterInitParams &init_params) { + try { + return std::make_unique(init_params); + } + catch (const std::exception &e) { + NIXL_ERROR << "Failed to create exporter: " << e.what(); + return nullptr; + } + } +}; + +// Plugin must implement these functions for dynamic loading 
+// Note: extern "C" is required for dynamic loading to avoid C++ name mangling +extern "C" { +// Initialize the plugin +NIXL_TELEMETRY_PLUGIN_EXPORT +nixlTelemetryPlugin * +nixl_telemetry_plugin_init(); + +// Cleanup the plugin +NIXL_TELEMETRY_PLUGIN_EXPORT +void +nixl_telemetry_plugin_fini(); +} + +#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H diff --git a/src/core/meson.build b/src/core/meson.build index d00fd7362..d73ce615a 100644 --- a/src/core/meson.build +++ b/src/core/meson.build @@ -62,6 +62,7 @@ nixl_lib = library('nixl', 'nixl_plugin_manager.cpp', 'nixl_listener.cpp', 'telemetry.cpp', + 'telemetry_plugin_manager.cpp', include_directories: [ nixl_inc_dirs, utils_inc_dirs ], link_args: ['-lstdc++fs'], dependencies: nixl_lib_deps, diff --git a/src/core/nixl_agent.cpp b/src/core/nixl_agent.cpp index 2d8bff7a3..8ca48b4ed 100644 --- a/src/core/nixl_agent.cpp +++ b/src/core/nixl_agent.cpp @@ -127,6 +127,8 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg memorySection = new nixlLocalSection(); const char *telemetry_env_val = std::getenv(TELEMETRY_ENABLED_VAR); const char *telemetry_env_dir = std::getenv(TELEMETRY_DIR_VAR); + const char *telemetry_env_exporter = std::getenv(TELEMETRY_EXPORTER_VAR); + const char *telemetry_env_exporter_plugin_dir = std::getenv(TELEMETRY_EXPORTER_PLUGIN_DIR_VAR); if (telemetry_env_val != nullptr) { if (!strcasecmp(telemetry_env_val, "y") || !strcasecmp(telemetry_env_val, "1") || @@ -136,8 +138,17 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg std::string telemetry_file = std::string(telemetry_env_dir) + "/" + name; telemetry_ = std::make_unique(telemetry_file, backendEngines); NIXL_DEBUG << "NIXL telemetry is enabled with output file: " << telemetry_file; + } else if (telemetry_env_exporter_plugin_dir != nullptr && + telemetry_env_exporter != nullptr) { + std::string telemetry_file = + std::string(telemetry_env_exporter_plugin_dir) + "/" + 
+ telemetry_env_exporter; + telemetry_ = std::make_unique(telemetry_file, backendEngines); + NIXL_DEBUG << "NIXL telemetry is enabled with exporter type: " + << telemetry_env_exporter + << " from: " << telemetry_env_exporter_plugin_dir; } else { - NIXL_DEBUG << "NIXL telemetry is enabled without an output file"; + NIXL_DEBUG << "NIXL telemetry is enabled without an output file or exporter plugin " + "directory and exporter type"; } } else if (cfg.captureTelemetry) { telemetryEnabled = true; diff --git a/src/core/telemetry.cpp b/src/core/telemetry.cpp index 6c2ff6910..2399b654b 100644 --- a/src/core/telemetry.cpp +++ b/src/core/telemetry.cpp @@ -26,6 +26,7 @@ #include "common/nixl_log.h" #include "telemetry.h" #include "telemetry_event.h" +#include "telemetry_plugin_manager.h" #include "util.h" using namespace std::chrono_literals; @@ -75,23 +76,92 @@ nixlTelemetry::initializeTelemetry() { throw std::invalid_argument("Telemetry buffer size cannot be 0"); } - NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path - << " with size: " << buffer_size; - buffer_ = std::make_unique>( - full_file_path, true, TELEMETRY_VERSION, buffer_size); + // Check if exporter is enabled and which type to use + const char *exporter_type_env = std::getenv(TELEMETRY_EXPORTER_VAR); + if (exporter_type_env) { + // Use plugin-based exporter + std::string exporter_type = exporter_type_env; + + NIXL_INFO << "Telemetry exporter enabled, type: " << exporter_type; + + // Prepare initialization parameters for the exporter + const nixlTelemetryExporterInitParams init_params = { + .outputPath = std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) ? 
+ std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) : + full_file_path.string(), + .eventLimit = buffer_size}; + + // Create exporter through plugin manager + auto &exporter_manager = nixlTelemetryPluginManager::getInstance(); + exporter_ = exporter_manager.createExporter(exporter_type, init_params); + + if (!exporter_) { + NIXL_WARN << "Failed to create telemetry exporter '" << exporter_type + << "', falling back to buffer mode"; + // Fall back to buffer mode + buffer_ = std::make_unique>( + full_file_path, true, TELEMETRY_VERSION, buffer_size); + writeTask_.callback_ = [this]() { return writeEventHelper(); }; + } else { + writeTask_.callback_ = [this]() { return telemetryExporterHelper(); }; + } + } else { + // Use buffer mode (default) + NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path + << " with size: " << buffer_size; + + buffer_ = std::make_unique>( + full_file_path, true, TELEMETRY_VERSION, buffer_size); + + writeTask_.callback_ = [this]() { return writeEventHelper(); }; + } auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ? 
std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) : DEFAULT_TELEMETRY_RUN_INTERVAL; // Update write task interval and start it - writeTask_.callback_ = [this]() { return writeEventHelper(); }; + writeTask_.interval_ = run_interval; writeTask_.enabled_ = true; registerPeriodicTask(writeTask_); } +bool +nixlTelemetry::telemetryExporterHelper() { + std::vector next_queue; + next_queue.reserve(exporter_->getEventLimit()); + { + std::lock_guard lock(mutex_); + events_.swap(next_queue); + } + for (auto &event : next_queue) { + // if full, ignore + exporter_->exportEvent(event); + } + // collect all events and sort them by timestamp + std::vector all_events; + for (auto &backend : backendMap_) { + auto backend_events = backend.second->getTelemetryEvents(); + for (auto &event : backend_events) { + // don't trust enum value coming from backend, + // as it might be different from the one in agent + event.category_ = nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND; + all_events.push_back(event); + } + } + std::sort(all_events.begin(), + all_events.end(), + [](const nixlTelemetryEvent &a, const nixlTelemetryEvent &b) { + return a.timestampUs_ < b.timestampUs_; + }); + for (auto &event : all_events) { + exporter_->exportEvent(event); + } + return true; +} + bool nixlTelemetry::writeEventHelper() { std::vector next_queue; diff --git a/src/core/telemetry.h b/src/core/telemetry.h index 2e9f1baff..b600db9fd 100644 --- a/src/core/telemetry.h +++ b/src/core/telemetry.h @@ -19,6 +19,7 @@ #include "common/cyclic_buffer.h" #include "telemetry_event.h" +#include "telemetry/telemetry_exporter.h" #include "mem_section.h" #include "nixl_types.h" @@ -81,8 +82,11 @@ class nixlTelemetry { updateData(const std::string &event_name, nixl_telemetry_category_t category, uint64_t value); bool writeEventHelper(); + bool + telemetryExporterHelper(); std::unique_ptr> buffer_; std::vector events_; + std::unique_ptr exporter_; std::mutex mutex_; asio::thread_pool pool_; 
periodicTask writeTask_; diff --git a/src/core/telemetry_plugin_manager.cpp b/src/core/telemetry_plugin_manager.cpp new file mode 100644 index 000000000..4d1515485 --- /dev/null +++ b/src/core/telemetry_plugin_manager.cpp @@ -0,0 +1,339 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "telemetry_plugin_manager.h" +#include "common/nixl_log.h" +#include +#include +#include +#include + +using lock_guard = const std::lock_guard; + +void +nixlTelemetryPluginHandle::DlHandleDeleter::operator()(void *handle) const noexcept { + if (handle) { + using fini_func_t = void (*)(); + fini_func_t fini = + reinterpret_cast(dlsym(handle, "nixl_telemetry_plugin_fini")); + if (fini) { + try { + fini(); + } + catch (const std::exception &e) { + NIXL_ERROR << "Exception in plugin cleanup: " << e.what(); + } + catch (...) 
{ + NIXL_ERROR << "Unknown exception in plugin cleanup"; + } + } + + dlclose(handle); + } +} + +nixlTelemetryPluginHandle::nixlTelemetryPluginHandle(std::unique_ptr handle, + nixlTelemetryPlugin *plugin) + : handle_(std::move(handle)), + plugin_(plugin) { + assert(handle_ && "Plugin handle must not be null"); + assert(plugin_ && "Plugin interface must not be null"); +} + +nixlTelemetryPluginHandle::~nixlTelemetryPluginHandle() = default; + +std::unique_ptr +nixlTelemetryPluginHandle::createExporter( + const nixlTelemetryExporterInitParams &init_params) const { + if (plugin_ && plugin_->create_exporter) { + return plugin_->create_exporter(init_params); + } + return nullptr; +} + +const char * +nixlTelemetryPluginHandle::getName() const { + if (plugin_) { + auto name = plugin_->getName(); + return !name.empty() ? name.data() : "unknown"; + } + return "unknown"; +} + +const char * +nixlTelemetryPluginHandle::getVersion() const { + if (plugin_) { + auto version = plugin_->getVersion(); + return !version.empty() ? 
version.data() : "unknown"; + } + return "unknown"; +} + +// Helper function to get plugin directory +namespace { +static std::filesystem::path +getTelemetryPluginDir() { + // Environment variable takes precedence + const char *plugin_dir = getenv(TELEMETRY_EXPORTER_PLUGIN_DIR_VAR); + if (plugin_dir) { + return plugin_dir; + } + + // By default, use the telemetry_exporters subdirectory relative to the library + Dl_info info; + int ok = dladdr(reinterpret_cast(&getTelemetryPluginDir), &info); + if (!ok) { + NIXL_ERROR << "Failed to get telemetry plugin directory from dladdr"; + return ""; + } + return std::filesystem::path(info.dli_fname).parent_path() / "telemetry_exporters"; +} +} // namespace + +// Plugin Manager implementation +nixlTelemetryPluginManager::nixlTelemetryPluginManager() { + std::filesystem::path plugin_dir = getTelemetryPluginDir(); + if (!plugin_dir.empty()) { + NIXL_DEBUG << "Loading telemetry exporter plugins from: " << plugin_dir; + plugin_dirs_.push_back(plugin_dir); + + // Auto-discover plugins if directory exists + if (std::filesystem::exists(plugin_dir) && std::filesystem::is_directory(plugin_dir)) { + discoverPluginsFromDir(plugin_dir); + } + } +} + +nixlTelemetryPluginManager & +nixlTelemetryPluginManager::getInstance() { + // Meyers singleton - thread-safe in C++11+ + static nixlTelemetryPluginManager instance; + return instance; +} + +std::shared_ptr +nixlTelemetryPluginManager::loadPluginFromPath(const std::filesystem::path &plugin_path) { + std::unique_ptr handle( + dlopen(plugin_path.c_str(), RTLD_NOW | RTLD_LOCAL)); + + if (!handle) { + NIXL_ERROR << "Failed to load telemetry exporter plugin from " << plugin_path << ": " + << dlerror(); + return nullptr; + } + + // Get the initialization function + using init_func_t = nixlTelemetryPlugin *(*)(); + init_func_t init = + reinterpret_cast(dlsym(handle.get(), "nixl_telemetry_plugin_init")); + if (!init) { + NIXL_ERROR << "Failed to find nixl_telemetry_plugin_init in " << plugin_path << 
": " + << dlerror(); + return nullptr; + } + + // Call the initialization function + nixlTelemetryPlugin *plugin = init(); + if (!plugin) { + NIXL_ERROR << "Telemetry exporter plugin initialization failed for " << plugin_path; + return nullptr; + } + + // Check API version + if (plugin->api_version != NIXL_TELEMETRY_PLUGIN_API_VERSION) { + NIXL_ERROR << "Telemetry exporter plugin API version mismatch for " << plugin_path + << ": expected " << static_cast(NIXL_TELEMETRY_PLUGIN_API_VERSION) + << ", got " << static_cast(plugin->api_version); + return nullptr; + } + + // Create and store the plugin handle - transfer ownership from unique_ptr + auto plugin_handle = + std::make_shared(std::move(handle), plugin); + + return plugin_handle; +} + +std::shared_ptr +nixlTelemetryPluginManager::loadPlugin(const std::string &plugin_name) { + lock_guard lg(lock_); + + // Check if the plugin is already loaded + auto it = loaded_plugins_.find(plugin_name); + if (it != loaded_plugins_.end()) { + return it->second; + } + + // Track all directories tried for error reporting + std::vector tried_paths; + + // Try to load the plugin from all registered directories + for (const auto &dir : plugin_dirs_) { + if (dir.empty()) { + continue; + } + + // Construct plugin path: libtelemetry_exporter_.so + std::filesystem::path plugin_path = dir / ("libtelemetry_exporter_" + plugin_name + ".so"); + + tried_paths.push_back(plugin_path); + + // Check if the plugin file exists + if (!std::filesystem::exists(plugin_path)) { + NIXL_DEBUG << "Telemetry exporter plugin file does not exist: " << plugin_path; + continue; + } + + auto plugin_handle = loadPluginFromPath(plugin_path); + if (plugin_handle) { + loaded_plugins_.try_emplace(plugin_name, plugin_handle); + NIXL_INFO << "Loaded telemetry exporter plugin: " << plugin_name << " (version " + << plugin_handle->getVersion() << ")"; + return plugin_handle; + } + } + + // Failed to load the plugin - show all directories tried + NIXL_ERROR << "Failed to 
load telemetry exporter plugin '" << plugin_name + << "' from any directory. Tried " << tried_paths.size() << " location(s):"; + for (const auto &path : tried_paths) { + NIXL_ERROR << " - " << path; + } + + return nullptr; +} + +std::shared_ptr +nixlTelemetryPluginManager::getPlugin(std::string_view plugin_name) { + lock_guard lg(lock_); + + auto it = loaded_plugins_.find(plugin_name); + if (it != loaded_plugins_.end()) { + return it->second; + } + return nullptr; +} + +std::unique_ptr +nixlTelemetryPluginManager::createExporter(std::string_view plugin_name, + const nixlTelemetryExporterInitParams &init_params) { + + // Load plugin if not already loaded + auto plugin_handle = getPlugin(plugin_name); + if (!plugin_handle) { + plugin_handle = loadPlugin(std::string(plugin_name)); + } + + if (!plugin_handle) { + NIXL_ERROR << "Cannot create exporter: plugin '" << plugin_name << "' not found"; + return nullptr; + } + + // Create the exporter instance (smart pointer handles cleanup automatically) + auto exporter = plugin_handle->createExporter(init_params); + if (!exporter) { + NIXL_ERROR << "Failed to create exporter instance from plugin '" << plugin_name << "'"; + return nullptr; + } + + NIXL_INFO << "Created telemetry exporter from plugin: " << plugin_name; + return exporter; +} + +void +nixlTelemetryPluginManager::discoverPluginsFromDir(const std::filesystem::path &dirpath) { + std::error_code ec; + std::filesystem::directory_iterator dir_iter(dirpath, ec); + if (ec) { + NIXL_ERROR << "Error accessing telemetry exporter plugin directory(" << dirpath + << "): " << ec.message(); + return; + } + + for (const auto &entry : dir_iter) { + std::string filename = entry.path().filename().string(); + NIXL_INFO << "Entry:" << filename; + + if (filename.size() < 26) continue; // "libtelemetry_exporter_X.so" min length + + // Check if this is a telemetry exporter plugin file + if (filename.substr(0, 22) == "libtelemetry_exporter_" && + filename.substr(filename.size() - 3) == 
".so") { + + // Extract plugin name: libtelemetry_exporter_<name>.so + std::string plugin_name = filename.substr(22, filename.size() - 25); + + // Try to load the plugin + auto plugin = loadPlugin(plugin_name); + if (plugin) { + NIXL_INFO << "Discovered and loaded telemetry exporter plugin: " << plugin_name; + } + } + } +} + +void +nixlTelemetryPluginManager::addPluginDirectory(const std::filesystem::path &directory) { + if (directory.empty()) { + NIXL_ERROR << "Cannot add empty telemetry exporter plugin directory"; + return; + } + + // Check if directory exists + if (!std::filesystem::exists(directory) || !std::filesystem::is_directory(directory)) { + NIXL_ERROR << "Telemetry exporter plugin directory does not exist or is not readable: " + << directory; + return; + } + + { + lock_guard lg(lock_); + + // Check if directory is already in the list + for (const auto &dir : plugin_dirs_) { + if (dir == directory) { + NIXL_WARN << "Telemetry exporter plugin directory already registered: " + << directory; + return; + } + } + + // Prioritize the new directory by inserting it at the beginning + plugin_dirs_.insert(plugin_dirs_.begin(), directory); + } + + // Discover plugins in the new directory + discoverPluginsFromDir(directory); +} + +void +nixlTelemetryPluginManager::unloadPlugin(std::string_view plugin_name) { + lock_guard lg(lock_); + loaded_plugins_.erase(std::string(plugin_name)); +} + +std::vector +nixlTelemetryPluginManager::getLoadedPluginNames() const { + lock_guard lg(lock_); + + std::vector names; + names.reserve(loaded_plugins_.size()); + for (const auto &pair : loaded_plugins_) { + names.push_back(pair.first); + } + return names; +} diff --git a/src/core/telemetry_plugin_manager.h b/src/core/telemetry_plugin_manager.h new file mode 100644 index 000000000..8eebc7ff9 --- /dev/null +++ b/src/core/telemetry_plugin_manager.h @@ -0,0 +1,153 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
+ * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _TELEMETRY_PLUGIN_MANAGER_H +#define _TELEMETRY_PLUGIN_MANAGER_H + +#include +#include +#include +#include +#include +#include +#include +#include "telemetry/telemetry_plugin.h" + +/** + * This class represents a telemetry exporter plugin handle used to create exporter instances. + * Plugin handle attributes are modified only in the constructor and destructor and remain + * unchanged during normal operation. This allows using it in multi-threading environments + * without lock protection. + */ +class nixlTelemetryPluginHandle { +public: + struct DlHandleDeleter { + void + operator()(void *handle) const noexcept; + }; + +private: + std::unique_ptr handle_; // Handle to the dynamically loaded library + nixlTelemetryPlugin *plugin_; // Plugin interface + +public: + nixlTelemetryPluginHandle(std::unique_ptr handle, + nixlTelemetryPlugin *plugin); + ~nixlTelemetryPluginHandle(); + + [[nodiscard]] std::unique_ptr + createExporter(const nixlTelemetryExporterInitParams &init_params) const; + const char * + getName() const; + const char * + getVersion() const; +}; + +/** + * Telemetry Exporter Plugin Manager + * + * Manages dynamic loading of telemetry exporter plugins. Unlike the backend plugin manager, + * this only supports dynamic plugins (no static plugins) for simplicity. 
+ */ +class nixlTelemetryPluginManager { +private: + std::map, std::less<>> + loaded_plugins_; + std::vector plugin_dirs_; + mutable std::mutex lock_; + + // Private constructor for singleton pattern + nixlTelemetryPluginManager(); + + [[nodiscard]] std::shared_ptr + loadPluginFromPath(const std::filesystem::path &plugin_path); + +public: + // Singleton instance accessor + [[nodiscard]] static nixlTelemetryPluginManager & + getInstance(); + + // Delete copy constructor and assignment operator + nixlTelemetryPluginManager(const nixlTelemetryPluginManager &) = delete; + nixlTelemetryPluginManager & + operator=(const nixlTelemetryPluginManager &) = delete; + + /** + * Load a specific exporter plugin by name + * Searches in registered plugin directories for libtelemetry_exporter_.so + * + * @param plugin_name Name of the plugin (e.g., "file", "prometheus", "otlp") + * @return Plugin handle or nullptr if not found + */ + [[nodiscard]] std::shared_ptr + loadPlugin(const std::string &plugin_name); + + /** + * Get an already loaded plugin handle + * + * @param plugin_name Name of the plugin + * @return Plugin handle or nullptr if not loaded + */ + [[nodiscard]] std::shared_ptr + getPlugin(std::string_view plugin_name); + + /** + * Create an exporter instance from a plugin + * + * @param plugin_name Name of the plugin to use + * @param init_params Initialization parameters for the exporter + * @return Unique pointer to created exporter or nullptr on failure + */ + [[nodiscard]] std::unique_ptr + createExporter(std::string_view plugin_name, + const nixlTelemetryExporterInitParams &init_params); + + /** + * Discover and load all plugins from a directory + * Looks for files matching: libtelemetry_exporter_*.so + * + * @param dirpath Directory path to search + */ + void + discoverPluginsFromDir(const std::filesystem::path &dirpath); + + /** + * Add a directory to search for plugins + * New directory is prioritized over existing ones + * + * @param directory Path to plugin 
directory + */ + void + addPluginDirectory(const std::filesystem::path &directory); + + /** + * Unload a plugin + * + * @param plugin_name Name of the plugin to unload + */ + void + unloadPlugin(std::string_view plugin_name); + + /** + * Get all loaded plugin names + * + * @return Vector of loaded plugin names + */ + [[nodiscard]] std::vector + getLoadedPluginNames() const; +}; + +#endif // _TELEMETRY_PLUGIN_MANAGER_H diff --git a/src/plugins/meson.build b/src/plugins/meson.build index 2a97127b4..81087d75a 100644 --- a/src/plugins/meson.build +++ b/src/plugins/meson.build @@ -23,10 +23,13 @@ endif subdir('posix') # Always try to build POSIX backend, it will handle its own dependencies subdir('obj') # Always try to build Obj backend, it will handle its own dependencies +subdir('telemetry') + if libfabric_dep.found() subdir('libfabric') endif + disable_gds_backend = get_option('disable_gds_backend') if not disable_gds_backend and cuda_dep.found() subdir('cuda_gds') diff --git a/src/plugins/telemetry/README.md b/src/plugins/telemetry/README.md new file mode 100644 index 000000000..b52f6d94c --- /dev/null +++ b/src/plugins/telemetry/README.md @@ -0,0 +1,157 @@ + + +# NIXL Custom Telemetry Plugin Development Guide + +This guide explains how to create custom telemetry exporter plugins for NIXL. Telemetry plugins allow you to export NIXL telemetry data to different monitoring systems, databases, or file formats. 
+ +## Overview + +NIXL telemetry plugins are dynamically loaded shared libraries that export telemetry events from the NIXL agent. + +### Built-in Event Types + +NIXL generates the following telemetry events: + +| Category | Event Name | Type | Description | +|----------|-----------|------|-------------| +| MEMORY | `agent_memory_registered` | Gauge | Total bytes of memory registered | +| MEMORY | `agent_memory_deregistered` | Gauge | Total bytes of memory deregistered | +| TRANSFER | `agent_tx_bytes` | Counter | Total bytes transmitted | +| TRANSFER | `agent_rx_bytes` | Counter | Total bytes received | +| TRANSFER | `agent_tx_requests_num` | Counter | Number of transmit requests | +| TRANSFER | `agent_rx_requests_num` | Counter | Number of receive requests | +| PERFORMANCE | `agent_xfer_time` | Gauge | Transfer time in microseconds | +| PERFORMANCE | `agent_xfer_post_time` | Gauge | Post time in microseconds | +| BACKEND | Backend-specific events | Counter | Dynamic events from backends | + +## Quick Start + +Here's a minimal example of a CSV file exporter plugin: + +### 1. Create Your Exporter Class (`csv_exporter.h`) + +```cpp +#ifndef _TELEMETRY_CSV_EXPORTER_H +#define _TELEMETRY_CSV_EXPORTER_H + +#include "telemetry/telemetry_exporter.h" +#include <fstream> + +class nixlTelemetryCsvExporter : public nixlTelemetryExporter { +public: + explicit nixlTelemetryCsvExporter(const nixlTelemetryExporterInitParams &init_params); + ~nixlTelemetryCsvExporter() override; + + nixl_status_t exportEvent(const nixlTelemetryEvent &event) override; + +private: + std::ofstream file_; +}; + +#endif // _TELEMETRY_CSV_EXPORTER_H +``` + +### 2. 
Implement Your Exporter (`csv_exporter.cpp`) + +```cpp +#include "csv_exporter.h" +#include "common/nixl_log.h" + +nixlTelemetryCsvExporter::nixlTelemetryCsvExporter( + const nixlTelemetryExporterInitParams &init_params) + : nixlTelemetryExporter(init_params) { + + file_.open(init_params.outputPath, std::ios::out | std::ios::app); + if (!file_.is_open()) { + throw std::runtime_error("Failed to open CSV file: " + init_params.outputPath); + } + + // Write CSV header + file_ << "timestamp_us,category,event_name,value\n"; + NIXL_INFO << "CSV exporter initialized: " << init_params.outputPath; +} + +nixlTelemetryCsvExporter::~nixlTelemetryCsvExporter() { + if (file_.is_open()) { + file_.close(); + } + NIXL_INFO << "CSV exporter shut down"; +} + +nixl_status_t +nixlTelemetryCsvExporter::exportEvent(const nixlTelemetryEvent &event) { + if (!file_.is_open()) { + return NIXL_ERR_UNKNOWN; + } + + try { + file_ << event.timestampUs_ << "," + << static_cast<int>(event.category_) << "," + << event.eventName_ << "," + << event.value_ << "\n"; + file_.flush(); + return NIXL_SUCCESS; + } + catch (const std::exception &e) { + NIXL_ERROR << "Failed to export event: " << e.what(); + return NIXL_ERR_UNKNOWN; + } +} +``` + +### 3. Create Plugin Interface (`csv_plugin.cpp`) + +```cpp +#include "csv_exporter.h" +#include "telemetry/telemetry_plugin.h" + +// Use the plugin creator template for minimal boilerplate +using csv_exporter_plugin_t = nixlTelemetryPluginCreator<nixlTelemetryCsvExporter>; + +// Plugin initialization function - must be extern "C" +extern "C" NIXL_TELEMETRY_PLUGIN_EXPORT nixlTelemetryPlugin * +nixl_telemetry_plugin_init() { + return csv_exporter_plugin_t::create( + NIXL_TELEMETRY_PLUGIN_API_VERSION, + "csv", // Plugin name + "1.0.0" // Plugin version + ); +} + +// Plugin cleanup function +extern "C" NIXL_TELEMETRY_PLUGIN_EXPORT void +nixl_telemetry_plugin_fini() { + // Add any global cleanup if needed +} +``` + +### 4. 
Build Configuration (`meson.build`) + +```meson +# CSV Exporter Plugin +csv_exporter_plugin = shared_library( + 'libtelemetry_exporter_csv', + 'csv_plugin.cpp', + 'csv_exporter.cpp', + include_directories: [nixl_inc_dirs, utils_inc_dirs], + dependencies: [nixl_infra, absl_log_dep], + install: true, + install_dir: get_option('libdir') / 'nixl' / 'telemetry_exporters', + name_prefix: '', +) +``` diff --git a/src/plugins/telemetry/meson.build b/src/plugins/telemetry/meson.build new file mode 100644 index 000000000..5a84014db --- /dev/null +++ b/src/plugins/telemetry/meson.build @@ -0,0 +1,16 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +subdir('prometheus') \ No newline at end of file diff --git a/src/plugins/telemetry/prometheus/README.md b/src/plugins/telemetry/prometheus/README.md new file mode 100644 index 000000000..a5f7735a6 --- /dev/null +++ b/src/plugins/telemetry/prometheus/README.md @@ -0,0 +1,76 @@ + + +# NIXL Prometheus Telemetry exporter plug-in + +This telemetry exporter plug-in exports NIXL telemetry events in Prometheus format by exposing an HTTP endpoint that can be scraped by Prometheus servers. + +## Dependencies + +The Prometheus exporter requires the prometheus-cpp library, which is included as a subproject. + +libcurl is not handled by the prometheus-cpp library.
You need to install the libcurl package: + +```bash +# Ubuntu/Debian +sudo apt-get install libcurl4-openssl-dev +# RHEL/CentOS/Fedora +sudo dnf install libcurl-devel +``` + +## Configuration + +To enable the Prometheus plug-in, set the following environment variables: + +```bash +export NIXL_TELEMETRY_ENABLE="y" # Enable NIXL telemetry +export NIXL_TELEMETRY_EXPORTER="prometheus" # Sets which plug-in to select in format libtelemetry_exporter_${NIXL_TELEMETRY_EXPORTER}.so +export NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR="path/to/dir/with/.so/files" # Sets where to find plug-in libs +``` + +### Optional Configuration + +You can configure the HTTP bind address and port using the output path parameter: + +```bash +# Default bind address is 0.0.0.0:9090 +export NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH="x.x.x.x:port_num" + +# The outputPath init parameter should be in format: ip_addr:port_num +# Example configurations: +# - Bind to all interfaces on port 9090: "0.0.0.0:9090" +# - Bind to localhost only: "127.0.0.1:9090" +# - Custom port: "0.0.0.0:8080" +``` + +### Transfer Metrics (Counters) +- `agent_tx_bytes` - Total bytes transmitted +- `agent_rx_bytes` - Total bytes received +- `agent_tx_requests_num` - Number of transmit requests +- `agent_rx_requests_num` - Number of receive requests + +### Performance Metrics (Gauges) +- `agent_xfer_time` - Transfer time in microseconds +- `agent_xfer_post_time` - Post time in microseconds + +### Memory Metrics (Gauges) +- `agent_memory_registered` - Amount of memory registered +- `agent_memory_deregistered` - Amount of memory deregistered + +### Backend Metrics (Dynamic Counters) +- Backend-specific events are dynamically created as counters with a category label + diff --git a/src/plugins/telemetry/prometheus/meson.build b/src/plugins/telemetry/prometheus/meson.build new file mode 100644 index 000000000..1ace6f195 --- /dev/null +++ b/src/plugins/telemetry/prometheus/meson.build @@ -0,0 +1,59 @@ +# SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA
CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use CMake subproject directly +cmake = import('cmake') +prometheus_found = false + +# Try to get prometheus-cpp as a CMake subproject +# Pass CMake options directly to ensure they're applied +cmake_opts = cmake.subproject_options() +cmake_opts.add_cmake_defines({ + 'ENABLE_PULL': 'ON', + 'ENABLE_PUSH': 'OFF', + 'ENABLE_COMPRESSION': 'OFF', + 'ENABLE_TESTING': 'OFF', + 'USE_THIRDPARTY_LIBRARIES': 'ON', + 'BUILD_SHARED_LIBS': 'ON', + 'CMAKE_POSITION_INDEPENDENT_CODE': 'ON', + 'CMAKE_C_FLAGS': '-Wno-unused-but-set-variable', + 'CMAKE_CXX_FLAGS': '-Wno-unused-but-set-variable' +}) + +prometheus_sub = cmake.subproject('prometheus-cpp', required: false, options: cmake_opts) +if prometheus_sub.found() + # Get the CMake targets + prometheus_core_dep = prometheus_sub.dependency('core') + prometheus_pull_dep = prometheus_sub.dependency('pull') + prometheus_found = true +endif + +if prometheus_found + # Prometheus Exporter Plugin + prometheus_exporter_plugin = shared_library( + 'libtelemetry_exporter_prometheus', + 'prometheus_plugin.cpp', + 'prometheus_exporter.cpp', + include_directories: [nixl_inc_dirs, utils_inc_dirs], + dependencies: [nixl_infra, absl_log_dep, prometheus_core_dep, prometheus_pull_dep], + install: true, + install_dir: get_option('libdir') / 'nixl' / 'telemetry_exporters', + name_prefix: '', + ) + message('Building 
Prometheus telemetry exporter plugin') +else + warning('Prometheus C++ library not found. Prometheus telemetry exporter will not be built.') +endif + diff --git a/src/plugins/telemetry/prometheus/prometheus_exporter.cpp b/src/plugins/telemetry/prometheus/prometheus_exporter.cpp new file mode 100644 index 000000000..288c63e72 --- /dev/null +++ b/src/plugins/telemetry/prometheus/prometheus_exporter.cpp @@ -0,0 +1,196 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include "prometheus_exporter.h" +#include "common/nixl_log.h" + +#include +#include +#include +#include +#include +#include + +const std::string PROMETHEUS_EXPORTER_DEFAULT_BIND_ADDRESS = "0.0.0.0:9090"; + +nixlTelemetryPrometheusExporter::nixlTelemetryPrometheusExporter( + const nixlTelemetryExporterInitParams &init_params) + : nixlTelemetryExporter(init_params), + registry_(std::make_shared()), + bind_address_(PROMETHEUS_EXPORTER_DEFAULT_BIND_ADDRESS) { + + if (!init_params.outputPath.empty()) { + // Validate format: ip_addr:port_num + const std::string path = init_params.outputPath; + size_t colon_pos = path.find(':'); + + if (colon_pos == std::string::npos || colon_pos == 0 || colon_pos == path.length() - 1) { + NIXL_WARN << "Invalid bind address format '" << path + << "', expected 'ip_addr:port_num'. 
Using default: " + << PROMETHEUS_EXPORTER_DEFAULT_BIND_ADDRESS; + } else { + const std::string ip_addr = path.substr(0, colon_pos); + const std::string port_str = path.substr(colon_pos + 1); + + // Validate port is numeric + try { + const int port = std::stoi(port_str); + if (port < 1 || port > 65535) { + NIXL_WARN << "Invalid port number " << port + << ", must be between 1-65535. Using default: " + << PROMETHEUS_EXPORTER_DEFAULT_BIND_ADDRESS; + } else { + bind_address_ = path; + } + } + catch (const std::exception &e) { + NIXL_WARN << "Invalid port in bind address '" << path + << "', expected numeric port. Using default: " + << PROMETHEUS_EXPORTER_DEFAULT_BIND_ADDRESS; + } + } + } + + try { + exposer_ = std::make_unique(bind_address_); + exposer_->RegisterCollectable(registry_); + + initializeMetrics(); + NIXL_INFO << "Prometheus exporter initialized on " << bind_address_; + } + catch (const std::exception &e) { + NIXL_ERROR << "Failed to initialize Prometheus exporter: " << e.what(); + } +} + +nixlTelemetryPrometheusExporter::~nixlTelemetryPrometheusExporter() { + NIXL_INFO << "Prometheus exporter shutting down"; +} + +// To make access cheaper we are creating static metrics with the labels already set +// Events are defined in the telemetry.cpp file +void +nixlTelemetryPrometheusExporter::initializeMetrics() { + auto &tx_bytes_counter = prometheus::BuildCounter() + .Name("agent_tx_bytes") + .Help("Number of bytes sent by the agent") + .Register(*registry_); + + auto &rx_bytes_counter = prometheus::BuildCounter() + .Name("agent_rx_bytes") + .Help("Number of bytes received by the agent") + .Register(*registry_); + + auto &tx_requests_counter = prometheus::BuildCounter() + .Name("agent_tx_requests_num") + .Help("Number of requests sent by the agent") + .Register(*registry_); + + auto &rx_requests_counter = prometheus::BuildCounter() + .Name("agent_rx_requests_num") + .Help("Number of requests received by the agent") + .Register(*registry_); + + 
counters_["agent_tx_bytes"] = &tx_bytes_counter.Add({{"category", "NIXL_TELEMETRY_TRANSFER"}}); + counters_["agent_rx_bytes"] = &rx_bytes_counter.Add({{"category", "NIXL_TELEMETRY_TRANSFER"}}); + counters_["agent_tx_requests_num"] = + &tx_requests_counter.Add({{"category", "NIXL_TELEMETRY_TRANSFER"}}); + counters_["agent_rx_requests_num"] = + &rx_requests_counter.Add({{"category", "NIXL_TELEMETRY_TRANSFER"}}); + + auto &xfer_time_gauge = prometheus::BuildGauge() + .Name("agent_xfer_time") + .Help("Start to Complete (per request)") + .Register(*registry_); + + auto &xfer_post_time_gauge = prometheus::BuildGauge() + .Name("agent_xfer_post_time") + .Help("Start to posting to Back-End (per request)") + .Register(*registry_); + + auto &memory_registered_gauge = prometheus::BuildGauge() + .Name("agent_memory_registered") + .Help("Memory registered") + .Register(*registry_); + + auto &memory_deregistered_gauge = prometheus::BuildGauge() + .Name("agent_memory_deregistered") + .Help("Memory deregistered") + .Register(*registry_); + + gauges_["agent_xfer_time"] = &xfer_time_gauge.Add({{"category", "NIXL_TELEMETRY_PERFORMANCE"}}); + gauges_["agent_xfer_post_time"] = + &xfer_post_time_gauge.Add({{"category", "NIXL_TELEMETRY_PERFORMANCE"}}); + gauges_["agent_memory_registered"] = + &memory_registered_gauge.Add({{"category", "NIXL_TELEMETRY_MEMORY"}}); + gauges_["agent_memory_deregistered"] = + &memory_deregistered_gauge.Add({{"category", "NIXL_TELEMETRY_MEMORY"}}); +} + +void +nixlTelemetryPrometheusExporter::createOrUpdateBackendEvent(const std::string &event_name, + uint64_t value) { + auto it = counters_.find(event_name); + if (it != counters_.end()) { + it->second->Increment(value); + return; + } + + auto &backend_counter = + prometheus::BuildCounter().Name(event_name).Help("Backend event").Register(*registry_); + counters_[event_name] = &backend_counter.Add({{"category", "NIXL_TELEMETRY_BACKEND"}}); + counters_[event_name]->Increment(value); +} + +nixl_status_t 
+nixlTelemetryPrometheusExporter::exportEvent(const nixlTelemetryEvent &event) { + try { + const std::string event_name(event.eventName_); + + switch (event.category_) { + case nixl_telemetry_category_t::NIXL_TELEMETRY_TRANSFER: { + auto it = counters_.find(event_name); + if (it != counters_.end()) { + it->second->Increment(event.value_); + } + break; + } + case nixl_telemetry_category_t::NIXL_TELEMETRY_PERFORMANCE: + case nixl_telemetry_category_t::NIXL_TELEMETRY_MEMORY: { + auto it = gauges_.find(event_name); + if (it != gauges_.end()) { + it->second->Set(static_cast(event.value_)); + } + break; + } + case nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND: + createOrUpdateBackendEvent(event_name, event.value_); + break; + case nixl_telemetry_category_t::NIXL_TELEMETRY_CONNECTION: + case nixl_telemetry_category_t::NIXL_TELEMETRY_ERROR: + case nixl_telemetry_category_t::NIXL_TELEMETRY_SYSTEM: + case nixl_telemetry_category_t::NIXL_TELEMETRY_CUSTOM: + default: + break; + } + + return NIXL_SUCCESS; + } + catch (const std::exception &e) { + NIXL_ERROR << "Failed to export telemetry event: " << e.what(); + return NIXL_ERR_UNKNOWN; + } +} diff --git a/src/plugins/telemetry/prometheus/prometheus_exporter.h b/src/plugins/telemetry/prometheus/prometheus_exporter.h new file mode 100644 index 000000000..e28e3da0e --- /dev/null +++ b/src/plugins/telemetry/prometheus/prometheus_exporter.h @@ -0,0 +1,75 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _TELEMETRY_PROMETHEUS_EXPORTER_H +#define _TELEMETRY_PROMETHEUS_EXPORTER_H + +#include "telemetry/telemetry_exporter.h" +#include "telemetry_event.h" +#include "nixl_types.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * @class nixlTelemetryPrometheusExporter + * @brief Prometheus-based telemetry exporter implementation + * + * This class implements the telemetry exporter interface to export + * telemetry events to a Prometheus-compatible format using prometheus-cpp. + * It exposes metrics via an HTTP endpoint that can be scraped by Prometheus. 
+ */ +class nixlTelemetryPrometheusExporter : public nixlTelemetryExporter { +public: + /** + * @brief Constructor using init params (plugin-compatible) + * @param init_params Initialization parameters + */ + explicit nixlTelemetryPrometheusExporter(const nixlTelemetryExporterInitParams &init_params); + + /** + * @brief Destructor + */ + ~nixlTelemetryPrometheusExporter() override; + + nixl_status_t + exportEvent(const nixlTelemetryEvent &event) override; + +private: + // Prometheus components + std::shared_ptr registry_; + std::unique_ptr exposer_; + std::string bind_address_; + + // Maps to track created metrics by event name + std::unordered_map counters_; + std::unordered_map gauges_; + + // Helper methods + void + initializeMetrics(); + void + createOrUpdateBackendEvent(const std::string &event_name, uint64_t value); +}; + +#endif // _TELEMETRY_PROMETHEUS_EXPORTER_H diff --git a/src/plugins/telemetry/prometheus/prometheus_plugin.cpp b/src/plugins/telemetry/prometheus/prometheus_plugin.cpp new file mode 100644 index 000000000..60d7aa803 --- /dev/null +++ b/src/plugins/telemetry/prometheus/prometheus_plugin.cpp @@ -0,0 +1,36 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "prometheus_exporter.h" +#include "telemetry/telemetry_plugin.h" +#include "telemetry/telemetry_exporter.h" + +// Plugin type alias for convenience +using prometheus_exporter_plugin_t = nixlTelemetryPluginCreator; + +// Plugin initialization function - must be extern "C" for dynamic loading +extern "C" NIXL_TELEMETRY_PLUGIN_EXPORT nixlTelemetryPlugin * +nixl_telemetry_plugin_init() { + return prometheus_exporter_plugin_t::create( + NIXL_TELEMETRY_PLUGIN_API_VERSION, "prometheus", "1.0.0"); +} + +// Plugin cleanup function +extern "C" NIXL_TELEMETRY_PLUGIN_EXPORT void +nixl_telemetry_plugin_fini() { + // Nothing to clean up for prometheus exporter +} diff --git a/subprojects/prometheus-cpp.wrap b/subprojects/prometheus-cpp.wrap new file mode 100644 index 000000000..6b5ac2b33 --- /dev/null +++ b/subprojects/prometheus-cpp.wrap @@ -0,0 +1,6 @@ +[wrap-git] +url = https://github.com/jupp0r/prometheus-cpp.git +revision = v1.3.0 +depth = 1 +clone-recursive = true +method = cmake