Skip to content

Commit c1a64a3

Browse files
authored
Add Kafka backend (#97)
1 parent bc5ff60 commit c1a64a3

File tree

7 files changed

+199
-7
lines changed

7 files changed

+199
-7
lines changed

CMakeLists.txt

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
5353
find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system filesystem)
5454
find_package(Git QUIET)
5555
find_package(ApMon MODULE)
56-
56+
find_package(RdKafka CONFIG)
5757

5858
####################################
5959
# Library
@@ -86,6 +86,7 @@ add_library(Monitoring SHARED
8686
src/Transports/Unix.cxx
8787
src/Exceptions/MonitoringException.cxx
8888
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
89+
$<$<BOOL:${RdKafka_FOUND}>:src/Backends/Kafka.cxx>
8990
)
9091

9192
target_include_directories(Monitoring
@@ -103,13 +104,18 @@ target_link_libraries(Monitoring
103104
PRIVATE
104105
Boost::system
105106
$<$<BOOL:${ApMon_FOUND}>:ApMon::ApMon>
107+
$<$<BOOL:${RdKafka_FOUND}>:RdKafka::rdkafka++>
106108
)
107109

108110
# Handle ApMon optional dependency
109111
if(ApMon_FOUND)
110112
message(STATUS " Compiling ApMon backend")
111113
endif()
112114

115+
if(RdKafka_FOUND)
116+
message(STATUS " Compiling Kafka backend")
117+
endif()
118+
113119
# Detect operating system
114120
if (UNIX AND NOT APPLE)
115121
message(STATUS "Detected Linux: Process monitor enabled")
@@ -126,6 +132,7 @@ target_compile_definitions(Monitoring
126132
$<$<BOOL:${APPLE}>:O2_MONITORING_OS_MAC>
127133
$<$<BOOL:${LINUX}>:O2_MONITORING_OS_LINUX>
128134
$<$<BOOL:${ApMon_FOUND}>:O2_MONITORING_WITH_APPMON>
135+
$<$<BOOL:${RdKafka_FOUND}>:O2_MONITORING_WITH_KAFKA>
129136
)
130137

131138
# Use C++17

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ See the table below to find `URI`s for supported backends:
4949
| ApMon | UDP | `apmon` | - | `info` |
5050
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |
5151
| Flume | UDP | `flume` | - | `info` |
52+
| Kafka | TCP | `kafka` | - | `info` |
5253
5354
##### StdCout output format
5455
```

src/Backends/InfluxDB.cxx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ InfluxDB::InfluxDB(const std::string& host, unsigned int port) :
3939
<< " ("<< host << ":" << port << ")" << MonLogger::End();
4040
}
4141

42+
/// Transport-less constructor, used when another backend (e.g. Kafka) only
/// needs toInfluxLineProtocol() for formatting. mTransport stays null, so
/// send() must not be called on an instance built this way.
InfluxDB::InfluxDB() = default;
43+
4244
InfluxDB::InfluxDB(const std::string& socketPath) :
4345
mTransport(std::make_unique<transports::Unix>(socketPath))
4446
{

src/Backends/InfluxDB.h

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,13 @@ class InfluxDB final : public Backend
4242
/// \param port InfluxDB UDP endpoint port number
4343
InfluxDB(const std::string& host, unsigned int port);
4444

45+
/// Constructor for other backends
46+
InfluxDB();
47+
4548
/// Constructor for Unix socket transport
4649
InfluxDB(const std::string& socketPath);
4750

51+
4852
/// Default destructor
4953
~InfluxDB() = default;
5054

@@ -70,18 +74,17 @@ class InfluxDB final : public Backend
7074
/// \param name tag name
7175
/// \param value tag value
7276
void addGlobalTag(std::string_view name, std::string_view value) override;
73-
77+
78+
/// Converts metric to Influx Line Protocol format
79+
/// \param metric
80+
std::string toInfluxLineProtocol(const Metric& metric);
7481
private:
7582
std::unique_ptr<transports::TransportInterface> mTransport; ///< InfluxDB transport
7683
std::string tagSet; ///< Global tagset (common for each metric)
7784

7885
/// Escapes " ", "," and "=" characters
7986
/// \param escaped string rerference to escape characters from
8087
void escape(std::string& escaped);
81-
82-
/// Converts metric to Influx Line Protocol format
83-
/// \param metric
84-
std::string toInfluxLineProtocol(const Metric& metric);
8588
};
8689

8790
} // namespace backends

src/Backends/Kafka.cxx

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
///
2+
/// \file Kafka.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "Kafka.h"
7+
#include <string>
8+
#include <boost/lexical_cast.hpp>
9+
10+
namespace o2
11+
{
12+
/// ALICE O2 Monitoring system
13+
namespace monitoring
14+
{
15+
/// Monitoring backends
16+
namespace backends
17+
{
18+
19+
/// Creates and configures a librdkafka producer connected to host:port.
/// \param host   Kafka broker hostname
/// \param port   Kafka broker port number
/// \param topic  topic that metrics will be produced to
Kafka::Kafka(const std::string& host, unsigned int port, const std::string& topic) :
  mInfluxDB(), mTopic(topic)
{
  std::string errstr;
  // Conf::create() returns a heap-allocated object; Producer::create() copies
  // the configuration, so conf must be deleted afterwards or it leaks.
  RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
  // Fire-and-forget delivery: no acks, no retries, small buffering window.
  conf->set("bootstrap.servers", host + ":" + std::to_string(port), errstr);
  conf->set("request.required.acks", "0", errstr);
  conf->set("message.send.max.retries", "0", errstr);
  conf->set("queue.buffering.max.ms", "10", errstr);
  conf->set("batch.num.messages", "1000", errstr);

  producer = RdKafka::Producer::create(conf, errstr);
  delete conf; // fix: was leaked on every construction
  if (!producer) {
    MonLogger::Get() << "Failed to create producer: " << errstr << MonLogger::End();
    // NOTE(review): exit() in a library constructor kills the host process;
    // consider throwing the project's MonitoringException instead.
    exit(1);
  }

  MonLogger::Get() << "Kafka backend initialized"
    << " ("<< host << ":" << port << ")" << MonLogger::End();
}
39+
40+
/// Flushes outstanding messages and destroys the producer.
Kafka::~Kafka()
{
  // queue.buffering.max.ms delays delivery, so messages produced just before
  // shutdown may still sit in the local queue; flush them before destroying
  // the producer, otherwise they are silently dropped.
  if (producer != nullptr) {
    producer->flush(500 /* ms */);
  }
  delete producer;
}
44+
45+
/// Renders a system_clock time_point as a decimal string of nanoseconds
/// since the epoch.
inline std::string Kafka::convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp)
{
  const auto nanosSinceEpoch =
    std::chrono::duration_cast<std::chrono::nanoseconds>(timestamp.time_since_epoch());
  return std::to_string(nanosSinceEpoch.count());
}
51+
52+
/// Multiple values in a single measurement are not supported by this backend;
/// intentionally a no-op (see header: "NOT SUPPORTED").
void Kafka::sendMultiple(std::string /*measurement*/, std::vector<Metric>&& /*metrics*/)
{
}

/// Batch send of unrelated metrics is not supported by this backend;
/// intentionally a no-op (see header: "NOT SUPPORTED").
void Kafka::send(std::vector<Metric>&& /*metrics*/)
{
}
59+
60+
void Kafka::send(const Metric& metric)
61+
{
62+
std::string influxLine = mInfluxDB.toInfluxLineProtocol(metric);
63+
int32_t partition = RdKafka::Topic::PARTITION_UA;
64+
65+
RdKafka::ErrorCode resp = producer->produce(
66+
mTopic, partition,
67+
RdKafka::Producer::RK_MSG_COPY,
68+
const_cast<char*>(influxLine.c_str()), influxLine.size(),
69+
NULL, 0,
70+
0,
71+
NULL,
72+
NULL
73+
);
74+
if (resp != RdKafka::ERR_NO_ERROR) {
75+
MonLogger::Get() << "% Produce failed: " << RdKafka::err2str(resp) << MonLogger::End();
76+
}
77+
producer->poll(0);
78+
}
79+
80+
void Kafka::addGlobalTag(std::string_view name, std::string_view value)
81+
{
82+
std::string sName = name.data();
83+
std::string sValue = value.data();
84+
if (!tagSet.empty()) tagSet += ",";
85+
tagSet += sName + "=" + sValue;
86+
}
87+
88+
} // namespace backends
89+
} // namespace monitoring
90+
} // namespace o2

src/Backends/Kafka.h

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
///
2+
/// \file Kafka.h
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#ifndef ALICEO2_MONITORING_BACKENDS_KAFKA_H
7+
#define ALICEO2_MONITORING_BACKENDS_KAFKA_H
8+
9+
#include "Monitoring/Backend.h"
10+
#include "InfluxDB.h"
11+
#include "../MonLogger.h"
12+
#include <chrono>
13+
#include <string>
14+
#include <unordered_map>
15+
#include <librdkafka/rdkafkacpp.h>
16+
17+
namespace o2
18+
{
19+
/// ALICE O2 Monitoring system
20+
namespace monitoring
21+
{
22+
/// Monitoring backends
23+
namespace backends
24+
{
25+
26+
/// \brief Backend that sends metrics to Kafka over Influx Line protocol
/// (serialization is delegated to an embedded InfluxDB instance)
class Kafka final : public Backend
{
  public:
    /// Creates the librdkafka producer connected to the given broker
    /// \param host Kafka broker hostname (Kafka uses TCP, not UDP)
    /// \param port Kafka broker port number
    /// \param topic Kafka topic that metrics are produced to
    Kafka(const std::string& host, unsigned int port, const std::string& topic = "test");

    /// Destructor; destroys the producer
    ~Kafka();

    /// Convert timestamp to unsigned long as required by Kafka
    /// \param timestamp chrono time_point timestamp
    /// \return timestamp in nanoseconds
    inline std::string convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp);

    /// Sends metric to Kafka instance via one transport
    /// \param metric reference to metric object
    void send(const Metric& metric) override;

    /// Sends multiple metrics not related to each other - NOT SUPPORTED (no-op)
    /// \param metrics vector of metrics
    void send(std::vector<Metric>&& metrics) override;

    /// Sends multiple values in single measurement - NOT SUPPORTED (no-op)
    /// \param measurement measurement name
    /// \param metrics list of metrics
    void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

    /// Adds tag
    /// \param name tag name
    /// \param value tag value
    void addGlobalTag(std::string_view name, std::string_view value) override;

  private:
    RdKafka::Producer *producer; ///< Kafka producer instance (owned; deleted in destructor)
    std::string tagSet; ///< Global tagset (common for each metric)
    InfluxDB mInfluxDB; ///< InfluxDB instance, used only for line-protocol formatting
    std::string mTopic; ///< Kafka topic
};
67+
} // namespace backends
68+
} // namespace monitoring
69+
} // namespace o2
70+
71+
#endif // ALICEO2_MONITORING_BACKENDS_KAFKA_H

src/MonitoringFactory.cxx

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,28 @@
3030
#include "Backends/ApMonBackend.h"
3131
#endif
3232

33+
#ifdef O2_MONITORING_WITH_KAFKA
34+
#include "Backends/Kafka.h"
35+
#endif
36+
3337
namespace o2
3438
{
3539
/// ALICE O2 Monitoring system
3640
namespace monitoring
3741
{
42+
#ifdef _WITH_KAFKA
43+
std::unique_ptr<Backend> getKafka(http::url uri) {
44+
if (uri.search.size() > 0) {
45+
return std::make_unique<backends::Kafka>(uri.host, uri.port, uri.search);
46+
} else {
47+
return std::make_unique<backends::Kafka>(uri.host, uri.port);
48+
}
49+
}
50+
#else
51+
std::unique_ptr<Backend> getKafka(http::url /*uri*/) {
52+
throw std::runtime_error("Kafka backend is not enabled");
53+
}
54+
#endif
3855

3956
static const std::map<std::string, Verbosity> verbosities = {
4057
{"/prod", Verbosity::Prod},
@@ -108,7 +125,8 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url) {
108125
{"influxdb-unix", getInfluxDb},
109126
{"apmon", getApMon},
110127
{"flume", getFlume},
111-
{"no-op", getNoop}
128+
{"no-op", getNoop},
129+
{"kafka", getKafka}
112130
};
113131

114132
http::url parsedUrl = http::ParseHttpUrl(url);

0 commit comments

Comments
 (0)