Skip to content

Commit a9c4b76

Browse files
authored
[OMON-539] Add example how to deserialise protobuf from Kafka messages (#283)
1 parent 75c2839 commit a9c4b76

File tree

3 files changed

+101
-0
lines changed

3 files changed

+101
-0
lines changed

CMakeLists.txt

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ find_package(ApMon MODULE)
7878
find_package(CURL MODULE)
7979
find_package(RdKafka CONFIG)
8080
find_package(InfoLogger CONFIG)
81+
find_package(Protobuf CONFIG)
8182

8283
####################################
8384
# Set OUTPUT vars
@@ -183,6 +184,10 @@ if (APPLE)
183184
message(STATUS "Detected macOS: Process monitor disabled")
184185
endif()
185186

187+
if(Protobuf_FOUND AND RdKafka_FOUND)
188+
message(STATUS "Compiling Kafka consumer with protobuf deserializer")
189+
endif()
190+
186191
# Handle custom compile definitions
187192
target_compile_definitions(Monitoring
188193
PRIVATE
@@ -234,6 +239,37 @@ set_target_properties(8-DbFiller PROPERTIES OUTPUT_NAME "o2-monitoring-dbfiller"
234239
if(RdKafka_FOUND)
235240
set_target_properties(11-KafkaToWebsocket PROPERTIES OUTPUT_NAME "o2-monitoring-kafka-to-grafana")
236241
endif()
242+
####################################
243+
# Kafka protobuf deserializer
244+
####################################
245+
if(RdKafka_FOUND AND Protobuf_FOUND)
246+
set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/envs.proto)
247+
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
248+
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)
249+
set(PROTO_CPP_OUTPUT ${CMAKE_CURRENT_BINARY_DIR}/${PROTO_OUTPUT_NAME}.pb.cc)
250+
251+
add_custom_command(
252+
OUTPUT "${PROTO_CPP_OUTPUT}"
253+
COMMAND protobuf::protoc
254+
ARGS --proto_path ${PROTO_FILE_PREFIX} --cpp_out ${CMAKE_CURRENT_SOURCE_DIR}/build ${PROTO_FILE}
255+
DEPENDS ${PROTO_FILE}
256+
COMMENT "Running protoc on ${PROTO_FILE}"
257+
VERBATIM)
258+
259+
add_executable(12-KafkvaToInfluxDb
260+
examples/12-KafkaToInfluxDb.cxx
261+
${PROTO_CPP_OUTPUT})
262+
263+
target_include_directories(12-KafkvaToInfluxDb
264+
PRIVATE
265+
${CMAKE_CURRENT_BINARY_DIR})
266+
267+
target_link_libraries(12-KafkvaToInfluxDb
268+
PRIVATE
269+
Monitoring
270+
Boost::program_options
271+
protobuf::libprotobuf)
272+
endif()
237273

238274
####################################
239275
# Tests

examples/12-KafkaToInfluxDb.cxx

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
///
2+
/// \file 12-KafkaToInfluxDb.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "../src/Transports/KafkaConsumer.h"
7+
#include "../src/Transports/HTTP.h"
8+
#include "../src/Transports/WebSocket.h"
9+
10+
#include <iostream>
11+
#include <memory>
12+
#include <thread>
13+
#include <boost/algorithm/string/join.hpp>
14+
#include <boost/program_options.hpp>
15+
16+
#include "envs.pb.h"
17+
18+
using namespace o2::monitoring;
19+
20+
int main(int argc, char* argv[])
21+
{
22+
boost::program_options::options_description desc("Program options");
23+
desc.add_options()
24+
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
25+
("kafka-topic", boost::program_options::value<std::string>()->required(), "Kafka topic");
26+
boost::program_options::variables_map vm;
27+
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
28+
boost::program_options::notify(vm);
29+
30+
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, vm["kafka-topic"].as<std::string>());
31+
for (;;) {
32+
auto changes = kafkaConsumer->receive();
33+
if (!changes.empty()) {
34+
for (auto& change : changes) {
35+
aliceo2::envs::NewStateNotification stateChange;
36+
stateChange.ParseFromString(change);
37+
std::cout << "EnvID: " << stateChange.envinfo().environmentid() << std::endl
38+
<< "State: " << stateChange.envinfo().state() << std::endl
39+
<< "First detector: " << stateChange.envinfo().detectors().Get(0) << std::endl;
40+
}
41+
}
42+
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
43+
}
44+
}

proto/envs.proto

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
syntax = "proto3";
2+
package aliceo2.envs;
3+
option go_package = "protos;kafka";
4+
5+
message NewStateNotification {
6+
EnvInfo envInfo = 1;
7+
uint64 timestamp = 2; // ms since epoch
8+
}
9+
10+
message ActiveRunsList {
11+
repeated EnvInfo activeRuns = 1;
12+
uint64 timestamp = 2; // ms since epoch
13+
}
14+
15+
message EnvInfo {
16+
string environmentId = 1;
17+
uint32 runNumber = 2;
18+
string runType = 3;
19+
string state = 4;
20+
repeated string detectors = 5;
21+
}

0 commit comments

Comments
 (0)