Skip to content

Commit 9f4b289

Browse files
authored
[OMON-595] Use run number as record key in Kafka transport (#304)
1 parent f934a7d commit 9f4b289

File tree

6 files changed

+25
-3
lines changed

6 files changed

+25
-3
lines changed

examples/5-Benchmark.cxx

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ int main(int argc, char* argv[])
6666
}
6767

6868
auto monitoring = MonitoringFactory::Get(vm["url"].as<std::string>());
69+
monitoring->setRunNumber(intDist(mt));
6970
if (vm["monitor"].as<bool>()) {
7071
monitoring->enableProcessMonitoring(1, {PmMeasurement::Cpu, PmMeasurement::Mem, PmMeasurement::Smaps});
7172
}
@@ -102,7 +103,7 @@ int main(int argc, char* argv[])
102103
for (int i = 1; i <= measurements; i++) {
103104
for (int k = 1; k <= flps; k++) {
104105
monitoring->send(Metric{doubleDist(mt), "doubleMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
105-
monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
106+
monitoring->send(Metric{intDist(mt), "intMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k).addTag(tags::Key::Subsystem, tags::Value::Readout));
106107
monitoring->send(Metric{std::rand() % 2, "onOffMetric" + std::to_string(i)}.addTag(tags::Key::FLP, k));
107108
std::this_thread::sleep_for(std::chrono::microseconds(10));
108109
}

include/Monitoring/Backend.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ class Backend
4949
}
5050

5151
/// Run number setter
52-
void setRunNumber(uint32_t runNumber) {
52+
virtual void setRunNumber(uint32_t runNumber) {
5353
mRunNumber = runNumber;
5454
}
5555

src/Backends/InfluxDB.cxx

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,11 @@ void InfluxDB::send(std::vector<Metric>&& metrics)
7373
mTransport->send(std::move(influxMetrics));
7474
}
7575

76+
void InfluxDB::setRunNumber(uint32_t runNumber) {
77+
mRunNumber = runNumber;
78+
mTransport->setKey(std::to_string(runNumber));
79+
}
80+
7681
void InfluxDB::send(const Metric& metric)
7782
{
7883
mTransport->send(toInfluxLineProtocol(metric));

src/Backends/InfluxDB.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class InfluxDB final : public Backend
5050
/// \return timestamp in nanoseconds
5151
inline unsigned long convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp);
5252

53+
/// Reimplements setting run number and some transport needs to be notified about that as well
54+
/// \param runNumber run number to be set
55+
virtual void setRunNumber(uint32_t runNumber) override;
56+
5357
/// Sends metric to InfluxDB instance via one transport
5458
/// \param metric reference to metric object
5559
void send(const Metric& metric) override;

src/Transports/KafkaProducer.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void KafkaProducer::send(std::string&& message)
6060
mTopic, partition,
6161
RdKafka::Producer::RK_MSG_COPY,
6262
const_cast<char*>(message.c_str()), message.size(),
63-
NULL, 0,
63+
const_cast<char*>(mKey.c_str()), mKey.size(),
6464
0,
6565
NULL,
6666
NULL);

src/Transports/TransportInterface.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#ifndef ALICEO2_MONITORING_TRANSPORTS_TRANSPORTINTERFACE_H
1818
#define ALICEO2_MONITORING_TRANSPORTS_TRANSPORTINTERFACE_H
1919

20+
#include <atomic>
2021
#include <string>
2122
#include <vector>
2223

@@ -32,6 +33,12 @@ namespace transports
3233
/// \brief Transport interface for backends
3334
class TransportInterface
3435
{
36+
protected:
37+
/// Transport record key
38+
// This is needed by some transports to route metrics, eg. Kafka
39+
// This can be set to run number or measurement name and run number
40+
std::string mKey;
41+
3542
public:
3643
TransportInterface() = default;
3744

@@ -40,6 +47,11 @@ class TransportInterface
4047
/// Sends metric via given transport
4148
/// \param message r-value to string formatted metric
4249
virtual void send(std::string&& message) = 0;
50+
51+
/// Transport record key setter
52+
void setKey(const std::string& key) {
53+
mKey = key;
54+
}
4355
};
4456

4557
} // namespace transports

0 commit comments

Comments
 (0)