Skip to content

Commit ade7dda

Browse files
committed
Telemetry exporter plug-in infra
DRAFT: Prometheus exporter implementation Signed-off-by: Aleksandr Bilkovskii <[email protected]>
1 parent 0c48036 commit ade7dda

17 files changed

+1388
-6
lines changed
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H
18+
#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H
19+
20+
#include "nixl_types.h"
21+
#include "telemetry_event.h"
22+
#include "common/cyclic_buffer.h"
23+
24+
#include <string>
25+
#include <vector>
26+
#include <fstream>
27+
28+
// Environment variable naming the telemetry exporter type to use.
inline constexpr char TELEMETRY_EXPORTER_VAR[] = "NIXL_TELEMETRY_EXPORTER";

// Environment variable naming the directory that holds exporter plug-ins.
inline constexpr char TELEMETRY_EXPORTER_PLUGIN_DIR_VAR[] = "NIXL_TELEMETRY_EXPORTER_PLUGIN_DIR";

// Environment variable that, when set, supplies the exporter's output path.
inline constexpr char TELEMETRY_EXPORTER_OUTPUT_PATH_VAR[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";
31+
32+
/**
 * @struct nixlTelemetryExporterInitParams
 * @brief Parameters handed to a telemetry exporter when it is constructed
 *
 * This is a plain aggregate. Because eventLimit is const, an instance must be
 * fully initialized at construction and cannot be assigned to afterwards.
 */
struct nixlTelemetryExporterInitParams {
    /** Where the exporter sends its output (file path, URL, etc.) */
    std::string outputPath;

    /** Maximum number of events to buffer */
    const size_t eventLimit;
};
40+
41+
/**
42+
* @class nixlTelemetryExporter
43+
* @brief Abstract base class for telemetry exporters
44+
*
45+
* This class defines the interface that all telemetry exporters must implement.
46+
* It provides the core functionality for reading telemetry events and exporting
47+
* them to various destinations.
48+
*/
49+
class nixlTelemetryExporter {
50+
protected:
51+
const size_t eventLimit_;
52+
53+
public:
54+
explicit nixlTelemetryExporter(const nixlTelemetryExporterInitParams &init_params)
55+
: eventLimit_(init_params.eventLimit) {};
56+
nixlTelemetryExporter(nixlTelemetryExporter &&) = delete;
57+
nixlTelemetryExporter(const nixlTelemetryExporter &) = delete;
58+
59+
void
60+
operator=(nixlTelemetryExporter &&) = delete;
61+
void
62+
operator=(const nixlTelemetryExporter &) = delete;
63+
64+
virtual ~nixlTelemetryExporter() = default;
65+
66+
[[nodiscard]] size_t
67+
getEventLimit() const noexcept {
68+
return eventLimit_;
69+
}
70+
71+
virtual nixl_status_t
72+
exportEvent(const nixlTelemetryEvent &event) = 0;
73+
};
74+
75+
#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_EXPORTER_H
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#ifndef _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H
19+
#define _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H
20+
21+
#include "telemetry/telemetry_exporter.h"
22+
#include <string_view>
23+
#include <memory>
24+
25+
// Known versions of the telemetry plug-in API.
enum class NixlTelemetryPluginApiVersion : unsigned int {
    V1 = 1,
};

// Telemetry plug-in API version constant defined by this header (currently V1).
inline constexpr auto NIXL_TELEMETRY_PLUGIN_API_VERSION = NixlTelemetryPluginApiVersion::V1;
29+
30+
// Type alias for exporter creation function
31+
using ExporterCreatorFn =
32+
std::unique_ptr<nixlTelemetryExporter> (*)(const nixlTelemetryExporterInitParams &init_params);
33+
34+
class nixlTelemetryPlugin {
35+
private:
36+
std::string_view name_;
37+
std::string_view version_;
38+
39+
public:
40+
NixlTelemetryPluginApiVersion api_version;
41+
ExporterCreatorFn create_exporter;
42+
43+
nixlTelemetryPlugin(NixlTelemetryPluginApiVersion version,
44+
std::string_view name,
45+
std::string_view ver,
46+
ExporterCreatorFn create) noexcept
47+
: name_(name),
48+
version_(ver),
49+
api_version(version),
50+
create_exporter(create) {}
51+
52+
[[nodiscard]] std::string_view
53+
getName() const noexcept {
54+
return name_;
55+
}
56+
57+
[[nodiscard]] std::string_view
58+
getVersion() const noexcept {
59+
return version_;
60+
}
61+
};
62+
63+
// Macro to define exported C functions for the plugin
64+
#define NIXL_TELEMETRY_PLUGIN_EXPORT __attribute__((visibility("default")))
65+
66+
// Template for creating backend plugins with minimal boilerplate
67+
template<typename ExporterType> class nixlTelemetryPluginCreator {
68+
public:
69+
static nixlTelemetryPlugin *
70+
create(NixlTelemetryPluginApiVersion api_version,
71+
std::string_view name,
72+
std::string_view version) {
73+
static nixlTelemetryPlugin plugin_instance(api_version, name, version, createExporter);
74+
75+
return &plugin_instance;
76+
}
77+
78+
private:
79+
static std::unique_ptr<nixlTelemetryExporter>
80+
createExporter(const nixlTelemetryExporterInitParams &init_params) {
81+
try {
82+
return std::make_unique<ExporterType>(init_params);
83+
}
84+
catch (const std::exception &e) {
85+
NIXL_ERROR << "Failed to create exporter: " << e.what();
86+
return nullptr;
87+
}
88+
}
89+
};
90+
91+
// Plugin must implement these functions for dynamic loading
92+
// Note: extern "C" is required for dynamic loading to avoid C++ name mangling
93+
extern "C" {
94+
// Initialize the plugin
95+
NIXL_TELEMETRY_PLUGIN_EXPORT
96+
nixlTelemetryPlugin *
97+
nixl_telemetry_plugin_init();
98+
99+
// Cleanup the plugin
100+
NIXL_TELEMETRY_PLUGIN_EXPORT
101+
void
102+
nixl_telemetry_plugin_fini();
103+
}
104+
105+
#endif // _NIXL_SRC_API_CPP_TELEMETRY_TELEMETRY_PLUGIN_H

src/core/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ nixl_lib = library('nixl',
6262
'nixl_plugin_manager.cpp',
6363
'nixl_listener.cpp',
6464
'telemetry.cpp',
65+
'telemetry_plugin_manager.cpp',
6566
include_directories: [ nixl_inc_dirs, utils_inc_dirs ],
6667
link_args: ['-lstdc++fs'],
6768
dependencies: nixl_lib_deps,

src/core/nixl_agent.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg
127127
memorySection = new nixlLocalSection();
128128
const char *telemetry_env_val = std::getenv(TELEMETRY_ENABLED_VAR);
129129
const char *telemetry_env_dir = std::getenv(TELEMETRY_DIR_VAR);
130+
const char *telemetry_env_exporter = std::getenv(TELEMETRY_EXPORTER_VAR);
131+
const char *telemetry_env_exporter_plugin_dir = std::getenv(TELEMETRY_EXPORTER_PLUGIN_DIR_VAR);
130132

131133
if (telemetry_env_val != nullptr) {
132134
if (!strcasecmp(telemetry_env_val, "y") || !strcasecmp(telemetry_env_val, "1") ||
@@ -136,8 +138,17 @@ nixlAgentData::nixlAgentData(const std::string &name, const nixlAgentConfig &cfg
136138
std::string telemetry_file = std::string(telemetry_env_dir) + "/" + name;
137139
telemetry_ = std::make_unique<nixlTelemetry>(telemetry_file, backendEngines);
138140
NIXL_DEBUG << "NIXL telemetry is enabled with output file: " << telemetry_file;
141+
} else if (telemetry_env_exporter_plugin_dir != nullptr &&
142+
telemetry_env_exporter != nullptr) {
143+
std::string telemetry_file =
144+
std::string(telemetry_env_exporter_plugin_dir) + "/" + telemetry_env_exporter;
145+
telemetry_ = std::make_unique<nixlTelemetry>(telemetry_file, backendEngines);
146+
NIXL_DEBUG << "NIXL telemetry is enabled with exporter type: "
147+
<< telemetry_env_exporter
148+
<< " from: " << telemetry_env_exporter_plugin_dir;
139149
} else {
140-
NIXL_DEBUG << "NIXL telemetry is enabled without an output file";
150+
NIXL_DEBUG << "NIXL telemetry is enabled without an output file or exporter plugin"
151+
"directory and exporter type";
141152
}
142153
} else if (cfg.captureTelemetry) {
143154
telemetryEnabled = true;

src/core/telemetry.cpp

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "common/nixl_log.h"
2727
#include "telemetry.h"
2828
#include "telemetry_event.h"
29+
#include "telemetry_plugin_manager.h"
2930
#include "util.h"
3031

3132
using namespace std::chrono_literals;
@@ -75,23 +76,92 @@ nixlTelemetry::initializeTelemetry() {
7576
throw std::invalid_argument("Telemetry buffer size cannot be 0");
7677
}
7778

78-
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
79-
<< " with size: " << buffer_size;
8079

81-
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
82-
full_file_path, true, TELEMETRY_VERSION, buffer_size);
80+
// Check if exporter is enabled and which type to use
81+
const char *exporter_type_env = std::getenv(TELEMETRY_EXPORTER_VAR);
82+
if (exporter_type_env) {
83+
// Use plugin-based exporter
84+
std::string exporter_type = exporter_type_env;
85+
86+
NIXL_INFO << "Telemetry exporter enabled, type: " << exporter_type;
87+
88+
// Prepare initialization parameters for the exporter
89+
const nixlTelemetryExporterInitParams init_params = {
90+
.outputPath = std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) ?
91+
std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) :
92+
full_file_path.string(),
93+
.eventLimit = buffer_size};
94+
95+
// Create exporter through plugin manager
96+
auto &exporter_manager = nixlTelemetryPluginManager::getInstance();
97+
exporter_ = exporter_manager.createExporter(exporter_type, init_params);
98+
99+
if (!exporter_) {
100+
NIXL_WARN << "Failed to create telemetry exporter '" << exporter_type
101+
<< "', falling back to buffer mode";
102+
// Fall back to buffer mode
103+
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
104+
full_file_path, true, TELEMETRY_VERSION, buffer_size);
105+
writeTask_.callback_ = [this]() { return writeEventHelper(); };
106+
} else {
107+
writeTask_.callback_ = [this]() { return telemetryExporterHelper(); };
108+
}
109+
} else {
110+
// Use buffer mode (default)
111+
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
112+
<< " with size: " << buffer_size;
113+
114+
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
115+
full_file_path, true, TELEMETRY_VERSION, buffer_size);
116+
117+
writeTask_.callback_ = [this]() { return writeEventHelper(); };
118+
}
83119

84120
auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ?
85121
std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) :
86122
DEFAULT_TELEMETRY_RUN_INTERVAL;
87123

88124
// Update write task interval and start it
89-
writeTask_.callback_ = [this]() { return writeEventHelper(); };
125+
90126
writeTask_.interval_ = run_interval;
91127
writeTask_.enabled_ = true;
92128
registerPeriodicTask(writeTask_);
93129
}
94130

131+
bool
132+
nixlTelemetry::telemetryExporterHelper() {
133+
std::vector<nixlTelemetryEvent> next_queue;
134+
next_queue.reserve(exporter_->getEventLimit());
135+
{
136+
std::lock_guard<std::mutex> lock(mutex_);
137+
events_.swap(next_queue);
138+
}
139+
for (auto &event : next_queue) {
140+
// if full, ignore
141+
exporter_->exportEvent(event);
142+
}
143+
// collect all events and sort them by timestamp
144+
std::vector<nixlTelemetryEvent> all_events;
145+
for (auto &backend : backendMap_) {
146+
auto backend_events = backend.second->getTelemetryEvents();
147+
for (auto &event : backend_events) {
148+
// don't trust enum value coming from backend,
149+
// as it might be different from the one in agent
150+
event.category_ = nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND;
151+
all_events.push_back(event);
152+
}
153+
}
154+
std::sort(all_events.begin(),
155+
all_events.end(),
156+
[](const nixlTelemetryEvent &a, const nixlTelemetryEvent &b) {
157+
return a.timestampUs_ < b.timestampUs_;
158+
});
159+
for (auto &event : all_events) {
160+
exporter_->exportEvent(event);
161+
}
162+
return true;
163+
}
164+
95165
bool
96166
nixlTelemetry::writeEventHelper() {
97167
std::vector<nixlTelemetryEvent> next_queue;

src/core/telemetry.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "common/cyclic_buffer.h"
2121
#include "telemetry_event.h"
22+
#include "telemetry/telemetry_exporter.h"
2223
#include "mem_section.h"
2324
#include "nixl_types.h"
2425

@@ -81,8 +82,11 @@ class nixlTelemetry {
8182
updateData(const std::string &event_name, nixl_telemetry_category_t category, uint64_t value);
8283
bool
8384
writeEventHelper();
85+
bool
86+
telemetryExporterHelper();
8487
std::unique_ptr<sharedRingBuffer<nixlTelemetryEvent>> buffer_;
8588
std::vector<nixlTelemetryEvent> events_;
89+
std::unique_ptr<nixlTelemetryExporter> exporter_;
8690
std::mutex mutex_;
8791
asio::thread_pool pool_;
8892
periodicTask writeTask_;

0 commit comments

Comments
 (0)