Skip to content

Commit 789be01

Browse files
authored
[OMON-539] Send AliECS state transitions to InfluxDB in example (#284)
1 parent a9c4b76 commit 789be01

File tree

7 files changed

+52
-14
lines changed

7 files changed

+52
-14
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ endif()
242242
####################################
243243
# Kafka protobuf deserializer
244244
####################################
245-
if(RdKafka_FOUND AND Protobuf_FOUND)
245+
if(RdKafka_FOUND AND Protobuf_FOUND AND CURL_FOUND)
246246
set(PROTO_FILE ${CMAKE_CURRENT_SOURCE_DIR}/proto/envs.proto)
247247
get_filename_component(PROTO_OUTPUT_NAME ${PROTO_FILE} NAME_WE)
248248
get_filename_component(PROTO_FILE_PREFIX ${PROTO_FILE} PATH)

examples/11-KafkaToWebsocket.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,14 @@ int main(int argc, char* argv[])
2020
boost::program_options::options_description desc("Program options");
2121
desc.add_options()
2222
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
23-
("kafka-topic", boost::program_options::value<std::string>()->required(), "Kafka topic")
23+
("kafka-topics", boost::program_options::value<std::vector<std::string>>()->multitoken()->required(), "Kafka topics")
2424
("grafana-host", boost::program_options::value<std::string>()->required(), "Grafana hostname")
2525
("grafana-key", boost::program_options::value<std::string>()->required(), "Grafana API key");
2626
boost::program_options::variables_map vm;
2727
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
2828
boost::program_options::notify(vm);
2929

30-
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, vm["kafka-topic"].as<std::string>());
30+
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()});
3131
auto outTransport = std::make_unique<transports::WebSocket>(vm["grafana-host"].as<std::string>(), 3000, vm["grafana-key"].as<std::string>(), "alice_o2");
3232
std::thread readThread([&outTransport](){
3333
for (;;) {

examples/12-KafkaToInfluxDb.cxx

Lines changed: 33 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44
///
55

66
#include "../src/Transports/KafkaConsumer.h"
7+
#include "../src/Backends/InfluxDB.h"
78
#include "../src/Transports/HTTP.h"
8-
#include "../src/Transports/WebSocket.h"
99

1010
#include <iostream>
1111
#include <memory>
1212
#include <thread>
13-
#include <boost/algorithm/string/join.hpp>
1413
#include <boost/program_options.hpp>
14+
#include <boost/algorithm/string/join.hpp>
15+
#include <boost/algorithm/string.hpp>
1516

1617
#include "envs.pb.h"
1718

@@ -22,21 +23,46 @@ int main(int argc, char* argv[])
2223
boost::program_options::options_description desc("Program options");
2324
desc.add_options()
2425
("kafka-host", boost::program_options::value<std::string>()->required(), "Kafka broker hostname")
25-
("kafka-topic", boost::program_options::value<std::string>()->required(), "Kafka topic");
26+
("kafka-topics", boost::program_options::value<std::vector<std::string>>()->multitoken()->required(), "Kafka topics")
27+
("influxdb-host", boost::program_options::value<std::string>()->required(), "InfluxDB hostname")
28+
("influxdb-token", boost::program_options::value<std::string>()->required(), "InfluxDB token")
29+
("influxdb-org", boost::program_options::value<std::string>()->default_value("cern"), "InfluxDB organisation")
30+
("influxdb-bucket", boost::program_options::value<std::string>()->default_value("aliecs"), "InfluxDB bucket");
2631
boost::program_options::variables_map vm;
2732
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
2833
boost::program_options::notify(vm);
2934

30-
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, vm["kafka-topic"].as<std::string>());
35+
std::vector<std::string> topics = vm["kafka-topics"].as<std::vector<std::string>>();
36+
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>(), 9092, topics);
37+
auto httpTransport = std::make_unique<transports::HTTP>(
38+
"http://" + vm["influxdb-host"].as<std::string>() + ":8086/api/v2/write?" +
39+
"org=" + vm["influxdb-org"].as<std::string>() + "&" +
40+
"bucket=" + vm["influxdb-bucket"].as<std::string>()
41+
);
42+
httpTransport->addHeader("Authorization: Token " + vm["influxdb-token"].as<std::string>());
43+
auto influxdbBackend = std::make_unique<backends::InfluxDB>(std::move(httpTransport));
3144
for (;;) {
3245
auto changes = kafkaConsumer->receive();
3346
if (!changes.empty()) {
3447
for (auto& change : changes) {
3548
aliceo2::envs::NewStateNotification stateChange;
3649
stateChange.ParseFromString(change);
37-
std::cout << "EnvID: " << stateChange.envinfo().environmentid() << std::endl
38-
<< "State: " << stateChange.envinfo().state() << std::endl
39-
<< "First detector: " << stateChange.envinfo().detectors().Get(0) << std::endl;
50+
if (stateChange.envinfo().state().empty()) {
51+
continue;
52+
}
53+
std::cout << stateChange.envinfo().environmentid() << " => " << stateChange.envinfo().state()
54+
<< " (" << stateChange.envinfo().runnumber() << ")" << std::endl;
55+
auto metric = Metric{"env_info"}.addValue(stateChange.envinfo().state(), "state");
56+
auto detectorsProto = stateChange.envinfo().detectors();
57+
std::vector<std::string> detectors(detectorsProto.begin(), detectorsProto.end());
58+
if (detectors.size() > 0) {
59+
metric.addValue(boost::algorithm::join(detectors, " "), "detectors");
60+
}
61+
int run = stateChange.envinfo().runnumber();
62+
if (run > 1) {
63+
metric.addValue(run, "run");
64+
}
65+
influxdbBackend->sendWithId(metric, stateChange.envinfo().environmentid());
4066
}
4167
}
4268
std::this_thread::sleep_for(std::chrono::milliseconds(1000));

src/Backends/InfluxDB.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,13 @@ void InfluxDB::send(const Metric& metric)
7878
mTransport->send(toInfluxLineProtocol(metric));
7979
}
8080

81+
void InfluxDB::sendWithId(const Metric& metric, const std::string& id)
82+
{
83+
auto serialized = toInfluxLineProtocol(metric);
84+
serialized.insert(serialized.find(',') + 1, "id=" + id);
85+
mTransport->send(std::move(serialized));
86+
}
87+
8188
std::string InfluxDB::toInfluxLineProtocol(const Metric& metric)
8289
{
8390
std::stringstream convert;

src/Backends/InfluxDB.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,11 @@ class InfluxDB final : public Backend
5454
/// \param metric reference to metric object
5555
void send(const Metric& metric) override;
5656

57+
/// Sends metric with ID tag
58+
/// \param metric reference to metric object
59+
/// \param id ID tag value
60+
void sendWithId(const Metric& metric, const std::string& id);
61+
5762
/// Sends multiple metrics not related to each other
5863
/// \@param metrics vector of metrics
5964
void send(std::vector<Metric>&& metrics) override;

src/Transports/KafkaConsumer.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ namespace monitoring
2727
namespace transports
2828
{
2929

30-
KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const std::string& topic) : mTopic(topic)
30+
KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const std::vector<std::string>& topics) : mTopics(topics)
3131
{
3232
std::string errstr;
3333
std::unique_ptr<RdKafka::Conf> conf{RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL)};
@@ -45,7 +45,7 @@ KafkaConsumer::KafkaConsumer(const std::string& host, unsigned int port, const s
4545
if (!mConsumer) {
4646
MonLogger::Get(Severity::Error) << "Could not initialize Kafka consumer" << MonLogger::End();
4747
}
48-
if (mConsumer->subscribe({mTopic})) {
48+
if (mConsumer->subscribe(mTopics)) {
4949
MonLogger::Get(Severity::Warn) << "Failed to subscribe to topic" << MonLogger::End();
5050
}
5151
}

src/Transports/KafkaConsumer.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class KafkaConsumer : public TransportInterface
4040
/// \param hostname Hostname
4141
/// \param port Port number
4242
/// \param topic Kafka topic
43-
KafkaConsumer(const std::string& host, unsigned int port, const std::string& topic);
43+
KafkaConsumer(const std::string& host, unsigned int port, const std::vector<std::string>& topic);
4444

4545
/// Deletes producer
4646
~KafkaConsumer();
@@ -57,7 +57,7 @@ class KafkaConsumer : public TransportInterface
5757
RdKafka::KafkaConsumer* mConsumer;
5858

5959
/// Kafka topic
60-
std::string mTopic;
60+
std::vector<std::string> mTopics;
6161
};
6262

6363
} // namespace transports

0 commit comments

Comments
 (0)