Skip to content

Commit 9f7aa55

Browse files
authored
[OMON-699] Attempt to scrape metrics from ODC endpoint (#329)
1 parent ebfa94f commit 9f7aa55

File tree

5 files changed

+617
-84
lines changed

5 files changed

+617
-84
lines changed

CMakeLists.txt

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ find_package(CURL MODULE)
7979
find_package(RdKafka CONFIG)
8080
find_package(InfoLogger CONFIG)
8181
find_package(Protobuf)
82+
find_package(gRPC CONFIG)
8283

8384
####################################
8485
# Set OUTPUT vars
@@ -298,6 +299,50 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
298299
set_target_properties(14-OrbitId PROPERTIES OUTPUT_NAME "o2-monitoring-orbitid")
299300
endif()
300301

302+
####################################
303+
# gRPC
304+
####################################
305+
if(Protobuf_FOUND AND gRPC_FOUND)
306+
set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/odc.proto)
307+
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
308+
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
309+
set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
310+
set(GRPC_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.grpc.pb.cc)
311+
312+
add_custom_command(
313+
OUTPUT "${PROTO_CPP_OUTPUT}"
314+
COMMAND protobuf::protoc
315+
ARGS --proto_path ${PROTO_FILE_PREFIX}
316+
--cpp_out ${CMAKE_CURRENT_BINARY_DIR}
317+
${PROTO_OUTPUT_NAME}.proto
318+
DEPENDS ${PROTO_FILE}
319+
COMMENT "Running protoc on ${PROTO_FILE}"
320+
VERBATIM)
321+
322+
add_custom_command(
323+
OUTPUT "${GRPC_CPP_OUTPUT}"
324+
COMMAND protobuf::protoc
325+
ARGS --proto_path ${PROTO_FILE_PREFIX}
326+
--grpc_out=${CMAKE_CURRENT_BINARY_DIR}
327+
--plugin=protoc-gen-grpc=$<TARGET_FILE:gRPC::grpc_cpp_plugin>
328+
${PROTO_OUTPUT_NAME}.proto
329+
DEPENDS ${PROTO_FILE}
330+
COMMENT "Running protoc/gRPC on ${PROTO_FILE}"
331+
VERBATIM)
332+
333+
set(example examples/15-ODC.cxx)
334+
get_filename_component(example_name ${example} NAME)
335+
string(REGEX REPLACE ".cxx" "" example_name ${example_name})
336+
add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT} ${GRPC_CPP_OUTPUT})
337+
target_link_libraries(${example_name} PRIVATE
338+
gRPC::grpc++
339+
protobuf::libprotobuf
340+
Boost::program_options
341+
)
342+
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
343+
set_target_properties(15-ODC PROPERTIES OUTPUT_NAME "o2-monitoring-odc")
344+
endif()
345+
301346
####################################
302347
# Tests
303348
####################################
@@ -353,6 +398,10 @@ if(RdKafka_FOUND AND Protobuf_FOUND)
353398
install(TARGETS 14-OrbitId)
354399
endif()
355400

401+
if(Protobuf_FOUND AND gRPC_FOUND)
402+
install(TARGETS 15-ODC)
403+
endif()
404+
356405
# Create version file
357406
include(CMakePackageConfigHelpers)
358407
write_basic_package_version_file("${CMAKE_CURRENT_BINARY_DIR}/cmake/MonitoringConfigVersion.cmake"

examples/15-ODC.cxx

Lines changed: 249 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,249 @@
1+
///
2+
/// \file 15-ODC.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include <iostream>
7+
#include <grpc++/grpc++.h>
8+
#include "odc.grpc.pb.h"
9+
#include "helpers/HttpConnection.h"
10+
#include <boost/program_options.hpp>
11+
#include <thread>
12+
#include <regex>
13+
#include "../src/MonLogger.h"
14+
15+
using grpc::Channel;
16+
using grpc::ClientContext;
17+
using grpc::Status;
18+
19+
using odc::ODC;
20+
using odc::StatusRequest;
21+
using odc::StatusReply;
22+
23+
using odc::StateRequest;
24+
using odc::StateReply;
25+
using odc::GeneralReply;
26+
using o2::monitoring::MonLogger;
27+
28+
std::mutex gMapAccess;
29+
30+
struct OdcStats {
31+
int EpnCount;
32+
int FailedTasks;
33+
unsigned int RecoTasks;
34+
unsigned int CalibTasks;
35+
std::unordered_map<std::string, unsigned int> TasksPerCalib;
36+
std::unordered_map<std::string, unsigned int> FailedTasksPerCalib;
37+
std::unordered_map<std::string, std::string> CalibNames;
38+
std::string State;
39+
};
40+
41+
std::map<std::string, OdcStats> gStats;
42+
43+
void httpServer(tcp::acceptor& acceptor, tcp::socket& socket) {
44+
acceptor.async_accept(socket, [&](beast::error_code ec) {
45+
if (!ec) {
46+
auto connection = std::make_shared<httpConnection>(std::move(socket));
47+
connection->addCallback("SHOW+RETENTION+POLICIES",
48+
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
49+
response.set(http::field::content_type, "application/json");
50+
beast::ostream(response.body()) << "{}\n";
51+
});
52+
connection->addCallback("SHOW+measurements",
53+
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
54+
response.set(http::field::content_type, "application/json");
55+
beast::ostream(response.body()) << R"({"results":[{"statement_id":0,"series":[{"name":"measurements","columns":["name"],"values":[["odc"]]}]}]}\n)";
56+
});
57+
connection->addCallback("SHOW+TAG+VALUES+FROM+calibs+WHERE+partitionid",
58+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
59+
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "odc_calibs", "columns": ["key", "value"], "values": [)";
60+
std::string jsonSuffix = R"(]}]}]})";
61+
response.set(http::field::content_type, "application/json");
62+
std::string calibJson;
63+
std::string id = std::string(request.target().substr(request.target().find("WHERE+partitionid+%3D+") + 22));
64+
const std::lock_guard<std::mutex> lock(gMapAccess);
65+
if (gStats.find(id) != gStats.end()) {
66+
for (auto const& calib : gStats.at(id).TasksPerCalib) {
67+
calibJson += "[\"calib\", \"" + calib.first + "\"],";
68+
}
69+
}
70+
if (!calibJson.empty()) {
71+
calibJson.pop_back();
72+
}
73+
beast::ostream(response.body()) << jsonPrefix << calibJson << jsonSuffix << '\n';
74+
});
75+
connection->addCallback("odc_status+WHERE+partitionid",
76+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
77+
std::string jsonPrefix = R"({"results":[{"statement_id":0,"series":[{"name":"odc","columns":["time","State","EPN count","Failed tasks", "Calib tasks", "Reco tasks"],"values":[)";
78+
std::string jsonSuffix = R"(]}]}]})";
79+
response.set(http::field::content_type, "application/json");
80+
std::string id = std::string(request.target().substr(request.target().find("WHERE+partitionid+%3D+") + 22));
81+
std::string odcStatJson;
82+
const std::lock_guard<std::mutex> lock(gMapAccess);
83+
if (gStats.find(id) != gStats.end()) {
84+
odcStatJson += "[" + std::to_string(0) + ", \""
85+
+ gStats.at(id).State + "\", \""
86+
+ std::to_string(gStats.at(id).EpnCount) + "\", \""
87+
+ std::to_string(gStats.at(id).FailedTasks) + "\", \""
88+
+ std::to_string(gStats.at(id).CalibTasks) + "\", \""
89+
+ std::to_string(gStats.at(id).RecoTasks) + "\"]";
90+
}
91+
beast::ostream(response.body()) << jsonPrefix << odcStatJson << jsonSuffix << '\n';
92+
});
93+
connection->addCallback("calib_tasks+WHERE+calib",
94+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
95+
std::string jsonPrefix = R"({"results":[{"statement_id":0,"series":[{"name":"calib_tasks","columns":["time","Name", "Total","Failed"],"values":[)";
96+
std::string jsonSuffix = R"(]}]}]})";
97+
response.set(http::field::content_type, "application/json");
98+
std::string calib = std::string(request.target().substr(request.target().find("WHERE+calib+%3D+") + 16));
99+
std::string calibTasksJson;
100+
const std::lock_guard<std::mutex> lock(gMapAccess);
101+
calibTasksJson += "[" + std::to_string(0);
102+
for (const auto& run : gStats) {
103+
if (run.second.TasksPerCalib.find(calib) != run.second.TasksPerCalib.end()) {
104+
calibTasksJson += ", \"";
105+
if (run.second.CalibNames.find(calib) != run.second.CalibNames.end()) {
106+
calibTasksJson += run.second.CalibNames.at(calib) + "\",";
107+
} else {
108+
calibTasksJson += "<None>\",";
109+
}
110+
calibTasksJson += std::to_string(run.second.TasksPerCalib.at(calib));
111+
if (run.second.FailedTasksPerCalib.find(calib) != run.second.FailedTasksPerCalib.end()) {
112+
calibTasksJson += "," + std::to_string(run.second.FailedTasksPerCalib.at(calib));
113+
} else {
114+
calibTasksJson += ",0";
115+
}
116+
}
117+
}
118+
calibTasksJson += "]";
119+
beast::ostream(response.body()) << jsonPrefix << calibTasksJson << jsonSuffix << '\n';
120+
});
121+
connection->start();
122+
}
123+
httpServer(acceptor, socket);
124+
});
125+
}
126+
127+
class OdcClient {
128+
public:
129+
OdcClient(std::shared_ptr<Channel> channel) : mStub(ODC::NewStub(channel)) {}
130+
void getStatus() {
131+
gStats.clear();
132+
StatusRequest request;
133+
request.set_running(true);
134+
StatusReply reply;
135+
ClientContext context;
136+
Status status = mStub->Status(&context, request, &reply);
137+
if (status.ok()) {
138+
MonLogger::Get() << "Status call OK" << MonLogger::End();
139+
for (int i = 0; i < reply.partitions_size(); i++) {
140+
auto partitionId = reply.partitions(i).partitionid();
141+
OdcStats stats;
142+
stats.State = reply.partitions(i).state();
143+
getRunState(partitionId, stats);
144+
const std::lock_guard<std::mutex> lock(gMapAccess);
145+
gStats.insert({partitionId, stats});
146+
}
147+
} else {
148+
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
149+
}
150+
}
151+
void getRunState(const std::string& partitionId, OdcStats& stats) {
152+
StateRequest request;
153+
request.set_detailed(true);
154+
request.set_partitionid(partitionId);
155+
StateReply reply;
156+
ClientContext context;
157+
Status status = mStub->GetState(&context, request, &reply);
158+
if (status.ok()) {
159+
MonLogger::Get() << "State call for " << partitionId << " OK" << MonLogger::End();
160+
unsigned int failedCount = 0;
161+
std::unordered_set<std::string> uniqueEpns{};
162+
std::unordered_set<std::string> calibCollections{};
163+
unsigned int recoTasks = 0;
164+
unsigned int calibTasks = 0;
165+
std::regex rReco("_reco[0-9]+_");
166+
std::regex rCalib("_calib[0-9]+_");
167+
for (int i = 0; i < reply.devices_size(); i++) {
168+
if (reply.devices(i).state() == "ERROR") {
169+
failedCount++;
170+
}
171+
uniqueEpns.insert(reply.devices(i).host());
172+
if (std::regex_search(reply.devices(i).path(), rReco)) {
173+
recoTasks++;
174+
}
175+
if (std::regex_search(reply.devices(i).path(), rCalib)) {
176+
calibTasks++;
177+
const auto& path = reply.devices(i).path();
178+
auto calibIdx = path.find("_calib");
179+
auto calib = path.substr(calibIdx + 1, path.size()-calibIdx-3);
180+
auto calibName = path.substr(path.find_last_of('/') + 1, calibIdx - path.find_last_of('/') - 1);
181+
if (calibName.find("aggregator-proxy-") != std::string::npos) {
182+
stats.CalibNames.insert({calib, calibName.substr(calibName.find_last_of('-') + 1)});
183+
}
184+
auto it = stats.TasksPerCalib.find(calib);
185+
if (it != stats.TasksPerCalib.end()) {
186+
it->second++;
187+
}
188+
else {
189+
stats.TasksPerCalib.insert({calib, 1});
190+
}
191+
if (reply.devices(i).state() == "ERROR") {
192+
auto it = stats.FailedTasksPerCalib.find(calib);
193+
if (it != stats.FailedTasksPerCalib.end()) {
194+
it->second++;
195+
}
196+
else {
197+
stats.FailedTasksPerCalib.insert({calib, 1});
198+
}
199+
}
200+
}
201+
}
202+
const std::lock_guard<std::mutex> lock(gMapAccess);
203+
stats.RecoTasks = recoTasks;
204+
stats.CalibTasks = calibTasks;
205+
stats.EpnCount = uniqueEpns.size();
206+
stats.FailedTasks = failedCount;
207+
} else {
208+
std::cout << status.error_code() << ": " << status.error_message() << std::endl;
209+
}
210+
}
211+
212+
private:
213+
std::unique_ptr<ODC::Stub> mStub;
214+
};
215+
216+
217+
int main(int argc, char* argv[]) {
218+
boost::program_options::options_description desc("Program options");
219+
desc.add_options()
220+
("odc-host", boost::program_options::value<std::string>()->required(), "ODC hostname")
221+
("odc-port", boost::program_options::value<unsigned short>()->required(), "ODC port")
222+
("http-port", boost::program_options::value<unsigned short>()->default_value(8088), "HTTP server bind port");
223+
boost::program_options::variables_map vm;
224+
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
225+
boost::program_options::notify(vm);
226+
unsigned short port = vm["http-port"].as<unsigned short>();
227+
228+
MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug;
229+
MonLogger::Get() << "Connected to ODC server: " << vm["odc-host"].as<std::string>() << ":" << vm["odc-port"].as<unsigned short>() << "; serving HTTP on port: " << port << MonLogger::End();
230+
std::thread webServerThread([&port](){
231+
auto const address = boost::asio::ip::make_address("0.0.0.0");
232+
boost::asio::io_context ioc{1};
233+
tcp::acceptor acceptor{ioc, {address, port}};
234+
tcp::socket socket{ioc};
235+
httpServer(acceptor, socket);
236+
ioc.run();
237+
});
238+
grpc::ChannelArguments args;
239+
args.SetMaxReceiveMessageSize(20*1024*1024);
240+
OdcClient client(grpc::CreateCustomChannel(
241+
vm["odc-host"].as<std::string>() + ":" + std::to_string(vm["odc-port"].as<unsigned short>()),
242+
grpc::InsecureChannelCredentials(),
243+
args
244+
));
245+
for (;;) {
246+
client.getStatus();
247+
std::this_thread::sleep_for(std::chrono::seconds(15));
248+
}
249+
}

0 commit comments

Comments
 (0)