Skip to content

Commit 58b88de

Browse files
authored
[OMON-520] Add Kafka bridge example (#277)
1 parent 4e4dcfd commit 58b88de

File tree

4 files changed

+59
-1
lines changed

4 files changed

+59
-1
lines changed

CMakeLists.txt

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ set(EXAMPLES
216216
)
217217

218218
if(RdKafka_FOUND)
219-
list(APPEND EXAMPLES "examples/11-KafkaToHttp.cxx")
219+
list(APPEND EXAMPLES "examples/11-KafkaToWebsocket.cxx")
220220
endif()
221221

222222
foreach (example ${EXAMPLES})
@@ -231,6 +231,9 @@ endforeach()
231231

232232
set_target_properties(5-Benchmark PROPERTIES OUTPUT_NAME "o2-monitoring-benchmark")
233233
set_target_properties(8-DbFiller PROPERTIES OUTPUT_NAME "o2-monitoring-dbfiller")
234+
if(RdKafka_FOUND)
235+
set_target_properties(11-KafkaToWebsocket PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-grafana")
236+
endif()
234237

235238
####################################
236239
# Tests
@@ -278,6 +281,9 @@ install(TARGETS Monitoring 5-Benchmark 8-DbFiller
278281
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
279282
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
280283
)
284+
if(RdKafka_FOUND)
285+
install(TARGETS 11-KafkaToWebsocket)
286+
endif()
281287

282288
# Create version file
283289
include(CMakePackageConfigHelpers)

examples/11-KafkaToWebsocket.cxx

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
///
2+
/// \file 11-KafkaToHttp.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "../src/Transports/KafkaConsumer.h"
7+
#include "../src/Transports/HTTP.h"
8+
#include "../src/Transports/WebSocket.h"
9+
10+
#include <iostream>
11+
#include <memory>
12+
#include <thread>
13+
#include <boost/algorithm/string/join.hpp>
14+
#include <boost/program_options.hpp>
15+
16+
using namespace o2::monitoring;
17+
18+
int main(int argc, char* argv[])
19+
{
20+
boost::program_options::options_description desc("Program options");
21+
desc.add_options()
22+
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
23+
("kafka-topic", boost::program_options::value<std::string>()->required(), "Kafka topic")
24+
("grafana-host", boost::program_options::value<std::string>()->required(), "Grafana hostname")
25+
("grafana-key", boost::program_options::value<std::string>()->required(), "Grafana API key");
26+
boost::program_options::variables_map vm;
27+
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
28+
boost::program_options::notify(vm);
29+
30+
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, vm["kafka-topic"].as<std::string>());
31+
auto outTransport = std::make_unique<transports::WebSocket>(vm["grafana-host"].as<std::string>(), 3000, vm["grafana-key"].as<std::string>(), "alice_o2");
32+
std::thread readThread([&outTransport](){
33+
for (;;) {
34+
outTransport->read();
35+
std::this_thread::sleep_for(std::chrono::milliseconds(500));
36+
}
37+
});
38+
for (;;) {
39+
auto metrics = kafkaConsumer->receive();
40+
if (!metrics.empty()) {
41+
outTransport->send(boost::algorithm::join(metrics, "\n"));
42+
}
43+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
44+
}
45+
}

src/Transports/WebSocket.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,11 @@ WebSocket::WebSocket(const std::string& hostname, int port, const std::string& t
4040
mWebSocket.handshake(hostname, "/api/live/push/" + stream);
4141
}
4242

43+
void WebSocket::read() {
44+
beast::flat_buffer buffer;
45+
mWebSocket.read(buffer);
46+
}
47+
4348
WebSocket::~WebSocket()
4449
{
4550
mWebSocket.close(beast::websocket::close_code::normal);

src/Transports/WebSocket.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ class WebSocket : public TransportInterface
5454
/// \param message r-value string formated
5555
void send(std::string&& message) override;
5656

57+
/// Read control frames from socket
58+
void read();
5759
private:
5860
/// IO context
5961
boost::asio::io_context mIoContext;

0 commit comments

Comments
 (0)