Skip to content

Commit 120be31

Browse files
authored
[OMON-584] Pass topic along with the message from Kafka consumer (#297)
1 parent 9f1cc9d commit 120be31

File tree

8 files changed

+22
-19
lines changed

8 files changed

+22
-19
lines changed

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ endif()
3131

3232
# Define project
3333
project(Monitoring
34-
VERSION 3.12.10
34+
VERSION 3.12.11
3535
DESCRIPTION "O2 Monitoring library"
3636
LANGUAGES CXX
3737
)

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ auto client = MonitoringFactory::GetPullClient("kafka-server:9092", topics);
232232
for (;;) {
233233
auto metrics = client->pull();
234234
if (!metrics.empty()) {
235-
// DO SOMETHING !
235+
/// metric.first => topic name; metric.second => metric itself
236236
}
237237
std::this_thread::sleep_for(std::chrono::milliseconds(100));
238238
}

examples/11-KafkaToWebsocket.cxx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
#include <iostream>
1111
#include <memory>
1212
#include <thread>
13-
#include <boost/algorithm/string/join.hpp>
1413
#include <boost/program_options.hpp>
1514

1615
using namespace o2::monitoring;
@@ -38,8 +37,8 @@ int main(int argc, char* argv[])
3837
});
3938
for (;;) {
4039
auto metrics = kafkaConsumer->pull();
41-
if (!metrics.empty()) {
42-
outTransport->send(boost::algorithm::join(metrics, "\n"));
40+
for (auto& metric : metrics) {
41+
outTransport->send(std::move(metric.second));
4342
}
4443
std::this_thread::sleep_for(std::chrono::milliseconds(100));
4544
}

examples/12-KafkaToInfluxDb.cxx

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ int main(int argc, char* argv[])
3030
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm);
3131
boost::program_options::notify(vm);
3232

33-
std::vector<std::string> topics = {"aliecs.env_leave_state.RUNNING"};
33+
std::vector<std::string> topics = {"aliecs.env_leave_state.RUNNING", "aliecs.env_state.RUNNING"};
3434
auto kafkaConsumer = std::make_unique<transports::KafkaConsumer>(vm["kafka-host"].as<std::string>() + ":9092", topics, "aliecs-run-times");
3535
auto httpTransport = std::make_unique<transports::HTTP>(
3636
vm["influxdb-url"].as<std::string>() + "/api/v2/write?" +
@@ -44,14 +44,17 @@ int main(int argc, char* argv[])
4444
if (!changes.empty()) {
4545
for (auto& change : changes) {
4646
aliceo2::envs::NewStateNotification stateChange;
47-
stateChange.ParseFromString(change);
47+
stateChange.ParseFromString(change.second);
4848
if (stateChange.envinfo().state().empty()) {
4949
continue;
5050
}
51-
std::cout << stateChange.envinfo().environmentid() << " (" << stateChange.envinfo().runnumber() << ") EOR: from " <<stateChange.envinfo().enterstatetimestamp() << " to " << stateChange.timestamp() << std::endl;
52-
auto metric = Metric{"run_times"}
53-
.addValue(stateChange.envinfo().enterstatetimestamp(), "sor")
54-
.addValue(stateChange.timestamp(), "eor");
51+
std::cout << stateChange.envinfo().environmentid() << "/" << stateChange.envinfo().runnumber() << " " << change.first << " SOR: " <<stateChange.envinfo().enterstatetimestamp() << " EOR: " << stateChange.timestamp() << std::endl;
52+
auto metric = Metric{"run_times"};
53+
if (change.first.find("leave") != std::string::npos) {
54+
metric.addValue(stateChange.envinfo().enterstatetimestamp(), "sor").addValue(stateChange.timestamp(), "eor");
55+
} else {
56+
metric.addValue(stateChange.envinfo().runtype(), "type");
57+
}
5558
int run = stateChange.envinfo().runnumber();
5659
if (run > 1) {
5760
influxdbBackend->sendWithRun(metric, stateChange.envinfo().environmentid(), std::to_string(run));

examples/8-KafkaToHttpServer.cxx

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -231,12 +231,12 @@ int main(int argc, char* argv[]) {
231231
});
232232

233233
auto kafkaConsumer = std::make_unique<o2::monitoring::transports::KafkaConsumer>(
234-
vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}, "kafka-aliecs"
234+
vm["kafka-host"].as<std::string>() + ":9092", std::vector<std::string>{vm["kafka-topics"].as<std::vector<std::string>>()}, "kafka-aliecs-active-envs"
235235
);
236236
for (;;) {
237237
auto serializedRuns = kafkaConsumer->pull();
238238
if (!serializedRuns.empty()) {
239-
deserializeActiveRuns(serializedRuns.back());
239+
deserializeActiveRuns(serializedRuns.back().second);
240240
}
241241
std::this_thread::sleep_for(std::chrono::milliseconds(500));
242242
}

include/Monitoring/PullClient.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class PullClient
3535
public:
3636
/// Default destructor
3737
virtual ~PullClient() = default;
38-
virtual std::vector<std::string> pull() = 0;
38+
virtual std::vector<std::pair<std::string, std::string>> pull() = 0;
3939
};
4040

4141
} // namespace monitoring

src/Transports/KafkaConsumer.cxx

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,9 @@ KafkaConsumer::KafkaConsumer(const std::string& url, const std::vector<std::stri
5151
}
5252
}
5353

54-
std::vector<std::string> KafkaConsumer::pull()
54+
std::vector<std::pair<std::string, std::string>> KafkaConsumer::pull()
5555
{
56-
std::vector<std::string> received;
56+
std::vector<std::pair<std::string, std::string>> received;
5757
size_t batch_size = 5;
5858
int remaining_timeout = 1000;
5959
auto start = std::chrono::high_resolution_clock::now();
@@ -64,7 +64,7 @@ std::vector<std::string> KafkaConsumer::pull()
6464
case RdKafka::ERR__TIMED_OUT:
6565
break;
6666
case RdKafka::ERR_NO_ERROR:
67-
received.push_back(std::string(static_cast<char*>(message->payload()), message->len()));
67+
received.push_back({message->topic_name(), std::string(static_cast<char*>(message->payload()), message->len())});
6868
break;
6969
default:
7070
std::cerr << "%% Consumer error: " << message->errstr() << std::endl;

src/Transports/KafkaConsumer.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,10 @@ class KafkaConsumer : public PullClient
4545
/// Deletes producer
4646
~KafkaConsumer();
4747

48-
/// Sends metric via Kafka
48+
/// Pulls metrics from Kafka
4949
/// \param message r-value string formated
50-
std::vector<std::string> pull() override;
50+
/// \returns Key-Value pair list as topic : metric
51+
std::vector<std::pair<std::string, std::string>> pull() override;
5152
private:
5253
/// Kafka producer instance
5354
RdKafka::KafkaConsumer* mConsumer;

0 commit comments

Comments
 (0)