Skip to content

Commit 5437c91

Browse files
authored
[OMON-520] Add Kafka Consumer to transports (#276)
1 parent 14f2638 commit 5437c91

File tree

7 files changed

+196
-24
lines changed

7 files changed

+196
-24
lines changed

CMakeLists.txt

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,8 @@ add_library(Monitoring SHARED
131131
src/Transports/WebSocket.cxx
132132
src/Exceptions/MonitoringException.cxx
133133
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
134-
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>
134+
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/KafkaProducer.cxx>
135+
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/KafkaConsumer.cxx>
135136
$<$<BOOL:${CURL_FOUND}>:src/Transports/HTTP.cxx>
136137
)
137138

@@ -150,11 +151,11 @@ set_target_properties(Monitoring PROPERTIES OUTPUT_NAME "O2Monitoring")
150151
target_link_libraries(Monitoring
151152
PUBLIC
152153
Boost::boost
154+
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
153155
PRIVATE
154156
Boost::system
155157
pthread
156158
$<$<BOOL:${ApMon_FOUND}>:ApMon::ApMon>
157-
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
158159
$<$<BOOL:${CURL_FOUND}>:CURL::libcurl>
159160
$<$<BOOL:${InfoLogger_FOUND}>:AliceO2::InfoLogger>
160161
)
@@ -214,6 +215,10 @@ set(EXAMPLES
214215
examples/10-Buffering.cxx
215216
)
216217

218+
if(RdKafka_FOUND)
219+
list(APPEND EXAMPLES "examples/11-KafkaToHttp.cxx")
220+
endif()
221+
217222
foreach (example ${EXAMPLES})
218223
get_filename_component(example_name ${example} NAME)
219224
string(REGEX REPLACE ".cxx" "" example_name ${example_name})

src/MonitoringFactory.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,8 @@
3535
#endif
3636

3737
#ifdef O2_MONITORING_WITH_KAFKA
38-
#include "Transports/Kafka.h"
38+
#include "Transports/KafkaProducer.h"
39+
#include "Transports/KafkaConsumer.h"
3940
#endif
4041

4142
#ifdef O2_MONITORING_WITH_CURL
@@ -130,7 +131,7 @@ std::unique_ptr<Backend> getInfluxDb(http::url uri)
130131
}
131132
if (uri.protocol == "kafka") {
132133
#ifdef O2_MONITORING_WITH_KAFKA
133-
auto transport = std::make_unique<transports::Kafka>(uri.host, uri.port, uri.search);
134+
auto transport = std::make_unique<transports::KafkaProducer>(uri.host, uri.port, uri.search);
134135
return std::make_unique<backends::InfluxDB>(std::move(transport));
135136
#else
136137
throw MonitoringException("Factory", "Kafka transport is not enabled");

src/Transports/KafkaConsumer.cxx

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
///
13+
/// \file KafkaConsumer.cxx
14+
/// \author Adam Wegrzynek <[email protected]>
15+
///
16+
17+
#include "KafkaConsumer.h"
18+
#include <string>
19+
#include "../MonLogger.h"
20+
21+
namespace o2
22+
{
23+
/// ALICE O2 Monitoring system
24+
namespace monitoring
25+
{
26+
/// Monitoring transports
27+
namespace transports
28+
{
29+
30+
KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const std::string& topic) : mTopic(topic)
31+
{
32+
std::string errstr;
33+
std::unique_ptr<RdKafka::Conf> conf{RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)};
34+
if (conf->set("bootstrap.servers", host + ":" + std::to_string(port), errstr) != RdKafka::Conf::CONF_OK
35+
|| conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK
36+
|| conf->set("group.id", "o2-monitoring-group", errstr) != RdKafka::Conf::CONF_OK
37+
|| conf->set("auto.offset.reset", "latest", errstr) != RdKafka::Conf::CONF_OK
38+
|| conf->set("heartbeat.interval.ms", "2000", errstr) != RdKafka::Conf::CONF_OK
39+
|| conf->set("session.timeout.ms", "6000", errstr) != RdKafka::Conf::CONF_OK
40+
) {
41+
throw MonitoringException("Kafka Consumer", errstr);
42+
}
43+
44+
mConsumer = RdKafka::KafkaConsumer::create(conf.get(), errstr);
45+
if (!mConsumer) {
46+
MonLogger::Get(Severity::Warn) << "Could not initialize Kafka consumer" << MonLogger::End();
47+
}
48+
if (mConsumer->subscribe({mTopic})) {
49+
MonLogger::Get(Severity::Warn) << "Failed to subscribe to topic" << MonLogger::End();
50+
}
51+
}
52+
53+
std::vector<std::string> KafkaConsumer::receive()
54+
{
55+
std::vector<std::string> received;
56+
size_t batch_size = 5;
57+
int remaining_timeout = 1000;
58+
auto start = std::chrono::high_resolution_clock::now();
59+
60+
while (received.size() < batch_size) {
61+
std::unique_ptr<RdKafka::Message> message{mConsumer->consume(remaining_timeout)};
62+
switch (message->err()) {
63+
case RdKafka::ERR__TIMED_OUT:
64+
break;
65+
case RdKafka::ERR_NO_ERROR:
66+
received.push_back(std::string(static_cast<char*>(message->payload()), message->len()));
67+
break;
68+
default:
69+
std::cerr << "%% Consumer error: " << message->errstr() << std::endl;
70+
return received;
71+
}
72+
auto now = std::chrono::high_resolution_clock::now();
73+
remaining_timeout = std::chrono::duration_cast<std::chrono::milliseconds>(now - start).count();
74+
if (remaining_timeout > 1000) {
75+
break;
76+
}
77+
}
78+
return received;
79+
}
80+
81+
KafkaConsumer::~KafkaConsumer()
82+
{
83+
if (mConsumer) {
84+
mConsumer->close();
85+
}
86+
delete mConsumer;
87+
}
88+
89+
} // namespace transports
90+
} // namespace monitoring
91+
} // namespace o2

src/Transports/KafkaConsumer.h

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
///
13+
/// \file KafkaConsumer.h
14+
/// \author Adam Wegrzynek <[email protected]>
15+
///
16+
17+
#ifndef ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
18+
#define ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
19+
20+
#include "TransportInterface.h"
21+
22+
#include <chrono>
23+
#include <string>
24+
#include <librdkafka/rdkafkacpp.h>
25+
26+
namespace o2
27+
{
28+
/// ALICE O2 Monitoring system
29+
namespace monitoring
30+
{
31+
/// Monitoring transports
32+
namespace transports
33+
{
34+
35+
/// \brief Transport that sends string formatted metrics via Kafka
36+
class KafkaConsumer : public TransportInterface
37+
{
38+
public:
39+
/// Creates producer
40+
/// \param hostname Hostname
41+
/// \param port Port number
42+
/// \param topic Kafka topic
43+
KafkaConsumer(const std::string& host, unsigned int port, const std::string& topic);
44+
45+
/// Deletes producer
46+
~KafkaConsumer();
47+
48+
void send(std::string&&/* message*/) override {
49+
throw MonitoringException("Transport", "This transport does not implement sending");
50+
}
51+
52+
/// Sends metric via Kafka
53+
/// \param message r-value string formated
54+
std::vector<std::string> receive() override;
55+
private:
56+
/// Kafka producer instance
57+
RdKafka::KafkaConsumer* mConsumer;
58+
59+
/// Kafka topic
60+
std::string mTopic;
61+
};
62+
63+
} // namespace transports
64+
} // namespace monitoring
65+
} // namespace o2
66+
67+
#endif // ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
// or submit itself to any jurisdiction.
1111

1212
///
13-
/// \file Kafka.cxx
13+
/// \file KafkaProducer.cxx
1414
/// \author Adam Wegrzynek <[email protected]>
1515
///
1616

17-
#include "Kafka.h"
17+
#include "KafkaProducer.h"
1818
#include <string>
1919
#include "../MonLogger.h"
2020

@@ -27,35 +27,36 @@ namespace monitoring
2727
namespace transports
2828
{
2929

30-
Kafka::Kafka(const std::string& host, unsigned int port, const std::string& topic)
30+
KafkaProducer::KafkaProducer(const std::string& host, unsigned int port, const std::string& topic) : mTopic(topic)
3131
{
32-
topic.length() > 0 ? mTopic = topic : mTopic = "test";
3332
std::string errstr;
34-
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
33+
std::unique_ptr<RdKafka::Conf> conf{RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)};
3534
conf->set("bootstrap.servers", host + ":" + std::to_string(port), errstr);
3635
conf->set("request.required.acks", "0", errstr);
3736
conf->set("message.send.max.retries", "0", errstr);
3837
conf->set("queue.buffering.max.ms", "100", errstr);
3938
conf->set("batch.num.messages", "1000", errstr);
4039

41-
producer = RdKafka::Producer::create(conf, errstr);
42-
if (!producer) {
43-
MonLogger::Get(Severity::Warn) << "Coult not initialize Kafka transport" << MonLogger::End();
40+
mProducer = RdKafka::Producer::create(conf.get(), errstr);
41+
if (!mProducer) {
42+
MonLogger::Get(Severity::Warn) << "Could not initialize Kafka producer" << MonLogger::End();
4443
}
45-
4644
MonLogger::Get(Severity::Info) << "Kafka transport initialized (" << host << ":" << port << "/" << mTopic << ")" << MonLogger::End();
4745
}
4846

49-
Kafka::~Kafka()
47+
KafkaProducer::~KafkaProducer()
5048
{
51-
delete producer;
49+
if (mProducer) {
50+
mProducer->flush(250);
51+
}
52+
delete mProducer;
5253
}
5354

54-
void Kafka::send(std::string&& message)
55+
void KafkaProducer::send(std::string&& message)
5556
{
5657
int32_t partition = RdKafka::Topic::PARTITION_UA;
5758

58-
RdKafka::ErrorCode resp = producer->produce(
59+
RdKafka::ErrorCode resp = mProducer->produce(
5960
mTopic, partition,
6061
RdKafka::Producer::RK_MSG_COPY,
6162
const_cast<char*>(message.c_str()), message.size(),
@@ -66,7 +67,7 @@ void Kafka::send(std::string&& message)
6667
if (resp != RdKafka::ERR_NO_ERROR) {
6768
MonLogger::Get(Severity::Warn) << "Kafka send failed: " << RdKafka::err2str(resp) << MonLogger::End();
6869
}
69-
producer->poll(0);
70+
mProducer->poll(0);
7071
}
7172

7273
} // namespace transports
Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
// or submit itself to any jurisdiction.
1111

1212
///
13-
/// \file Kafka.h
13+
/// \file KafkaProducer.h
1414
/// \author Adam Wegrzynek <[email protected]>
1515
///
1616

@@ -33,25 +33,24 @@ namespace transports
3333
{
3434

3535
/// \brief Transport that sends string formatted metrics via Kafka
36-
class Kafka : public TransportInterface
36+
class KafkaProducer : public TransportInterface
3737
{
3838
public:
3939
/// Creates producer
4040
/// \param hostname Hostname
4141
/// \param port Port number
4242
/// \param topic Kafka topic
43-
Kafka(const std::string& host, unsigned int port, const std::string& topic = "test");
43+
KafkaProducer(const std::string& host, unsigned int port, const std::string& topic);
4444

4545
/// Deletes producer
46-
~Kafka();
46+
~KafkaProducer();
4747

4848
/// Sends metric via Kafka
4949
/// \param message r-value string formated
5050
void send(std::string&& message) override;
51-
5251
private:
5352
/// Kafka producer instance
54-
RdKafka::Producer* producer;
53+
RdKafka::Producer* mProducer;
5554

5655
/// Kafka topic
5756
std::string mTopic;

src/Transports/TransportInterface.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
#ifndef ALICEO2_MONITORING_TRANSPORTS_TRANSPORTINTERFACE_H
1818
#define ALICEO2_MONITORING_TRANSPORTS_TRANSPORTINTERFACE_H
1919

20+
#include "../Exceptions/MonitoringException.h"
2021
#include <string>
22+
#include <vector>
2123

2224
namespace o2
2325
{
@@ -39,6 +41,12 @@ class TransportInterface
3941
/// Sends metric via given transport
4042
/// \param message r-value to string formatted metric
4143
virtual void send(std::string&& message) = 0;
44+
45+
/// Receives metric via given transport
46+
/// \return List of messages
47+
virtual std::vector<std::string> receive() {
48+
throw MonitoringException("Transport", "This transport does not implement receiving metrics");
49+
}
4250
};
4351

4452
} // namespace transports

0 commit comments

Comments
 (0)