Skip to content

Commit 325d3ed

Browse files
authored
Use Kafka as Influx Line Protocol transport (#187)
1 parent 841ce6e commit 325d3ed

File tree

9 files changed

+163
-189
lines changed

9 files changed

+163
-189
lines changed

CMakeLists.txt

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ add_library(Monitoring SHARED
105105
src/Transports/StdOut.cxx
106106
src/Exceptions/MonitoringException.cxx
107107
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
108-
$<$<BOOL:${RdKafka_FOUND}>:src/Backends/Kafka.cxx>
108+
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>
109109
)
110110

111111
target_include_directories(Monitoring
@@ -136,7 +136,7 @@ if(ApMon_FOUND)
136136
endif()
137137

138138
if(RdKafka_FOUND)
139-
message(STATUS " Compiling Kafka backend")
139+
message(STATUS " Compiling Kafka transport")
140140
endif()
141141

142142
# Detect operating system

README.md

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -41,14 +41,15 @@ std::unique_ptr<Monitoring> monitoring = MonitoringFactory::Get("backend[-protoc
4141
4242
See the table below to find `URI`s for supported backends:
4343
44-
| Backend name | Transport | URI backend[-protocol] | URI query | Default verbosity |
45-
| ------------ |:-----------:|:----------------------:|:----------:| -----------------:|
46-
| No-op | - | `no-op` | | - |
47-
| InfluxDB | UDP | `influxdb-udp` | - | `info` |
48-
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
49-
| ApMon | UDP | `apmon` | - | `info` |
50-
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |
51-
| Kafka | TCP | `kafka` | - | `info` |
44+
| Backend name | Transport | URI backend[-protocol] | URI query | Default verbosity |
45+
| ------------ |:-----------:|:----------------------:|:-----------:| -----------------:|
46+
| No-op | - | `no-op` | | - |
47+
| InfluxDB | UDP | `influxdb-udp` | - | `info` |
48+
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
49+
| InfluxDB | StdOut | `influxdb-stdout` | - | `info` |
50+
| InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` |
51+
| ApMon | UDP | `apmon` | - | `info` |
52+
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |
5253
5354
##### StdCout output format
5455
```

src/Backends/InfluxDB.cxx

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,6 @@ InfluxDB::InfluxDB(std::unique_ptr<transports::TransportInterface> transport)
4242
MonLogger::Get() << "InfluxDB backend initialized" << MonLogger::End();
4343
}
4444

45-
InfluxDB::InfluxDB() {}
46-
4745
inline unsigned long InfluxDB::convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp)
4846
{
4947
return std::chrono::duration_cast<std::chrono::nanoseconds>(

src/Backends/InfluxDB.h

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,6 @@ class InfluxDB final : public Backend
4141
/// \param transport Any available transport (udp, unix, kafka)
4242
InfluxDB(std::unique_ptr<transports::TransportInterface> transport);
4343

44-
/// Constructor for other backends
45-
InfluxDB();
46-
4744
/// Default destructor
4845
~InfluxDB() = default;
4946

src/Backends/Kafka.cxx

Lines changed: 0 additions & 85 deletions
This file was deleted.

src/Backends/Kafka.h

Lines changed: 0 additions & 70 deletions
This file was deleted.

src/MonitoringFactory.cxx

Lines changed: 13 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -33,29 +33,14 @@
3333
#endif
3434

3535
#ifdef O2_MONITORING_WITH_KAFKA
36-
#include "Backends/Kafka.h"
36+
#include "Transports/Kafka.h"
3737
#endif
3838

3939
namespace o2
4040
{
4141
/// ALICE O2 Monitoring system
4242
namespace monitoring
4343
{
44-
#ifdef O2_MONITORING_WITH_KAFKA
45-
std::unique_ptr<Backend> getKafka(http::url uri)
46-
{
47-
if (uri.search.size() > 0) {
48-
return std::make_unique<backends::Kafka>(uri.host, uri.port, uri.search);
49-
} else {
50-
return std::make_unique<backends::Kafka>(uri.host, uri.port);
51-
}
52-
}
53-
#else
54-
std::unique_ptr<Backend> getKafka(http::url /*uri*/)
55-
{
56-
throw std::runtime_error("Kafka backend is not enabled");
57-
}
58-
#endif
5944

6045
static const std::map<std::string, Verbosity> verbosities = {
6146
{"/prod", Verbosity::Prod},
@@ -95,7 +80,15 @@ std::unique_ptr<Backend> getInfluxDb(http::url uri)
9580
auto transport = std::make_unique<transports::StdOut>();
9681
return std::make_unique<backends::InfluxDB>(std::move(transport));
9782
}
98-
throw std::runtime_error("InfluxDB transport protocol not supported");
83+
if (uri.protocol == "kafka") {
84+
#ifdef O2_MONITORING_WITH_KAFKA
85+
auto transport = std::make_unique<transports::Kafka>(uri.host, uri.port, uri.search);
86+
return std::make_unique<backends::InfluxDB>(std::move(transport));
87+
#else
88+
throw std::runtime_error("Kafka transport is not enabled");
89+
#endif
90+
}
91+
throw std::runtime_error("InfluxDB transport not supported: " + uri.protocol);
9992
}
10093

10194
#ifdef O2_MONITORING_WITH_APPMON
@@ -135,9 +128,10 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url)
135128
{"influxdb-udp", getInfluxDb},
136129
{"influxdb-unix", getInfluxDb},
137130
{"influxdb-stdout", getInfluxDb},
131+
{"influxdb-kafka", getInfluxDb},
138132
{"apmon", getApMon},
139-
{"no-op", getNoop},
140-
{"kafka", getKafka}};
133+
{"no-op", getNoop}
134+
};
141135

142136
http::url parsedUrl = http::ParseHttpUrl(url);
143137
if (parsedUrl.protocol.empty()) {

src/Transports/Kafka.cxx

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
// Copyright CERN and copyright holders of ALICE O2. This software is
2+
// distributed under the terms of the GNU General Public License v3 (GPL
3+
// Version 3), copied verbatim in the file "COPYING".
4+
//
5+
// See http://alice-o2.web.cern.ch/license for full licensing information.
6+
//
7+
// In applying this license CERN does not waive the privileges and immunities
8+
// granted to it by virtue of its status as an Intergovernmental Organization
9+
// or submit itself to any jurisdiction.
10+
11+
///
12+
/// \file Kafka.cxx
13+
/// \author Adam Wegrzynek <[email protected]>
14+
///
15+
16+
#include "Kafka.h"
17+
#include <string>
18+
#include "../MonLogger.h"
19+
20+
namespace o2
21+
{
22+
/// ALICE O2 Monitoring system
23+
namespace monitoring
24+
{
25+
/// Monitoring transports
26+
namespace transports
27+
{
28+
29+
Kafka::Kafka(const std::string& host, unsigned int port, const std::string& topic)
30+
{
31+
topic.length() > 0 ? mTopic = topic : mTopic = "test";
32+
std::string errstr;
33+
RdKafka::Conf* conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
34+
conf->set("bootstrap.servers", host + ":" + std::to_string(port), errstr);
35+
conf->set("request.required.acks", "0", errstr);
36+
conf->set("message.send.max.retries", "0", errstr);
37+
conf->set("queue.buffering.max.ms", "100", errstr);
38+
conf->set("batch.num.messages", "1000", errstr);
39+
40+
producer = RdKafka::Producer::create(conf, errstr);
41+
if (!producer) {
42+
throw std::runtime_error("Failed to create Kafka producer: " + errstr);
43+
}
44+
45+
MonLogger::Get() << "Kafka transport initialized (" << host << ":" << port << "/" << mTopic << ")" << MonLogger::End();
46+
}
47+
48+
Kafka::~Kafka()
49+
{
50+
delete producer;
51+
}
52+
53+
void Kafka::send(std::string&& message)
54+
{
55+
int32_t partition = RdKafka::Topic::PARTITION_UA;
56+
57+
RdKafka::ErrorCode resp = producer->produce(
58+
mTopic, partition,
59+
RdKafka::Producer::RK_MSG_COPY,
60+
const_cast<char*>(message.c_str()), message.size(),
61+
NULL, 0,
62+
0,
63+
NULL,
64+
NULL);
65+
if (resp != RdKafka::ERR_NO_ERROR) {
66+
MonLogger::Get() << "Kafka send failed: " << RdKafka::err2str(resp) << MonLogger::End();
67+
}
68+
producer->poll(0);
69+
}
70+
71+
} // namespace transports
72+
} // namespace monitoring
73+
} // namespace o2

0 commit comments

Comments
 (0)