22// / \file 12-KafkaToInfluxDb.cxx
33// / \author Adam Wegrzynek <[email protected] >44// /
5-
65#include " ../src/Transports/KafkaConsumer.h"
76#include " ../src/Backends/InfluxDB.h"
87#include " ../src/Transports/HTTP.h"
@@ -23,7 +22,6 @@ int main(int argc, char* argv[])
2322 boost::program_options::options_description desc (" Program options" );
2423 desc.add_options ()
2524 (" kafka-host" , boost::program_options::value<std::string>()->required (), " Kafka broker hostname" )
26- (" kafka-topics" , boost::program_options::value<std::vector<std::string>>()->multitoken ()->required (), " Kafka topics" )
2725 (" influxdb-host" , boost::program_options::value<std::string>()->required (), " InfluxDB hostname" )
2826 (" influxdb-token" , boost::program_options::value<std::string>()->required (), " InfluxDB token" )
2927 (" influxdb-org" , boost::program_options::value<std::string>()->default_value (" cern" ), " InfluxDB organisation" )
@@ -32,8 +30,8 @@ int main(int argc, char* argv[])
3230 boost::program_options::store (boost::program_options::parse_command_line (argc, argv, desc), vm);
3331 boost::program_options::notify (vm);
3432
35- std::vector<std::string> topics = vm[ " kafka-topics " ]. as <std::vector<std::string>>() ;
36- auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm[" kafka-host" ].as <std::string>() + " :9092" , topics, " kafka-to-influxdb " );
33+ std::vector<std::string> topics = { " aliecs.env_leave_state.RUNNING " } ;
34+ auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm[" kafka-host" ].as <std::string>() + " :9092" , topics, " aliecs-run-times " );
3735 auto httpTransport = std::make_unique<transports::HTTP>(
3836 " http://" + vm[" influxdb-host" ].as <std::string>() + " :8086/api/v2/write?" +
3937 " org=" + vm[" influxdb-org" ].as <std::string>() + " &" +
@@ -50,19 +48,14 @@ int main(int argc, char* argv[])
5048 if (stateChange.envinfo ().state ().empty ()) {
5149 continue ;
5250 }
53- std::cout << stateChange.envinfo ().environmentid () << " => " << stateChange.envinfo ().state ()
54- << " (" << stateChange.envinfo ().runnumber () << " )" << std::endl;
55- auto metric = Metric{" env_info" }.addValue (stateChange.envinfo ().state (), " state" );
56- auto detectorsProto = stateChange.envinfo ().detectors ();
57- std::vector<std::string> detectors (detectorsProto.begin (), detectorsProto.end ());
58- if (detectors.size () > 0 ) {
59- metric.addValue (boost::algorithm::join (detectors, " " ), " detectors" );
60- }
51+ std::cout << stateChange.envinfo ().environmentid () << " (" << stateChange.envinfo ().runnumber () << " ) EOR: from " <<stateChange.envinfo ().enterstatetimestamp () << " to " << stateChange.timestamp () << std::endl;
52+ auto metric = Metric{" run_times" }
53+ .addValue (stateChange.envinfo ().enterstatetimestamp (), " sor" )
54+ .addValue (stateChange.timestamp (), " eor" );
6155 int run = stateChange.envinfo ().runnumber ();
6256 if (run > 1 ) {
63- metric. addValue (run, " run" );
57+ influxdbBackend-> sendWithRun ( metric, stateChange. envinfo (). environmentid (), std::to_string ( run) );
6458 }
65- influxdbBackend->sendWithId (metric, stateChange.envinfo ().environmentid ());
6659 }
6760 }
6861 std::this_thread::sleep_for (std::chrono::milliseconds (1000 ));
0 commit comments