Skip to content

Commit 0224701

Browse files
authored
[OMON-532] Document connecting to metric stream using library (#290)
1 parent 5a39026 commit 0224701

File tree

13 files changed

+129
-66
lines changed

13 files changed

+129
-66
lines changed

README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,3 +190,50 @@ Metric::setVerbosityPolicy(Verbosity verbosity, const std::regex& regex)
190190

191191
## System monitoring, server-side backends installation and configuration
192192
This guide explains manual installation. For `ansible` deployment see [AliceO2Group/system-configuration](https://gitlab.cern.ch/AliceO2Group/system-configuration/tree/master/ansible) gitlab repo.
193+
194+
---
195+
196+
## Receiving metrics from Monitoring system (development instructions)
197+
198+
### Compile Monitoring library with Kafka backend
199+
- Install `boost`
200+
- Compile `librdkafka`
201+
```bash
202+
git clone -b v1.8.2 https://github.com/edenhill/librdkafka && cd librdkafka
203+
cmake -H. -B./_cmake_build -DENABLE_LZ4_EXT=OFF -DCMAKE_INSTALL_LIBDIR=lib -DRDKAFKA_BUILD_TESTS=OFF -DRDKAFKA_BUILD_EXAMPLES=OFF -DCMAKE_INSTALL_PREFIX=~/librdkafka_install
204+
cmake --build ./_cmake_build --target install -j
205+
```
206+
- Compile Monitoring library, make sure to define `RdKafka_DIR` and point to CMake config directory:
207+
```bash
208+
git clone https://github.com/AliceO2Group/Monitoring && cd Monitoring
209+
cmake -H. -B./_cmake_build -DRdKafka_DIR=~/librdkafka_install/lib/cmake/RdKafka/ -DCMAKE_INSTALL_PREFIX=~/Monitoring_install
210+
cmake --build ./_cmake_build --target install -j
211+
```
212+
213+
### Look for Monitoring library in your CMake
214+
As `librdkafka` is an optional dependency of Monitoring, it is not handled by the CMake package config; therefore you need:
215+
```cmake
216+
find_package(RdKafka CONFIG REQUIRED)
217+
find_package(Monitoring CONFIG REQUIRED)
218+
```
219+
220+
Then link against the `AliceO2::Monitoring` target.
221+
222+
### Connect to Monitoring server
223+
```cpp
224+
#include "Monitoring/MonitoringFactory.h"
225+
...
226+
227+
std::vector<std::string> topics = {"topic-to-subscribe"};
228+
auto client = MonitoringFactory::GetPullClient("kafka-server:9092", topics);
229+
for (;;) {
230+
auto metrics = client->pull();
231+
if (!metrics.empty()) {
232+
// DO SOMETHING !
233+
}
234+
std::this_thread::sleep_for(std::chrono::milliseconds(100));
235+
}
236+
```
237+
238+
### Data format
239+
The native data format is [Influx Line Protocol](https://docs.influxdata.com/influxdb/latest/reference/syntax/line-protocol/), but metrics can be converted into any format listed here: https://docs.influxdata.com/telegraf/latest/data_formats/output/

doc/DPL.md

Lines changed: 0 additions & 29 deletions
This file was deleted.

examples/11-KafkaToWebsocket.cxx

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
/// \author Adam Wegrzynek <[email protected]>
44
///
55

6-
#include "../src/Transports/KafkaConsumer.h"
6+
#include "Monitoring/MonitoringFactory.h"
77
#include "../src/Transports/HTTP.h"
88
#include "../src/Transports/WebSocket.h"
99

@@ -27,7 +27,8 @@ int main(int argc, char* argv[])
2727
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
2828
boost::program_options::notify(vm);
2929

30-
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()});
30+
31+
auto kafkaConsumer = MonitoringFactory::GetPullClient(vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()});
3132
auto outTransport = std::make_unique<transports::WebSocket>(vm["grafana-host"].as<std::string>(), 3000, vm["grafana-key"].as<std::string>(), "alice_o2");
3233
std::thread readThread([&outTransport](){
3334
for (;;) {
@@ -36,7 +37,7 @@ int main(int argc, char* argv[])
3637
}
3738
});
3839
for (;;) {
39-
auto metrics = kafkaConsumer->receive();
40+
auto metrics = kafkaConsumer->pull();
4041
if (!metrics.empty()) {
4142
outTransport->send(boost::algorithm::join(metrics, "\n"));
4243
}

examples/12-KafkaToInfluxDb.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ int main(int argc, char* argv[])
3333
boost::program_options::notify(vm);
3434

3535
std::vector<std::string> topics = vm["kafka-topics"].as<std::vector<std::string>>();
36-
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, topics);
36+
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>() + ":9092", topics, "kafka-to-influxdb");
3737
auto httpTransport = std::make_unique<transports::HTTP>(
3838
"http://" + vm["influxdb-host"].as<std::string>() + ":8086/api/v2/write?" +
3939
"org=" + vm["influxdb-org"].as<std::string>() + "&" +
@@ -42,7 +42,7 @@ int main(int argc, char* argv[])
4242
httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
4343
auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));
4444
for (;;) {
45-
auto changes = kafkaConsumer->receive();
45+
auto changes = kafkaConsumer->pull();
4646
if (!changes.empty()) {
4747
for (auto& change : changes) {
4848
aliceo2::envs::NewStateNotification stateChange;

examples/8-KafkaToHttpServer.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,10 +231,10 @@ int main(int argc, char* argv[]) {
231231
});
232232

233233
auto kafkaConsumer = std::make_unique<o2::monitoring::transports::KafkaConsumer>(
234-
vm["kafka-host"].as<std::string>(), 9092, std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}
234+
vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}, "kafka-aliecs"
235235
);
236236
for (;;) {
237-
auto serializedRuns = kafkaConsumer->receive();
237+
auto serializedRuns = kafkaConsumer->pull();
238238
if (!serializedRuns.empty()) {
239239
deserializeActiveRuns(serializedRuns.back());
240240
}

include/Monitoring/MonitoringFactory.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#define ALICEO2_MONITORING_FACTORY_H
1919

2020
#include "Monitoring/Monitoring.h"
21+
#include "Monitoring/PullClient.h"
2122

2223
namespace o2
2324
{
@@ -42,7 +43,7 @@ class MonitoringFactory
4243
/// \return monitoring backend
4344
/// \throw MonitoringException when backend initialisation failed
4445
static std::unique_ptr<Backend> GetBackend(std::string& url);
45-
46+
static std::unique_ptr<PullClient> GetPullClient(const std::string &url, const std::vector<std::string>& topics, const std::string &label = "o2-monitoring-group");
4647
private:
4748
/// Private constructor disallows to create instance of Factory
4849
MonitoringFactory() = default;

include/Monitoring/PullClient.h

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
///
13+
/// \file PullClient.h
14+
/// \author Adam Wegrzynek <[email protected]>
15+
///
16+
17+
#ifndef ALICEO2_MONITORING_CORE_PULLCLIENT_H
18+
#define ALICEO2_MONITORING_CORE_PULLCLIENT_H
19+
20+
#include <vector>
21+
#include <string>
22+
23+
namespace o2
24+
{
25+
/// ALICE O2 Monitoring system
26+
namespace monitoring
27+
{
28+
29+
/// \brief PullClient pure virtual interface
30+
///
31+
/// Interface that allows pulling metrics from a remote backend.
32+
/// Implementations (e.g. the Kafka consumer) deliver each metric as a string.
33+
class PullClient
34+
{
35+
public:
36+
/// Default destructor
37+
virtual ~PullClient() = default;
38+
virtual std::vector<std::string> pull() = 0;
39+
};
40+
41+
} // namespace monitoring
42+
} // namespace o2
43+
44+
#endif // ALICEO2_MONITORING_CORE_PULLCLIENT_H

src/MonitoringFactory.cxx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -219,6 +219,15 @@ std::unique_ptr<Monitoring> MonitoringFactory::Get(std::string urlsString)
219219
}
220220
return monitoring;
221221
}
222+
#ifdef O2_MONITORING_WITH_KAFKA
223+
std::unique_ptr<PullClient> MonitoringFactory::GetPullClient(const std::string &url, const std::vector<std::string>& topics, const std::string &label) {
224+
auto client = std::make_unique<transports::KafkaConsumer>(url, topics, label);
225+
return client;
226+
#else
227+
std::unique_ptr<PullClient> MonitoringFactory::GetPullClient(const std::string&, const std::vector<std::string>&, const std::string&) {
228+
throw MonitoringException("Factory", "Compile library with Kafka");
229+
#endif
230+
}
222231

223232
} // namespace monitoring
224233
} // namespace o2

src/Transports/KafkaConsumer.cxx

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "KafkaConsumer.h"
1818
#include <string>
1919
#include "../MonLogger.h"
20+
#include "../Exceptions/MonitoringException.h"
2021

2122
namespace o2
2223
{
@@ -27,13 +28,13 @@ namespace monitoring
2728
namespace transports
2829
{
2930

30-
KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const std::vector<std::string>& topics) : mTopics(topics)
31+
KafkaConsumer::KafkaConsumer(const std::string& url, const std::vector<std::string>& topics, const std::string& groupId) : mTopics(topics)
3132
{
3233
std::string errstr;
3334
std::unique_ptr<RdKafka::Conf> conf{RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)};
34-
if (conf->set("bootstrap.servers", host + ":" + std::to_string(port), errstr) != RdKafka::Conf::CONF_OK
35+
if (conf->set("bootstrap.servers", url, errstr) != RdKafka::Conf::CONF_OK
3536
|| conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK
36-
|| conf->set("group.id", "o2-monitoring-group", errstr) != RdKafka::Conf::CONF_OK
37+
|| conf->set("group.id", groupId, errstr) != RdKafka::Conf::CONF_OK
3738
|| conf->set("auto.offset.reset", "latest", errstr) != RdKafka::Conf::CONF_OK
3839
|| conf->set("heartbeat.interval.ms", "2000", errstr) != RdKafka::Conf::CONF_OK
3940
|| conf->set("session.timeout.ms", "6000", errstr) != RdKafka::Conf::CONF_OK
@@ -50,7 +51,7 @@ KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const s
5051
}
5152
}
5253

53-
std::vector<std::string> KafkaConsumer::receive()
54+
std::vector<std::string> KafkaConsumer::pull()
5455
{
5556
std::vector<std::string> received;
5657
size_t batch_size = 5;

src/Transports/KafkaConsumer.h

Lines changed: 10 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@
1414
/// \author Adam Wegrzynek <[email protected]>
1515
///
1616

17-
#ifndef ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
18-
#define ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
17+
#ifndef ALICEO2_MONITORING_TRANSPORTS_KAFKACONSUMER_H
18+
#define ALICEO2_MONITORING_TRANSPORTS_KAFKACONSUMER_H
1919

20-
#include "TransportInterface.h"
20+
#include "Monitoring/PullClient.h"
2121

2222
#include <chrono>
2323
#include <string>
@@ -33,25 +33,21 @@ namespace transports
3333
{
3434

3535
/// \brief Transport that sends string formatted metrics via Kafka
36-
class KafkaConsumer : public TransportInterface
36+
class KafkaConsumer : public PullClient
3737
{
3838
public:
3939
/// Creates a consumer and subscribes it to the given topics
40-
/// \param hostname Hostname
41-
/// \param port Port number
42-
/// \param topic Kafka topic
43-
KafkaConsumer(const std::string& host, unsigned int port, const std::vector<std::string>& topic);
40+
/// \param url Broker URL (host:port)
41+
/// \param topics Kafka topics to subscribe to
42+
/// \param groupId Kafka consumer group id
43+
KafkaConsumer(const std::string& url, const std::vector<std::string>& topics, const std::string& groupId);
4444

4545
/// Deletes the consumer
4646
~KafkaConsumer();
4747

48-
void send(std::string&&/* message*/) override {
49-
throw MonitoringException("Transport", "This transport does not implement sending");
50-
}
51-
5248
/// Pulls metrics from the subscribed Kafka topics
5349
/// \return metrics received since the previous pull (empty if none)
54-
std::vector<std::string> receive() override;
50+
std::vector<std::string> pull() override;
5551
private:
5652
/// Kafka producer instance
5753
RdKafka::KafkaConsumer* mConsumer;
@@ -64,4 +60,4 @@ class KafkaConsumer : public TransportInterface
6460
} // namespace monitoring
6561
} // namespace o2
6662

67-
#endif // ALICEO2_MONITORING_TRANSPORTS_KAFKA_H
63+
#endif // ALICEO2_MONITORING_TRANSPORTS_KAFKACONSUMER_H

0 commit comments

Comments
 (0)