Skip to content

Commit 5529676

Browse files
authored
[OMON-540] Provide selected metrics from Kafka over Webserver (#285)
1 parent 789be01 commit 5529676

File tree

3 files changed

+237
-73
lines changed

3 files changed

+237
-73
lines changed

CMakeLists.txt

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,6 @@ set(EXAMPLES
215215
examples/5-Benchmark.cxx
216216
examples/6-Increment.cxx
217217
examples/7-InternalBenchamrk.cxx
218-
examples/8-DbFiller.cxx
219218
examples/9-RunNumber.cxx
220219
examples/10-Buffering.cxx
221220
)
@@ -235,12 +234,14 @@ foreach (example ${EXAMPLES})
235234
endforeach()
236235

237236
set_target_properties(5-Benchmark PROPERTIES OUTPUT_NAME "o2-monitoring-benchmark")
238-
set_target_properties(8-DbFiller PROPERTIES OUTPUT_NAME "o2-monitoring-dbfiller")
237+
239238
if(RdKafka_FOUND)
240239
set_target_properties(11-KafkaToWebsocket PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-grafana")
241240
endif()
241+
242+
242243
####################################
243-
# Kafka protobuf deserializer
244+
# Generate protobuf
244245
####################################
245246
if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
246247
set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/envs.proto)
@@ -256,19 +257,24 @@ if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
256257
COMMENT "Running protoc on ${PROTO_FILE}"
257258
VERBATIM)
258259

259-
add_executable(12-KafkvaToInfluxDb
260+
set(PROTO_EXAMPLES
260261
examples/12-KafkaToInfluxDb.cxx
261-
${PROTO_CPP_OUTPUT})
262-
263-
target_include_directories(12-KafkvaToInfluxDb
264-
PRIVATE
265-
${CMAKE_CURRENT_BINARY_DIR})
266-
267-
target_link_libraries(12-KafkvaToInfluxDb
268-
PRIVATE
269-
Monitoring
270-
Boost::program_options
271-
protobuf::libprotobuf)
262+
examples/8-KafkaToHttpServer.cxx
263+
)
264+
foreach (example ${PROTO_EXAMPLES})
265+
get_filename_component(example_name ${example} NAME)
266+
string(REGEX REPLACE ".cxx" "" example_name ${example_name})
267+
add_executable(${example_name} ${example} ${PROTO_CPP_OUTPUT})
268+
target_link_libraries(${example_name} PRIVATE
269+
Monitoring
270+
Boost::program_options
271+
protobuf::libprotobuf
272+
)
273+
target_include_directories(${example_name} PRIVATE ${CMAKE_CURRENT_BINARY_DIR})
274+
install(TARGETS ${example_name})
275+
endforeach()
276+
set_target_properties(8-KafkaToHttpServer PROPERTIES OUTPUT_NAME "o2-monitoring-env-webserver")
277+
set_target_properties(12-KafkaToInfluxDb PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-influxdb")
272278
endif()
273279

274280
####################################
@@ -311,7 +317,7 @@ endforeach()
311317
####################################
312318

313319
# Install library
314-
install(TARGETS Monitoring 5-Benchmark 8-DbFiller
320+
install(TARGETS Monitoring 5-Benchmark
315321
EXPORT MonitoringTargets
316322
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
317323
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}

examples/8-DbFiller.cxx

Lines changed: 0 additions & 57 deletions
This file was deleted.

examples/8-KafkaToHttpServer.cxx

Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
#define BOOST_BEAST_USE_STD_STRING_VIEW
2+
3+
#include <boost/beast/core.hpp>
4+
#include <boost/beast/http.hpp>
5+
#include <boost/beast/version.hpp>
6+
#include <boost/asio.hpp>
7+
#include <boost/program_options.hpp>
8+
#include <boost/algorithm/string/join.hpp>
9+
#include <chrono>
10+
#include <iostream>
11+
#include <memory>
12+
#include <mutex>
13+
#include <string>
14+
#include <thread>
15+
16+
#include "../src/Transports/KafkaConsumer.h"
17+
#include "../src/MonLogger.h"
18+
#include "envs.pb.h"
19+
20+
namespace beast = boost::beast;
21+
namespace http = beast::http;
22+
using tcp = boost::asio::ip::tcp;
23+
using o2::monitoring::MonLogger;
24+
using namespace std::literals::string_literals;
25+
26+
27+
aliceo2::envs::ActiveRunsList gActiveEnvs;
28+
std::mutex gEnvAccess;
29+
30+
31+
class httpConnection : public std::enable_shared_from_this<httpConnection>
32+
{
33+
public:
34+
httpConnection(tcp::socket socket) : mSocket(std::move(socket)) {}
35+
36+
// Initiate the asynchronous operations associated with the connection.
37+
void start() {
38+
readRequest();
39+
checkDeadline();
40+
}
41+
42+
void addCallback(std::string_view path, std::function<void(http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response)> callback) {
43+
mCallbacks.insert({path, callback});
44+
}
45+
46+
private:
47+
tcp::socket mSocket;
48+
beast::flat_buffer mBuffer{8192};
49+
http::request<http::dynamic_body> mRequest;
50+
http::response<http::dynamic_body> mResponse;
51+
boost::asio::steady_timer mDeadline{mSocket.get_executor(), std::chrono::seconds(5)};
52+
std::map<std::string_view, std::function<void(http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response)>> mCallbacks;
53+
54+
// Asynchronously receive a complete request message.
55+
void readRequest() {
56+
auto self = shared_from_this();
57+
http::async_read(mSocket, mBuffer, mRequest, [self](beast::error_code ec, std::size_t bytes_transferred) {
58+
boost::ignore_unused(bytes_transferred);
59+
if (!ec) self->processRequest();
60+
});
61+
}
62+
63+
// Determine what needs to be done with the request message.
64+
void processRequest() {
65+
mResponse.version(mRequest.version());
66+
mResponse.keep_alive(false);
67+
mResponse.result(http::status::ok);
68+
createResponse();
69+
writeResponse();
70+
}
71+
72+
// Construct a response message based on the program state.
73+
void createResponse() {
74+
for (const auto& [key, value] : mCallbacks) {
75+
if (mRequest.target().find(key) != std::string_view::npos) {
76+
value(mRequest, mResponse);
77+
return;
78+
}
79+
}
80+
mResponse.result(http::status::not_found);
81+
mResponse.set(http::field::content_type, "text/plain");
82+
beast::ostream(mResponse.body()) << "Not found\r\n";
83+
}
84+
85+
// Asynchronously transmit the response message.
86+
void writeResponse() {
87+
auto self = shared_from_this();
88+
mResponse.content_length(mResponse.body().size());
89+
http::async_write(mSocket, mResponse, [self](beast::error_code ec, std::size_t) {
90+
self->mSocket.shutdown(tcp::socket::shutdown_send, ec);
91+
self->mDeadline.cancel();
92+
});
93+
}
94+
95+
// Check whether we have spent enough time on this connection.
96+
void checkDeadline() {
97+
auto self = shared_from_this();
98+
mDeadline.async_wait(
99+
[self](beast::error_code ec) {
100+
if(!ec) {
101+
self->mSocket.close(ec);
102+
}
103+
});
104+
}
105+
};
106+
107+
// "Loop" forever accepting new connections.
108+
void httpServer(tcp::acceptor& acceptor, tcp::socket& socket) {
109+
acceptor.async_accept(socket, [&](beast::error_code ec) {
110+
if (!ec) {
111+
auto connection = std::make_shared<httpConnection>(std::move(socket));
112+
connection->addCallback("SHOW+RETENTION+POLICIES",
113+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
114+
response.set(http::field::content_type, "application/json");
115+
beast::ostream(response.body()) << "{}\n";
116+
});
117+
connection->addCallback("SHOW+TAG+VALUES+FROM",
118+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
119+
std::string jsonPrefix = R"({"results": [{"statement_id": 0, "series": [{"name": "env_active", "columns": ["key", "value"], "values": [)";
120+
std::string jsonSuffix = R"(]}]}]})";
121+
response.set(http::field::content_type, "application/json");
122+
const std::lock_guard<std::mutex> lock(gEnvAccess);
123+
std::string envsJson;
124+
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
125+
envsJson += "[\"run\", \"" + std::to_string(gActiveEnvs.activeruns(i).runnumber()) + "\"],";
126+
}
127+
if (!envsJson.empty()) {
128+
envsJson.pop_back();
129+
}
130+
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
131+
});
132+
connection->addCallback("active_runs+WHERE+run",
133+
[](http::request<http::dynamic_body>& request, http::response<http::dynamic_body>& response) {
134+
std::string jsonPrefix = R"({"results":[{"statement_id":0,"series":[{"name":"env","columns":["time","Env ID","Run number","Detectors","State"],"values":[)";
135+
std::string jsonSuffix = R"(]}]}]})";
136+
response.set(http::field::content_type, "application/json");
137+
std::string runString = std::string(request.target().substr(request.target().find("WHERE+run+%3D+") + 14));
138+
uint32_t run;
139+
try {
140+
run = static_cast<uint32_t>(std::stoul(runString));
141+
} catch(...) {
142+
response.set(http::field::content_type, "application/json");
143+
beast::ostream(response.body()) << "{}\r\n";
144+
return;
145+
}
146+
const std::lock_guard<std::mutex> lock(gEnvAccess);
147+
std::string envsJson;
148+
for (int i = 0; i < gActiveEnvs.activeruns_size(); i++) {
149+
if (run != gActiveEnvs.activeruns(i).runnumber()) {
150+
continue;
151+
}
152+
auto detectorsProto = gActiveEnvs.activeruns(i).detectors();
153+
std::vector<std::string> detectors(detectorsProto.begin(), detectorsProto.end());
154+
envsJson += "[" + std::to_string(gActiveEnvs.timestamp()) + ", \""
155+
+ gActiveEnvs.activeruns(i).environmentid() + "\", "
156+
+ std::to_string(gActiveEnvs.activeruns(i).runnumber()) + ", \""
157+
+ boost::algorithm::join(detectors, " ") + "\", \""
158+
+ gActiveEnvs.activeruns(i).state() + "\"]";
159+
}
160+
beast::ostream(response.body()) << jsonPrefix << envsJson << jsonSuffix << '\n';
161+
});
162+
connection->start();
163+
}
164+
httpServer(acceptor, socket);
165+
});
166+
}
167+
168+
// Deserialize active runs
169+
void deserializeActiveRuns(const std::string& lastActiveRunMessage)
170+
{
171+
aliceo2::envs::ActiveRunsList activeRuns;
172+
activeRuns.ParseFromString(lastActiveRunMessage);
173+
if (activeRuns.activeruns_size() == 0) {
174+
MonLogger::Get() << "Empty active runs" << MonLogger::End();;
175+
} else {
176+
for (int i = 0; i < activeRuns.activeruns_size(); i++) {
177+
MonLogger::Get() << "Active run: " << activeRuns.activeruns(i).runnumber() << " ("
178+
<< activeRuns.activeruns(i).environmentid() << ")" << MonLogger::End();
179+
}
180+
}
181+
const std::lock_guard<std::mutex> lock(gEnvAccess);
182+
gActiveEnvs = activeRuns;
183+
}
184+
185+
int main(int argc, char* argv[]) {
186+
boost::program_options::options_description desc("Program options");
187+
desc.add_options()
188+
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka host")
189+
("kafka-topics", boost::program_options::value<std::vector<std::string>>()->multitoken()->required(), "Kafka topics");
190+
boost::program_options::variables_map vm;
191+
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
192+
boost::program_options::notify(vm);
193+
194+
MonLogger::mLoggerSeverity = o2::monitoring::Severity::Debug;
195+
196+
std::thread webServerThread([](){
197+
auto const address = boost::asio::ip::make_address("0.0.0.0");
198+
boost::asio::io_context ioc{1};
199+
tcp::acceptor acceptor{ioc, {address, 8086}};
200+
tcp::socket socket{ioc};
201+
httpServer(acceptor, socket);
202+
ioc.run();
203+
});
204+
205+
auto kafkaConsumer = std::make_unique<o2::monitoring::transports::KafkaConsumer>(
206+
vm["kafka-host"].as<std::string>(), 9092, std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}
207+
);
208+
for (;;) {
209+
auto serializedRuns = kafkaConsumer->receive();
210+
if (!serializedRuns.empty()) {
211+
deserializeActiveRuns(serializedRuns.back());
212+
}
213+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
214+
}
215+
}

0 commit comments

Comments
 (0)