|
26 | 26 | #include "common/nixl_log.h" |
27 | 27 | #include "telemetry.h" |
28 | 28 | #include "telemetry_event.h" |
| 29 | +#include "telemetry_plugin_manager.h" |
29 | 30 | #include "util.h" |
30 | 31 |
|
31 | 32 | using namespace std::chrono_literals; |
@@ -75,23 +76,92 @@ nixlTelemetry::initializeTelemetry() { |
75 | 76 | throw std::invalid_argument("Telemetry buffer size cannot be 0"); |
76 | 77 | } |
77 | 78 |
|
78 | | - NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path |
79 | | - << " with size: " << buffer_size; |
80 | 79 |
|
81 | | - buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>( |
82 | | - full_file_path, true, TELEMETRY_VERSION, buffer_size); |
| 80 | + // Check if exporter is enabled and which type to use |
| 81 | + const char *exporter_type_env = std::getenv(TELEMETRY_EXPORTER_VAR); |
| 82 | + if (exporter_type_env) { |
| 83 | + // Use plugin-based exporter |
| 84 | + std::string exporter_type = exporter_type_env; |
| 85 | + |
| 86 | + NIXL_INFO << "Telemetry exporter enabled, type: " << exporter_type; |
| 87 | + |
| 88 | + // Prepare initialization parameters for the exporter |
| 89 | + const nixlTelemetryExporterInitParams init_params = { |
| 90 | + .outputPath = std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) ? |
| 91 | + std::getenv(TELEMETRY_EXPORTER_OUTPUT_PATH_VAR) : |
| 92 | + full_file_path.string(), |
| 93 | + .eventLimit = buffer_size}; |
| 94 | + |
| 95 | + // Create exporter through plugin manager |
| 96 | + auto &exporter_manager = nixlTelemetryPluginManager::getInstance(); |
| 97 | + exporter_ = exporter_manager.createExporter(exporter_type, init_params); |
| 98 | + |
| 99 | + if (!exporter_) { |
| 100 | + NIXL_WARN << "Failed to create telemetry exporter '" << exporter_type |
| 101 | + << "', falling back to buffer mode"; |
| 102 | + // Fall back to buffer mode |
| 103 | + buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>( |
| 104 | + full_file_path, true, TELEMETRY_VERSION, buffer_size); |
| 105 | + writeTask_.callback_ = [this]() { return writeEventHelper(); }; |
| 106 | + } else { |
| 107 | + writeTask_.callback_ = [this]() { return telemetryExporterHelper(); }; |
| 108 | + } |
| 109 | + } else { |
| 110 | + // Use buffer mode (default) |
| 111 | + NIXL_INFO << "Telemetry enabled, using buffer path: " << full_file_path |
| 112 | + << " with size: " << buffer_size; |
| 113 | + |
| 114 | + buffer_ = std::make_unique<sharedRingBuffer<nixlTelemetryEvent>>( |
| 115 | + full_file_path, true, TELEMETRY_VERSION, buffer_size); |
| 116 | + |
| 117 | + writeTask_.callback_ = [this]() { return writeEventHelper(); }; |
| 118 | + } |
83 | 119 |
|
84 | 120 | auto run_interval = std::getenv(TELEMETRY_RUN_INTERVAL_VAR) ? |
85 | 121 | std::chrono::milliseconds(std::stoul(std::getenv(TELEMETRY_RUN_INTERVAL_VAR))) : |
86 | 122 | DEFAULT_TELEMETRY_RUN_INTERVAL; |
87 | 123 |
|
88 | 124 | // Update write task interval and start it |
89 | | - writeTask_.callback_ = [this]() { return writeEventHelper(); }; |
| 125 | + |
90 | 126 | writeTask_.interval_ = run_interval; |
91 | 127 | writeTask_.enabled_ = true; |
92 | 128 | registerPeriodicTask(writeTask_); |
93 | 129 | } |
94 | 130 |
|
| 131 | +bool |
| 132 | +nixlTelemetry::telemetryExporterHelper() { |
| 133 | + std::vector<nixlTelemetryEvent> next_queue; |
| 134 | + next_queue.reserve(exporter_->getEventLimit()); |
| 135 | + { |
| 136 | + std::lock_guard<std::mutex> lock(mutex_); |
| 137 | + events_.swap(next_queue); |
| 138 | + } |
| 139 | + for (auto &event : next_queue) { |
| 140 | + // if full, ignore |
| 141 | + exporter_->exportEvent(event); |
| 142 | + } |
| 143 | + // collect all events and sort them by timestamp |
| 144 | + std::vector<nixlTelemetryEvent> all_events; |
| 145 | + for (auto &backend : backendMap_) { |
| 146 | + auto backend_events = backend.second->getTelemetryEvents(); |
| 147 | + for (auto &event : backend_events) { |
| 148 | + // don't trust enum value coming from backend, |
| 149 | + // as it might be different from the one in agent |
| 150 | + event.category_ = nixl_telemetry_category_t::NIXL_TELEMETRY_BACKEND; |
| 151 | + all_events.push_back(event); |
| 152 | + } |
| 153 | + } |
| 154 | + std::sort(all_events.begin(), |
| 155 | + all_events.end(), |
| 156 | + [](const nixlTelemetryEvent &a, const nixlTelemetryEvent &b) { |
| 157 | + return a.timestampUs_ < b.timestampUs_; |
| 158 | + }); |
| 159 | + for (auto &event : all_events) { |
| 160 | + exporter_->exportEvent(event); |
| 161 | + } |
| 162 | + return true; |
| 163 | +} |
| 164 | + |
95 | 165 | bool |
96 | 166 | nixlTelemetry::writeEventHelper() { |
97 | 167 | std::vector<nixlTelemetryEvent> next_queue; |
|
0 commit comments