Skip to content

Commit 0cd404d

Browse files
committed
added a monitoring queue to push spontaneous measurements
1 parent 22ab99e commit 0cd404d

File tree

5 files changed

+90
-0
lines changed

5 files changed

+90
-0
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@ add_library(
258258
${SOURCE_DIR}/MemoryBank.cxx
259259
${SOURCE_DIR}/MemoryBankManager.cxx
260260
${SOURCE_DIR}/MemoryPagesPool.cxx
261+
${SOURCE_DIR}/ReadoutMonitoringQueue.cxx
261262
$<$<BOOL:${ZMQ_FOUND}>:${SOURCE_DIR}/ZmqServer.cxx>
262263
$<$<BOOL:${ZMQ_FOUND}>:${SOURCE_DIR}/ZmqClient.cxx>
263264
)

src/ConsumerStats.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "DataSet.h"
2424
#include "ReadoutUtils.h"
2525
#include "ReadoutStats.h"
26+
#include "ReadoutMonitoringQueue.h"
2627

2728
using namespace o2::monitoring;
2829

@@ -182,6 +183,12 @@ class ConsumerStats : public Consumer
182183
sendMetricNoException(Metric{"readout.bufferUsage"}.addValue((int)(r*100), "value").addValue(b, "bytes").addTag(tags::Key::ID, i));
183184
}
184185
}
186+
187+
// publish measurements stored in monitoring queue
188+
auto ff = [&] (const ReadoutMonitoringMetric &m) -> void {
189+
sendMetricNoException(Metric{m.value, m.name}.addTag(tags::Key::ID, m.tag));
190+
};
191+
gReadoutMonitoringQueue.execute(ff);
185192
}
186193

187194
#ifdef WITH_ZMQ

src/ReadoutMonitoringQueue.cxx

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
#include "ReadoutMonitoringQueue.h"
2+
3+
ReadoutMonitoringQueue::ReadoutMonitoringQueue() {
4+
}
5+
6+
ReadoutMonitoringQueue::~ReadoutMonitoringQueue() {
7+
}
8+
9+
void ReadoutMonitoringQueue::push(ReadoutMonitoringMetric m) {
10+
std::unique_lock<std::mutex> lock(qMutex);
11+
q.push_front(std::move(m));
12+
}
13+
14+
void ReadoutMonitoringQueue::execute(std::function<void(const ReadoutMonitoringMetric &)> f) {
15+
for (;;) {
16+
ReadoutMonitoringMetric m;
17+
18+
{
19+
std::unique_lock<std::mutex> lock(qMutex);
20+
if (q.empty()) {
21+
break;
22+
}
23+
m = std::move(q.back());
24+
q.pop_back();
25+
}
26+
27+
f(m);
28+
}
29+
}
30+
31+
void ReadoutMonitoringQueue::clear() {
32+
std::unique_lock<std::mutex> lock(qMutex);
33+
q.clear();
34+
}
35+
36+
ReadoutMonitoringQueue gReadoutMonitoringQueue;

src/ReadoutMonitoringQueue.h

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#include <deque>
2+
#include <mutex>
3+
#include <string>
4+
#include <functional>
5+
6+
// a metric to be stored in queue for later processing
7+
// fields as for o2::monitoring::metric
8+
struct ReadoutMonitoringMetric {
9+
std::string name;
10+
unsigned short int tag;
11+
uint64_t value;
12+
};
13+
14+
15+
// producer-consumer queue to define and publish metrics
16+
// typical use:
17+
// the module that pushes has no access to o2 Monitoring
18+
// the module that publishes reads from the queue and publish them to o2 Monitoring
19+
// the class is not aware of o2::monitoring, it's just a transient thread-safe storage
20+
21+
class ReadoutMonitoringQueue {
22+
public:
23+
24+
ReadoutMonitoringQueue();
25+
~ReadoutMonitoringQueue();
26+
27+
// push an element in the queue
28+
void push(ReadoutMonitoringMetric);
29+
30+
// execute provided functions on all elements in the queue
31+
// (and remove them from the queue)
32+
void execute(std::function<void(const ReadoutMonitoringMetric &)>);
33+
34+
// remove all elements in queue
35+
void clear();
36+
37+
private:
38+
std::mutex qMutex;
39+
std::deque<ReadoutMonitoringMetric> q;
40+
};
41+
42+
extern ReadoutMonitoringQueue gReadoutMonitoringQueue;

src/mainReadout.cxx

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@
7979
#include "ReadoutVersion.h"
8080
#include "TtyChecker.h"
8181
#include "ReadoutConst.h"
82+
#include "ReadoutMonitoringQueue.h"
8283

8384
#ifdef WITH_NUMA
8485
#include <numa.h>
@@ -774,6 +775,9 @@ int Readout::_configure(const boost::property_tree::ptree& properties)
774775
// reset some flags
775776
gReadoutStats.isFairMQ = 0; // disable FMQ stats
776777

778+
// reset monitoring queue
779+
gReadoutMonitoringQueue.clear();
780+
777781
// load configuration file
778782
theLog.log(LogInfoSupport, "Reading configuration from %s %s", cfgFileURI, cfgFileEntryPoint);
779783

0 commit comments

Comments
 (0)