Skip to content

Commit d0b114f

Browse files
committed
Telemetry exporter plug-in infra
DRAFT: Prometheus exporter implementation Signed-off-by: Aleksandr Bilkovskii <[email protected]>
1 parent 0c48036 commit d0b114f

File tree

15 files changed

+1191
-5
lines changed

15 files changed

+1191
-5
lines changed
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#ifndef _TELEMETRY_EXPORTER_H
18+
#define _TELEMETRY_EXPORTER_H
19+
20+
#include "nixl_types.h"
21+
#include "telemetry_event.h"
22+
#include "common/cyclic_buffer.h"
23+
24+
#include <cstdint>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>
27+
28+
// Environment variable that selects the telemetry exporter plug-in type.
// When it is unset, telemetry stays in the default shared ring-buffer mode.
// `inline` gives the header-defined constant a single definition program-wide
// instead of one internal-linkage copy per translation unit.
inline constexpr char TELEMETRY_EXPORTER_VAR[] = "NIXL_TELEMETRY_EXPORTER";

// Environment variable that overrides the output path handed to the exporter.
// When it is unset, the telemetry buffer file path is used as the output path.
inline constexpr char TELEMETRY_EXPORTER_OUTPUT_PATH_VAR[] = "NIXL_TELEMETRY_EXPORTER_OUTPUT_PATH";
30+
31+
/**
 * @struct nixlTelemetryExporterInitParams
 * @brief Initialization parameters for telemetry exporters
 *
 * Every field has a defined default, so a default-constructed instance never
 * carries an indeterminate value into an exporter. The struct remains an
 * aggregate and can still be brace-initialized positionally.
 */
struct nixlTelemetryExporterInitParams {
    std::string outputPath; // Output path (file path, URL, etc.)
    uint32_t eventLimit = 0; // Maximum number of events to buffer
};
39+
40+
/**
41+
* @class nixlTelemetryExporter
42+
* @brief Abstract base class for telemetry exporters
43+
*
44+
* This class defines the interface that all telemetry exporters must implement.
45+
* It provides the core functionality for reading telemetry events and exporting
46+
* them to various destinations.
47+
*/
48+
class nixlTelemetryExporter {
49+
protected:
50+
uint32_t eventLimit_;
51+
52+
public:
53+
explicit nixlTelemetryExporter(const nixlTelemetryExporterInitParams *init_params)
54+
: eventLimit_(init_params->eventLimit) {};
55+
nixlTelemetryExporter(nixlTelemetryExporter &&) = delete;
56+
nixlTelemetryExporter(const nixlTelemetryExporter &) = delete;
57+
58+
void
59+
operator=(nixlTelemetryExporter &&) = delete;
60+
void
61+
operator=(const nixlTelemetryExporter &) = delete;
62+
63+
virtual ~nixlTelemetryExporter() = default;
64+
65+
uint32_t
66+
getEventLimit() const noexcept {
67+
return eventLimit_;
68+
}
69+
70+
virtual nixl_status_t
71+
exportEvent(const nixlTelemetryEvent &event) = 0;
72+
};
73+
74+
#endif // _TELEMETRY_EXPORTER_H
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* SPDX-FileCopyrightText: Copyright (c) 2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*
5+
* Licensed under the Apache License, Version 2.0 (the "License");
6+
* you may not use this file except in compliance with the License.
7+
* You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
#ifndef __TELEMETRY_PLUGIN_H
19+
#define __TELEMETRY_PLUGIN_H
20+
21+
#include "telemetry/telemetry_exporter.h"
22+
// #include "common/nixl_log.h"
23+
24+
// Forward declarations for special engine types
25+
// class nixlUcxEngine;
26+
27+
// Define the plugin API version
28+
#define NIXL_TELEMETRY_PLUGIN_API_VERSION 1
29+
30+
// Define the plugin interface class
31+
class nixlTelemetryPlugin {
32+
public:
33+
int api_version;
34+
35+
// Function pointer for creating a new exporter instance
36+
nixlTelemetryExporter *(*create_exporter)(const nixlTelemetryExporterInitParams *init_params);
37+
38+
// Function pointer for destroying an exporter instance
39+
void (*destroy_exporter)(nixlTelemetryExporter *exporter);
40+
41+
// Function to get the plugin name
42+
const char *(*get_plugin_name)();
43+
44+
// Function to get the plugin version
45+
const char *(*get_plugin_version)();
46+
};
47+
48+
// Macro to define exported C functions for the plugin
49+
#define NIXL_TELEMETRY_PLUGIN_EXPORT __attribute__((visibility("default")))
50+
51+
// Template for creating backend plugins with minimal boilerplate
52+
template<typename ExporterType> class nixlTelemetryPluginCreator {
53+
public:
54+
static nixlTelemetryPlugin *
55+
create(int api_version, const char *name, const char *version) {
56+
57+
static const char *plugin_name = name;
58+
static const char *plugin_version = version;
59+
60+
static nixlTelemetryPlugin plugin_instance = {api_version,
61+
createExporter,
62+
destroyExporter,
63+
[]() { return plugin_name; },
64+
[]() { return plugin_version; }};
65+
66+
return &plugin_instance;
67+
}
68+
69+
private:
70+
[[nodiscard]] static nixlTelemetryExporter *
71+
createExporter(const nixlTelemetryExporterInitParams *init_params) {
72+
try {
73+
return new ExporterType(init_params);
74+
}
75+
catch (const std::exception &e) {
76+
NIXL_ERROR << "Failed to create exporter: " << e.what();
77+
return nullptr;
78+
}
79+
}
80+
81+
static void
82+
destroyExporter(nixlTelemetryExporter *exporter) {
83+
delete exporter;
84+
}
85+
};
86+
87+
// Plugin must implement these functions for dynamic loading
88+
// Note: extern "C" is required for dynamic loading to avoid C++ name mangling
89+
extern "C" {
90+
// Initialize the plugin
91+
NIXL_TELEMETRY_PLUGIN_EXPORT
92+
nixlTelemetryPlugin *
93+
nixl_telemetry_plugin_init();
94+
95+
// Cleanup the plugin
96+
NIXL_TELEMETRY_PLUGIN_EXPORT
97+
void
98+
nixl_telemetry_plugin_fini();
99+
}
100+
101+
#endif // __TELEMETRY_PLUGIN_H

src/core/meson.build

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ nixl_lib = library('nixl',
6262
'nixl_plugin_manager.cpp',
6363
'nixl_listener.cpp',
6464
'telemetry.cpp',
65+
'telemetry_plugin_manager.cpp',
6566
include_directories: [ nixl_inc_dirs, utils_inc_dirs ],
6667
link_args: ['-lstdc++fs'],
6768
dependencies: nixl_lib_deps,

src/core/telemetry.cpp

Lines changed: 75 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
#include "common/nixl_log.h"
2727
#include "telemetry.h"
2828
#include "telemetry_event.h"
29+
#include "telemetry_plugin_manager.h"
2930
#include "util.h"
3031

3132
using namespace std::chrono_literals;
@@ -75,23 +76,92 @@ nixlTelemetry::initializeTelemetry() {
7576
throw std::invalid_argument("Telemetry buffer size cannot be 0");
7677
}
7778

78-
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
79-
<< " with size: " << buffer_size;
8079

81-
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
82-
full_file_path, true, TELEMETRY_VERSION, buffer_size);
80+
// Check if exporter is enabled and which type to use
81+
const char *exporter_type_env = std::getenv(TELEMETRY_EXPORTER_VAR);
82+
if (exporter_type_env) {
83+
// Use plugin-based exporter
84+
std::string exporter_type = exporter_type_env;
85+
86+
NIXL_INFO << "Telemetry exporter enabled, type: " << exporter_type;
87+
88+
// Prepare initialization parameters for the exporter
89+
nixlTelemetryExporterInitParams init_params;
90+
init_params.outputPath = std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) ?
91+
std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) :
92+
full_file_path.string();
93+
init_params.eventLimit = buffer_size;
94+
95+
// Create exporter through plugin manager
96+
auto &exporter_manager = nixlTelemetryPluginManager::getInstance();
97+
exporter_ = exporter_manager.createExporter(exporter_type, &init_params);
98+
99+
if (!exporter_) {
100+
NIXL_WARN << "Failed to create telemetry exporter '" << exporter_type
101+
<< "', falling back to buffer mode";
102+
// Fall back to buffer mode
103+
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
104+
full_file_path, true, TELEMETRY_VERSION, buffer_size);
105+
writeTask_.callback_ = [this]() { return writeEventHelper(); };
106+
} else {
107+
writeTask_.callback_ = [this]() { return telemetryExporterHelper(); };
108+
}
109+
} else {
110+
// Use buffer mode (default)
111+
NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path
112+
<< " with size: " << buffer_size;
113+
114+
buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>(
115+
full_file_path, true, TELEMETRY_VERSION, buffer_size);
116+
117+
writeTask_.callback_ = [this]() { return writeEventHelper(); };
118+
}
83119

84120
auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ?
85121
std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) :
86122
DEFAULT_TELEMETRY_RUN_INTERVAL;
87123

88124
// Update write task interval and start it
89-
writeTask_.callback_ = [this]() { return writeEventHelper(); };
125+
90126
writeTask_.interval_ = run_interval;
91127
writeTask_.enabled_ = true;
92128
registerPeriodicTask(writeTask_);
93129
}
94130

131+
bool
132+
nixlTelemetry::telemetryExporterHelper() {
133+
std::vector<nixlTelemetryEvent> next_queue;
134+
next_queue.reserve(exporter_->getEventLimit());
135+
{
136+
std::lock_guard<std::mutex> lock(mutex_);
137+
events_.swap(next_queue);
138+
}
139+
for (auto &event : next_queue) {
140+
// if full, ignore
141+
exporter_->exportEvent(event);
142+
}
143+
// collect all events and sort them by timestamp
144+
std::vector<nixlTelemetryEvent> all_events;
145+
for (auto &backend : backendMap_) {
146+
auto backend_events = backend.second->getTelemetryEvents();
147+
for (auto &event : backend_events) {
148+
// don't trust enum value coming from backend,
149+
// as it might be different from the one in agent
150+
event.category_ = nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND;
151+
all_events.push_back(event);
152+
}
153+
}
154+
std::sort(all_events.begin(),
155+
all_events.end(),
156+
[](const nixlTelemetryEvent &a, const nixlTelemetryEvent &b) {
157+
return a.timestampUs_ < b.timestampUs_;
158+
});
159+
for (auto &event : all_events) {
160+
exporter_->exportEvent(event);
161+
}
162+
return true;
163+
}
164+
95165
bool
96166
nixlTelemetry::writeEventHelper() {
97167
std::vector<nixlTelemetryEvent> next_queue;

src/core/telemetry.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include "common/cyclic_buffer.h"
2121
#include "telemetry_event.h"
22+
#include "telemetry/telemetry_exporter.h"
2223
#include "mem_section.h"
2324
#include "nixl_types.h"
2425

@@ -81,8 +82,11 @@ class nixlTelemetry {
8182
updateData(const std::string &event_name, nixl_telemetry_category_t category, uint64_t value);
8283
bool
8384
writeEventHelper();
85+
bool
86+
telemetryExporterHelper();
8487
std::unique_ptr<sharedRingBuffer<nixlTelemetryEvent>> buffer_;
8588
std::vector<nixlTelemetryEvent> events_;
89+
std::unique_ptr<nixlTelemetryExporter> exporter_;
8690
std::mutex mutex_;
8791
asio::thread_pool pool_;
8892
periodicTask writeTask_;

0 commit comments

Comments
 (0)