Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/api/cpp/telemetry/telemetry_exporter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A general note, please add some .md file that explains how to write a custom telemetry plug-in. What is the API and what templates can be used.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H
#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H

#include "nixl_types.h"
#include "telemetry_event.h"
#include "common/cyclic_buffer.h"

#include <string>
#include <vector>
#include <fstream>
Comment on lines +20 to +26
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please delete the unused header files.


// Names of the environment variables that configure the telemetry exporter.
// Each constant holds the variable NAME, not its value.

// Selects the exporter type to use.
inline constexpr char TELEMETRY_EXPORTER_VAR[] = "NIXL_TELEMETRY_EXPORTER";
// Directory associated with the exporter plugin.
inline constexpr char TELEMETRY_EXPORTER_PLUGIN_DIR_VAR[] = "NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR";
// Output path passed to the exporter through its initialization parameters.
inline constexpr char TELEMETRY_EXPORTER_OUTPUT_PATH_VAR[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";
Comment on lines +28 to +30
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
inline constexpr char TELEMETRY_EXPORTER_VAR[] = "NIXL_TELEMETRY_EXPORTER";
inline constexpr char TELEMETRY_EXPORTER_PLUGIN_DIR_VAR[] = "NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR";
inline constexpr char TELEMETRY_EXPORTER_OUTPUT_PATH_VAR[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";
inline constexpr char telemetry_exporter_var[] = "NIXL_TELEMETRY_EXPORTER";
inline constexpr char telemetry_exporter_plugin_dir_var[] = "NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR";
inline constexpr char telemetry_exporter_output_path_var[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";

https://isocpp.github.io/CppCoreGuidelines/CppCoreGuidelines#es9-avoid-all_caps-names


/**
* @struct nixlTelemetryExporterInitParams
* @brief Initialization parameters for telemetry exporters
*
* An instance is passed by const reference to the nixlTelemetryExporter
* constructor, which copies eventLimit into the exporter.
*/
struct nixlTelemetryExporterInitParams {
std::string outputPath; // Output path (file path, URL, etc.)
// eventLimit is const: it has to be supplied when the struct is initialized,
// and its presence makes the struct non-assignable after construction.
const size_t eventLimit; // Maximum number of events to buffer
};

/**
* @class nixlTelemetryExporter
* @brief Abstract base class for telemetry exporters
*
* This class defines the interface that all telemetry exporters must implement:
* a single pure virtual exportEvent() that receives one telemetry event at a time.
* The base class itself only stores the event limit taken from the
* initialization parameters. Instances can be neither copied nor moved.
*/
class nixlTelemetryExporter {
protected:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// Event limit copied from nixlTelemetryExporterInitParams::eventLimit at construction.
const size_t eventLimit_;

public:
// Stores init_params.eventLimit; no other field of init_params is read here.
explicit nixlTelemetryExporter(const nixlTelemetryExporterInitParams &init_params)
: eventLimit_(init_params.eventLimit) {};
// Copy and move construction are disabled.
nixlTelemetryExporter(nixlTelemetryExporter &&) = delete;
nixlTelemetryExporter(const nixlTelemetryExporter &) = delete;

// Copy and move assignment are disabled as well.
void
operator=(nixlTelemetryExporter &&) = delete;
void
operator=(const nixlTelemetryExporter &) = delete;

// Virtual so that derived exporters are destroyed correctly through a base pointer.
virtual ~nixlTelemetryExporter() = default;

// Returns the event limit stored at construction.
[[nodiscard]] size_t
getEventLimit() const noexcept {
return eventLimit_;
}

// Exports a single telemetry event; must be implemented by every concrete exporter.
// Returns a nixl_status_t describing the outcome.
virtual nixl_status_t
exportEvent(const nixlTelemetryEvent &event) = 0;
};

#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H
105 changes: 105 additions & 0 deletions src/api/cpp/telemetry/telemetry_plugin.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
* SPDX-License-Identifier: Apache-2.0
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H
#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H

#include "telemetry/telemetry_exporter.h"
#include <string_view>
#include <memory>

// Versions of the telemetry plugin API; V1 (value 1) is the only one defined here.
enum class NixlTelemetryPluginApiVersion : unsigned int { V1 = 1 };
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


inline constexpr NixlTelemetryPluginApiVersion NIXL_TELEMETRY_PLUGIN_API_VERSION =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please avoid ALL_CAPS names.

NixlTelemetryPluginApiVersion::V1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it make sense to put v1 into the name of all types (and global functions, if any) that are under versioning to make it clearer which parts exactly are in fact versioned?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is another "just in case" placeholder actually for beta version. I'd consider removing API versioning at all


// Type alias for exporter creation function
using ExporterCreatorFn =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using ExporterCreatorFn =
using exporter_creator_fn_t =

https://github.com/ai-dynamo/nixl/blob/main/docs/CodeStyle.md

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
using ExporterCreatorFn =
using exporter_creator_fn_t =

https://github.com/ai-dynamo/nixl/blob/main/docs/CodeStyle.md

std::unique_ptr<nixlTelemetryExporter> (*)(const nixlTelemetryExporterInitParams &init_params);

// Descriptor of a telemetry exporter plugin: the API version it targets, its
// name and version strings, and the function used to create exporter instances.
//
// name_ and version_ are std::string_view, so this object does not own the
// text. The storage behind the views passed to the constructor must remain
// valid for as long as getName()/getVersion() results are used.
class nixlTelemetryPlugin {
private:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the public before private order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the public before private order.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the public before private order.

// Non-owning views of the plugin name and plugin version strings.
std::string_view name_;
std::string_view version_;

public:
// Plugin API version this plugin was built against (public data member).
NixlTelemetryPluginApiVersion api_version;
// Function pointer that creates an exporter from initialization parameters.
ExporterCreatorFn create_exporter;

// Stores all four arguments as given; the string views are copied as views,
// not as owned strings.
nixlTelemetryPlugin(NixlTelemetryPluginApiVersion version,
std::string_view name,
std::string_view ver,
ExporterCreatorFn create) noexcept
: name_(name),
version_(ver),
api_version(version),
create_exporter(create) {}

// Returns the plugin name view supplied at construction.
[[nodiscard]] std::string_view
getName() const noexcept {
return name_;
}

// Returns the plugin version view supplied at construction.
[[nodiscard]] std::string_view
getVersion() const noexcept {
return version_;
}
};

// Macro to define exported C functions for the plugin
#define NIXL_TELEMETRY_PLUGIN_EXPORT __attribute__((visibility("default")))

// Template for creating backend plugins with minimal boilerplate
template<typename ExporterType> class nixlTelemetryPluginCreator {
public:
static nixlTelemetryPlugin *
create(NixlTelemetryPluginApiVersion api_version,
std::string_view name,
std::string_view version) {
static nixlTelemetryPlugin plugin_instance(api_version, name, version, createExporter);

return &plugin_instance;
}

private:
static std::unique_ptr<nixlTelemetryExporter>
createExporter(const nixlTelemetryExporterInitParams &init_params) {
try {
return std::make_unique<ExporterType>(init_params);
}
catch (const std::exception &e) {
NIXL_ERROR << "Failed to create exporter: " << e.what();
return nullptr;
}
}
};

// Plugin must implement these functions for dynamic loading
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a lot of code duplication. telemetry_plugin.h looks like backend_plugin.h, and telemetry_plugin_manager.h looks like nixl_plugin_manager.h.

You can use the existing infrastructure by reorganizing the class structure (inheritance, composition, ...).
To better understand which path to choose, it is useful to have an example of using similar telemetry plugins. In other words, this PR should contain tests that use all the new functionality.

At the moment, I'm not entirely sure if telemetry needs to replicate the infrastructure that is used for NIXL plugins.

// Note: extern "C" is required for dynamic loading to avoid C++ name mangling
// Both functions are only declared here; each plugin provides the definitions.
extern "C" {
// Initialize the plugin
// Returns a pointer to the plugin's nixlTelemetryPlugin descriptor.
NIXL_TELEMETRY_PLUGIN_EXPORT
nixlTelemetryPlugin *
nixl_telemetry_plugin_init();

// Cleanup the plugin
// Takes no arguments and returns nothing.
NIXL_TELEMETRY_PLUGIN_EXPORT
void
nixl_telemetry_plugin_fini();
}

#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H
1 change: 1 addition & 0 deletions src/core/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ nixl_lib = library('nixl',
'nixl_plugin_manager.cpp',
'nixl_listener.cpp',
'telemetry.cpp',
'telemetry_plugin_manager.cpp',
include_directories: [ nixl_inc_dirs, utils_inc_dirs ],
link_args: ['-lstdc++fs'],
dependencies: nixl_lib_deps,
Expand Down
13 changes: 12 additions & 1 deletion src/core/nixl_agent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg
memorySection = new nixlLocalSection();
const char *telemetry_env_val = std::getenv(TELEMETRY_ENABLED_VAR);
const char *telemetry_env_dir = std::getenv(TELEMETRY_DIR_VAR);
const char *telemetry_env_exporter = std::getenv(TELEMETRY_EXPORTER_VAR);
const char *telemetry_env_exporter_plugin_dir = std::getenv(TELEMETRY_EXPORTER_PLUGIN_DIR_VAR);

if (telemetry_env_val != nullptr) {
if (!strcasecmp(telemetry_env_val, "y") || !strcasecmp(telemetry_env_val, "1") ||
Expand All @@ -136,8 +138,17 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg
std::string telemetry_file = std::string(telemetry_env_dir) + "/" + name;
telemetry_ = std::make_unique<nixlTelemetry>(telemetry_file, backendEngines);
NIXL_DEBUG << "NIXL telemetry is enabled with output file: " << telemetry_file;
} else if (telemetry_env_exporter_plugin_dir != nullptr &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The nixlAgentData ctor is overloaded. Refactoring is necessary. Please move telemetry initialization into a separate private function.

telemetry_env_exporter != nullptr) {
std::string telemetry_file =
std::string(telemetry_env_exporter_plugin_dir) + "/" + telemetry_env_exporter;
telemetry_ = std::make_unique<nixlTelemetry>(telemetry_file, backendEngines);
NIXL_DEBUG << "NIXL telemetry is enabled with exporter type: "
<< telemetry_env_exporter
<< " from: " << telemetry_env_exporter_plugin_dir;
} else {
NIXL_DEBUG << "NIXL telemetry is enabled without an output file";
NIXL_DEBUG << "NIXL telemetry is enabled without an output file or exporter plugin"
"directory and exporter type";
}
} else if (cfg.captureTelemetry) {
telemetryEnabled = true;
Expand Down
80 changes: 75 additions & 5 deletions src/core/telemetry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/nixl_log.h"
#include "telemetry.h"
#include "telemetry_event.h"
#include "telemetry_plugin_manager.h"
#include "util.h"

using namespace std::chrono_literals;
Expand Down Expand Up @@ -75,23 +76,92 @@ nixlTelemetry::initializeTelemetry() {
throw std::invalid_argument("Telemetry buffer size cannot be 0");
}

NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
<< " with size: " << buffer_size;

buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
full_file_path, true, TELEMETRY_VERSION, buffer_size);
// Check if exporter is enabled and which type to use
const char *exporter_type_env = std::getenv(TELEMETRY_EXPORTER_VAR);
if (exporter_type_env) {
// Use plugin-based exporter
std::string exporter_type = exporter_type_env;

NIXL_INFO << "Telemetry exporter enabled, type: " << exporter_type;

// Prepare initialization parameters for the exporter
const nixlTelemetryExporterInitParams init_params = {
.outputPath = std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) ?
std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) :
full_file_path.string(),
.eventLimit = buffer_size};

// Create exporter through plugin manager
auto &exporter_manager = nixlTelemetryPluginManager::getInstance();
exporter_ = exporter_manager.createExporter(exporter_type, init_params);

if (!exporter_) {
NIXL_WARN << "Failed to create telemetry exporter '" << exporter_type
<< "', falling back to buffer mode";
// Fall back to buffer mode
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
full_file_path, true, TELEMETRY_VERSION, buffer_size);
writeTask_.callback_ = [this]() { return writeEventHelper(); };
} else {
writeTask_.callback_ = [this]() { return telemetryExporterHelper(); };
}
} else {
// Use buffer mode (default)
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
<< " with size: " << buffer_size;

buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
full_file_path, true, TELEMETRY_VERSION, buffer_size);

writeTask_.callback_ = [this]() { return writeEventHelper(); };
}

auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ?
std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) :
DEFAULT_TELEMETRY_RUN_INTERVAL;

// Update write task interval and start it
writeTask_.callback_ = [this]() { return writeEventHelper(); };

writeTask_.interval_ = run_interval;
writeTask_.enabled_ = true;
registerPeriodicTask(writeTask_);
}

// Write-task callback used when an exporter is active. It forwards the queued
// agent events and then the backend events to exporter_, one exportEvent()
// call per event. Always returns true.
bool
nixlTelemetry::telemetryExporterHelper() {
// Empty vector pre-sized to the exporter's event limit. After the swap below
// it holds the pending events, and events_ takes over the empty reserved storage.
std::vector<nixlTelemetryEvent> next_queue;
next_queue.reserve(exporter_->getEventLimit());
{
// mutex_ is held only for the swap, not while events are exported.
std::lock_guard<std::mutex> lock(mutex_);
events_.swap(next_queue);
}
for (auto &event : next_queue) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const

// if full, ignore
// NOTE(review): the status returned by exportEvent() is discarded here.
exporter_->exportEvent(event);
}
// collect all events and sort them by timestamp
std::vector<nixlTelemetryEvent> all_events;
for (auto &backend : backendMap_) {
auto backend_events = backend.second->getTelemetryEvents();
for (auto &event : backend_events) {
// don't trust enum value coming from backend,
// as it might be different from the one in agent
event.category_ = nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND;
all_events.push_back(event);
}
}
// Ascending order of timestampUs_ across all backends.
std::sort(all_events.begin(),
all_events.end(),
[](const nixlTelemetryEvent &a, const nixlTelemetryEvent &b) {
return a.timestampUs_ < b.timestampUs_;
});
for (auto &event : all_events) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

const

// As above, the exportEvent() status is not checked.
exporter_->exportEvent(event);
}
return true;
}

bool
nixlTelemetry::writeEventHelper() {
std::vector<nixlTelemetryEvent> next_queue;
Expand Down
4 changes: 4 additions & 0 deletions src/core/telemetry.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "common/cyclic_buffer.h"
#include "telemetry_event.h"
#include "telemetry/telemetry_exporter.h"
#include "mem_section.h"
#include "nixl_types.h"

Expand Down Expand Up @@ -81,8 +82,11 @@ class nixlTelemetry {
updateData(const std::string &event_name, nixl_telemetry_category_t category, uint64_t value);
bool
writeEventHelper();
bool
telemetryExporterHelper();
std::unique_ptr<sharedRingBuffer<nixlTelemetryEvent>> buffer_;
std::vector<nixlTelemetryEvent> events_;
std::unique_ptr<nixlTelemetryExporter> exporter_;
std::mutex mutex_;
asio::thread_pool pool_;
periodicTask writeTask_;
Expand Down
Loading