Skip to content

Commit 8a0c0dc

Browse files
author
Michal Tichák
committed
[OCTRL-932] sorted list of runs received from kafka
1 parent fcfe139 commit 8a0c0dc

File tree

4 files changed

+136
-106
lines changed

4 files changed

+136
-106
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@
55
.idea
66
examples/config.ini
77
/cmake-build-debug/
8+
compile_commands.json

CMakeLists.txt

Lines changed: 17 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ set(Boost_USE_MULTITHREADED ON)
2424

2525
# Set cmake policy by version: https://cmake.org/cmake/help/latest/manual/cmake-policies.7.html
2626
if(${CMAKE_VERSION} VERSION_LESS 3.12)
27-
cmake_policy(VERSION ${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION})
27+
cmake_policy(VERSION ${CMAKE_MAJOR_VERSION}.${CMAKE_MINOR_VERSION})
2828
else()
29-
cmake_policy(VERSION 3.12)
29+
cmake_policy(VERSION 3.12)
3030
endif()
3131

3232
# Define project
@@ -81,20 +81,21 @@ find_package(Protobuf CONFIG)
8181
find_package(gRPC CONFIG)
8282

8383
if(RDKAFKA_ROOT)
84-
message("RDKAFKA_ROOT set, we enable corresponding libs and binaries")
85-
find_library(RDKAFKA_LIB1 "rdkafka++" REQUIRED PATHS ${RDKAFKA_ROOT}/lib)
86-
find_library(RDKAFKA_LIB2 "rdkafka" REQUIRED PATHS ${RDKAFKA_ROOT}/lib)
87-
set(RDKAFKA_LIBS ${RDKAFKA_LIB1} ${RDKAFKA_LIB2})
88-
set(RDKAFKA_INCLUDE "${RDKAFKA_ROOT}/include")
89-
set(RdKafka_FOUND true)
84+
message("RDKAFKA_ROOT set as ${RDKAFKA_ROOT}, we enable corresponding libs and binaries")
85+
find_library(RDKAFKA_LIB1 "rdkafka++" REQUIRED NO_DEFAULT_PATH PATHS ${RDKAFKA_ROOT}/lib)
86+
find_library(RDKAFKA_LIB2 "rdkafka" REQUIRED NO_DEFAULT_PATH PATHS ${RDKAFKA_ROOT}/lib)
87+
set(RDKAFKA_LIBS ${RDKAFKA_LIB1} ${RDKAFKA_LIB2})
88+
set(RDKAFKA_INCLUDE "${RDKAFKA_ROOT}/include")
89+
set(RdKafka_FOUND true)
90+
message(STATUS "RDKAFKA_LIBS found: ${RDKAFKA_LIBS}")
9091
else()
91-
message("RDKAFKA_ROOT not set, corresponding libs and binaries won't be built")
92+
message("RDKAFKA_ROOT not set, corresponding libs and binaries won't be built")
9293
endif()
9394

9495
if(gRPC_FOUND)
95-
message("gRPC found, we enable corresponding libs and binaries")
96+
message("gRPC found, we enable corresponding libs and binaries")
9697
else()
97-
message("gRPC not found, corresponding libs and binaries won't be built")
98+
message("gRPC not found, corresponding libs and binaries won't be built")
9899
endif()
99100

100101

@@ -186,11 +187,11 @@ if(ApMon_FOUND)
186187
endif()
187188

188189
if(RdKafka_FOUND)
189-
message(STATUS " Compiling Kafka transport")
190+
message(STATUS " Compiling Kafka transport")
190191
endif()
191192

192193
if(CURL_FOUND)
193-
message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend")
194+
message(STATUS " Compiling HTTP transport/InfluxDB 2.x backend")
194195
endif()
195196

196197
# Detect operating system
@@ -218,7 +219,7 @@ if (APPLE)
218219
endif()
219220

220221
if(Protobuf_FOUND AND RdKafka_FOUND)
221-
message(STATUS "Compiling Kafka consumer with protobuf deserializer")
222+
message(STATUS "Compiling Kafka consumer with protobuf deserializer")
222223
endif()
223224

224225
# Handle custom compile definitions
@@ -319,6 +320,8 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
319320
set_target_properties(8-KafkaToHttpServer PROPERTIES OUTPUT_NAME "o2-monitoring-env-webserver")
320321
set_target_properties(12-KafkaToInfluxDb PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-influxdb")
321322
set_target_properties(14-OrbitId PROPERTIES OUTPUT_NAME "o2-monitoring-orbitid")
323+
else()
324+
message(WARNING "Failed to generate PROTO examples: RdKafka_FOUND ${RdKafka_FOUND}, Protobuf_FOUND ${Protobuf_FOUND}, CURL_FOUND ${CURL_FOUND}" )
322325
endif()
323326

324327
####################################

examples/8-KafkaToHttpServer.cxx

Lines changed: 117 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -20,91 +20,94 @@ aliceo2::envs::ActiveRunsList gActiveEnvs;
2020
std::mutex gEnvAccess;
2121

2222
// "Loop" forever accepting new connections.
23-
void httpServer(tcp::acceptor& acceptor, tcp::socket& socket) {
23+
void httpServer(tcp::acceptor& acceptor, tcp::socket& socket)
24+
{
2425
acceptor.async_accept(socket, [&](beast::error_code ec) {
2526
if (!ec) {
26-
auto connection = std::make_shared<httpConnection>(std::move(socket));
27-
connection->addCallback("SHOW+RETENTION+POLICIES",
28-
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
29-
response.set(http::field::content_type, "application/json");
30-
beast::ostream(response.body()) << "{}\n";
31-
});
32-
connection->addCallback("SHOW+TAG+VALUES+FROM+runs",
33-
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
34-
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "env_active", "columns": ["key", "value"], "values": [)";
35-
std::string jsonSuffix = R"(]}]}]})";
36-
response.set(http::field::content_type, "application/json");
37-
const std::lock_guard<std::mutex> lock(gEnvAccess);
38-
std::string envsJson;
39-
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
40-
envsJson += "[\"run\", \"" + std::to_string(gActiveEnvs.activeruns(i).runnumber()) + "\"],";
41-
}
42-
if (!envsJson.empty()) {
43-
envsJson.pop_back();
44-
} else {
45-
envsJson += "[\"run\", \"0\"]";
46-
}
47-
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
48-
});
49-
connection->addCallback("SHOW+TAG+VALUES+FROM+detectors",
50-
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
51-
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "detectors", "columns": ["key", "value"], "values": [)";
52-
std::string jsonSuffix = R"(]}]}]})";
53-
std::string runString = std::string(request.target().substr(request.target().find("WHERE+run+%3D+") + 14));
54-
response.set(http::field::content_type, "application/json");
55-
uint32_t run;
56-
try {
57-
run = static_cast<uint32_t>(std::stoul(runString));
58-
} catch(...) {
59-
beast::ostream(response.body()) << "{}\r\n";
60-
return;
61-
}
62-
const std::lock_guard<std::mutex> lock(gEnvAccess);
63-
std::string detectorsJson;
64-
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
65-
if (run != gActiveEnvs.activeruns(i).runnumber()) {
66-
continue;
67-
}
68-
for (int j = 0; j < gActiveEnvs.activeruns(i).detectors_size(); j++) {
69-
detectorsJson += "[\"detector\", \"" + boost::algorithm::to_lower_copy(gActiveEnvs.activeruns(i).detectors(j)) + "\"],";
70-
}
71-
if (!detectorsJson.empty()) {
72-
detectorsJson.pop_back();
73-
}
74-
}
75-
beast::ostream(response.body()) << jsonPrefix << detectorsJson << jsonSuffix << '\n';
76-
});
77-
connection->addCallback("active_runs+WHERE+run",
78-
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
79-
std::string jsonPrefix = R"({"results":[{"statement_id":0,"series":[{"name":"env","columns":["time","Env ID","Run number","Detectors","State", "Run type"],"values":[)";
80-
std::string jsonSuffix = R"(]}]}]})";
81-
response.set(http::field::content_type, "application/json");
82-
std::string runString = std::string(request.target().substr(request.target().find("WHERE+run+%3D+") + 14));
83-
uint32_t run;
84-
try {
85-
run = static_cast<uint32_t>(std::stoul(runString));
86-
} catch(...) {
87-
beast::ostream(response.body()) << "{}\r\n";
88-
return;
89-
}
90-
const std::lock_guard<std::mutex> lock(gEnvAccess);
91-
std::string envsJson;
92-
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
93-
if (run != gActiveEnvs.activeruns(i).runnumber()) {
94-
continue;
95-
}
96-
auto detectorsProto = gActiveEnvs.activeruns(i).detectors();
97-
std::vector<std::string> detectors(detectorsProto.begin(), detectorsProto.end());
98-
envsJson += "[" + std::to_string(gActiveEnvs.activeruns(i).enterstatetimestamp()) + ", \""
99-
+ gActiveEnvs.activeruns(i).environmentid() + "\", "
100-
+ std::to_string(gActiveEnvs.activeruns(i).runnumber()) + ", \""
101-
+ boost::algorithm::join(detectors, " ") + "\", \""
102-
+ gActiveEnvs.activeruns(i).state() + "\", \""
103-
+ gActiveEnvs.activeruns(i).runtype() + "\"]";
104-
}
105-
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
106-
});
107-
connection->start();
27+
auto connection = std::make_shared<httpConnection>(std::move(socket));
28+
connection->addCallback("SHOW+RETENTION+POLICIES",
29+
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
30+
response.set(http::field::content_type, "application/json");
31+
beast::ostream(response.body()) << "{}\n";
32+
});
33+
connection->addCallback("SHOW+TAG+VALUES+FROM+runs",
34+
[](http::request<http::dynamic_body>& /*request*/, http::response<http::dynamic_body>& response) {
35+
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "env_active", "columns": ["key", "value"], "values": [)";
36+
std::string jsonSuffix = R"(]}]}]})";
37+
response.set(http::field::content_type, "application/json");
38+
const std::lock_guard<std::mutex> lock(gEnvAccess);
39+
std::string envsJson;
40+
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
41+
envsJson += "[\"run\", \"" + std::to_string(gActiveEnvs.activeruns(i).runnumber()) + "\"],";
42+
}
43+
if (!envsJson.empty()) {
44+
envsJson.pop_back();
45+
} else {
46+
envsJson += "[\"run\", \"0\"]";
47+
}
48+
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
49+
});
50+
connection->addCallback("SHOW+TAG+VALUES+FROM+detectors",
51+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
52+
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "detectors", "columns": ["key", "value"], "values": [)";
53+
std::string jsonSuffix = R"(]}]}]})";
54+
std::string runString = std::string(request.target().substr(request.target().find("WHERE+run+%3D+") + 14));
55+
response.set(http::field::content_type, "application/json");
56+
uint32_t run;
57+
try {
58+
run = static_cast<uint32_t>(std::stoul(runString));
59+
} catch (...) {
60+
beast::ostream(response.body()) << "{}\r\n";
61+
return;
62+
}
63+
const std::lock_guard<std::mutex> lock(gEnvAccess);
64+
std::string detectorsJson;
65+
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
66+
if (run != gActiveEnvs.activeruns(i).runnumber()) {
67+
continue;
68+
}
69+
for (int j = 0; j < gActiveEnvs.activeruns(i).detectors_size(); j++) {
70+
detectorsJson += "[\"detector\", \"" + boost::algorithm::to_lower_copy(gActiveEnvs.activeruns(i).detectors(j)) + "\"],";
71+
}
72+
if (!detectorsJson.empty()) {
73+
detectorsJson.pop_back();
74+
}
75+
}
76+
beast::ostream(response.body()) << jsonPrefix << detectorsJson << jsonSuffix << '\n';
77+
});
78+
connection->addCallback("active_runs+WHERE+run",
79+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
80+
std::string jsonPrefix = R"({"results":[{"statement_id":0,"series":[{"name":"env","columns":["time","Env ID","Run number","Detectors","State", "Run type"],"values":[)";
81+
std::string jsonSuffix = R"(]}]}]})";
82+
response.set(http::field::content_type, "application/json");
83+
std::string runString = std::string(request.target().substr(request.target().find("WHERE+run+%3D+") + 14));
84+
uint32_t run;
85+
try {
86+
run = static_cast<uint32_t>(std::stoul(runString));
87+
} catch (...) {
88+
beast::ostream(response.body()) << "{}\r\n";
89+
return;
90+
}
91+
const std::lock_guard<std::mutex> lock(gEnvAccess);
92+
std::string envsJson;
93+
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
94+
if (run != gActiveEnvs.activeruns(i).runnumber()) {
95+
continue;
96+
}
97+
auto detectorsProto = gActiveEnvs.activeruns(i).detectors();
98+
std::vector<std::string> detectors(detectorsProto.begin(), detectorsProto.end());
99+
envsJson += "[" +
100+
std::to_string(gActiveEnvs.activeruns(i).enterstatetimestamp()) + ", \"" +
101+
gActiveEnvs.activeruns(i).environmentid() + "\", " +
102+
std::to_string(gActiveEnvs.activeruns(i).runnumber()) + ", \"" +
103+
boost::algorithm::join(detectors, " ") + "\", \"" +
104+
gActiveEnvs.activeruns(i).state() + "\", \"" +
105+
gActiveEnvs.activeruns(i).runtype() +
106+
"\"]";
107+
}
108+
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
109+
});
110+
connection->start();
108111
}
109112
httpServer(acceptor, socket);
110113
});
@@ -123,24 +126,47 @@ void deserializeActiveRuns(const std::string& lastActiveRunMessage)
123126
<< activeRuns.activeruns(i).environmentid() << ")" << MonLogger::End();
124127
}
125128
}
129+
130+
// sort received active runs according to following rules:
131+
// 1) runs with more than 2 detectors are listed first
132+
// 2) if there are more than 1 run with more than 2 detectors list those with ITS first
133+
auto* runsToSort = activeRuns.mutable_activeruns();
134+
std::sort(runsToSort->begin(), runsToSort->end(), [](const aliceo2::envs::EnvInfo& a, const aliceo2::envs::EnvInfo& b) {
135+
auto hasITS = [](auto&& detectors) {
136+
return std::find(detectors.begin(), detectors.end(), "ITS") != detectors.end();
137+
};
138+
139+
if (a.detectors().size() >= 2 && b.detectors().size() >= 2) {
140+
if (hasITS(a.detectors())) {
141+
return true;
142+
}
143+
if (hasITS(b.detectors())) {
144+
return false;
145+
}
146+
}
147+
148+
return a.detectors().size() > b.detectors().size();
149+
});
150+
126151
const std::lock_guard<std::mutex> lock(gEnvAccess);
127152
gActiveEnvs = activeRuns;
128153
}
129154

130-
int main(int argc, char* argv[]) {
155+
int main(int argc, char* argv[])
156+
{
131157
boost::program_options::options_description desc("Program options");
132-
desc.add_options()
133-
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka host")
134-
("kafka-topics", boost::program_options::value<std::vector<std::string>>()->multitoken()->required(), "Kafka topics")
135-
("http-port", boost::program_options::value<unsigned short>()->default_value(8086), "HTTP server bind port");
136-
boost::program_options::variables_map vm;
158+
desc.add_options()(
159+
"kafka-host", boost::program_options::value<std::string>()->required(), "Kafka host")(
160+
"kafka-topics", boost::program_options::value<std::vector<std::string>>()->multitoken()->required(), "Kafka topics")(
161+
"http-port", boost::program_options::value<unsigned short>()->default_value(8086), "HTTP server bind port");
162+
boost::program_options::variables_map vm;
137163
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
138164
boost::program_options::notify(vm);
139165
unsigned short port = vm["http-port"].as<unsigned short>();
140166

141167
MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug;
142168
MonLogger::Get() << "Using Kafka instance: " << vm["kafka-host"].as<std::string>() << ":9092 and HTTP server port: " << port << MonLogger::End();
143-
std::thread webServerThread([&port](){
169+
std::thread webServerThread([&port]() {
144170
auto const address = boost::asio::ip::make_address("0.0.0.0");
145171
boost::asio::io_context ioc{1};
146172
tcp::acceptor acceptor{ioc, {address, port}};
@@ -150,8 +176,7 @@ int main(int argc, char* argv[]) {
150176
});
151177

152178
auto kafkaConsumer = std::make_unique<o2::monitoring::transports::KafkaConsumer>(
153-
vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}, "kafka-aliecs-active-envs"
154-
);
179+
vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}, "kafka-aliecs-active-envs");
155180
for (;;) {
156181
auto serializedRuns = kafkaConsumer->pull();
157182
if (!serializedRuns.empty()) {

examples/helpers/HttpConnection.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
#include <boost/asio.hpp>
55
#include <chrono>
66
#include <string>
7+
#include <map>
78

89
namespace beast = boost::beast;
910
namespace http = beast::http;

0 commit comments

Comments
 (0)