Skip to content

Commit 03fd42d

Browse files
authored
Auto push metrics (#62)
1 parent 3fb1f84 commit 03fd42d

File tree

11 files changed

+174
-26
lines changed

11 files changed

+174
-26
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@ set(EXAMPLES
171171
examples/8-Multiple.cxx
172172
examples/9-Timer.cxx
173173
examples/10-Buffering.cxx
174+
examples/11-AutoUpdate.cxx
174175
)
175176

176177
foreach (example ${EXAMPLES})

examples/10-Buffering.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ using Monitoring = o2::monitoring::MonitoringFactory;
1010
int main() {
1111
// Configure monitoring
1212
// Pass string with list of URLs as parameter
13-
auto monitoring = Monitoring::Get("infologger://,flume://localhost:1234");
13+
auto monitoring = Monitoring::Get("infologger://");
1414
monitoring->enableBuffering(10);
1515

1616
// now send an application specific metric

examples/11-AutoUpdate.cxx

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
///
2+
/// \file 11-AutoUpdate.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "Monitoring/MonitoringFactory.h"
7+
8+
using namespace o2::monitoring;
9+
10+
int main() {
11+
auto monitoring = MonitoringFactory::Get("infologger://");
12+
monitoring->enableAutoPush();
13+
auto& qcMetric = monitoring->getAutoPushMetric("qcMetric");
14+
auto& qcMetric2 = monitoring->getAutoPushMetric("qcMetric2");
15+
std::this_thread::sleep_for (std::chrono::milliseconds(2000));
16+
qcMetric = 123;
17+
qcMetric2 = 321;
18+
std::this_thread::sleep_for (std::chrono::milliseconds(2000));
19+
}

include/Monitoring/Metric.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
#include <string>
1010
#include <chrono>
11+
#include <mutex>
1112
#include <vector>
1213
#include <boost/variant.hpp>
1314
#include "Tag.h"
@@ -56,6 +57,15 @@ class Metric
5657

5758
/// Default destructor
5859
~Metric() = default;
60+
61+
/// Copy initialization
62+
Metric(const Metric& other);
63+
64+
/// Copy assignment
65+
Metric& operator=(Metric const& other);
66+
67+
/// Assignment operator overload; assigns a new value to the metric object
68+
Metric& operator=(const boost::variant< int, std::string, double, uint64_t >& value);
5969

6070
/// Name getter
6171
/// \return metric name
@@ -107,6 +117,9 @@ class Metric
107117

108118
/// Metric tags
109119
std::vector<Tag> tagSet;
120+
121+
/// Mutex for accessing metric value
122+
mutable std::mutex mValueMutex;
110123
};
111124

112125
} // namespace monitoring

include/Monitoring/Monitoring.h

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
#include <tuple>
1616
#include <unordered_map>
1717
#include <vector>
18+
#include <deque>
1819

1920
#include "Monitoring/Backend.h"
2021
#include "Monitoring/DerivedMetrics.h"
@@ -96,6 +97,16 @@ class Monitoring
9697
/// \param name tag name
9798
/// \param value tag value
9899
void addGlobalTag(std::string name, std::string value);
100+
101+
/// Returns a metric which will be periodically sent to backends
102+
/// \param name metric name
103+
/// \return periodically sent metric
104+
Metric& getAutoPushMetric(std::string name);
105+
106+
/// Enables periodic pushing of auto-push metrics
107+
/// \param interval interval in seconds
108+
void enableAutoPush(const unsigned int interval = 1);
109+
99110
private:
100111
/// Derived metrics handler
101112
/// \see class DerivedMetrics
@@ -119,9 +130,8 @@ class Monitoring
119130
/// Process Monitor object that sends updates about the process itself
120131
std::unique_ptr<ProcessMonitor> mProcessMonitor;
121132

122-
/// Process Monitor thread loop
123-
/// \param interval sleep time in seconds
124-
void processMonitorLoop(int interval);
133+
/// Push metric loop
134+
void pushLoop();
125135

126136
/// Metric buffer
127137
std::vector<Metric> mStorage;
@@ -131,6 +141,15 @@ class Monitoring
131141

132142
/// Size of buffer
133143
unsigned int mBufferSize;
144+
145+
/// Store for automatically pushed metrics
146+
std::deque<Metric> mPushStore;
147+
148+
/// Process monitor interval
149+
std::atomic<unsigned int> mProcessMonitoringInterval;
150+
151+
/// Automatic metric push interval
152+
std::atomic<unsigned int> mAutoPushInterval;
134153
};
135154

136155
} // namespace monitoring

src/Metric.cxx

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,11 +55,40 @@ Metric::Metric(boost::variant< int, std::string, double, uint64_t > value, const
5555
mValue(value), mName(name), mTimestamp(timestamp)
5656
{}
5757

58+
/// Copy constructor.
/// Members are default-constructed first and then assigned while the
/// source's value mutex is held; the mutex itself is not copied.
/// \param other   metric to copy from
Metric::Metric(const Metric& other)
{
  const std::lock_guard<std::mutex> sourceGuard(other.mValueMutex);
  tagSet = other.tagSet;
  mTimestamp = other.mTimestamp;
  mValue = other.mValue;
  mName = other.mName;
}
66+
67+
/// Copy assignment.
/// Locks the value mutexes of both objects before copying name, value,
/// timestamp and tags; self-assignment is a no-op.
/// \param other   metric to copy from
/// \return        reference to this metric
Metric& Metric::operator=(Metric const& other)
{
  // Nothing to do (and nothing to lock) when assigning to itself
  if (this == &other) {
    return *this;
  }

  // Acquire both mutexes together through std::lock to avoid lock-order deadlocks
  std::unique_lock<std::mutex> ownGuard(mValueMutex, std::defer_lock);
  std::unique_lock<std::mutex> sourceGuard(other.mValueMutex, std::defer_lock);
  std::lock(ownGuard, sourceGuard);

  mName = other.mName;
  mValue = other.mValue;
  mTimestamp = other.mTimestamp;
  tagSet = other.tagSet;
  return *this;
}
81+
5882
/// Value getter.
/// Takes the value mutex so the read cannot interleave with a concurrent
/// assignment made through one of the assignment operators.
/// \return   copy of the metric value
boost::variant< int, std::string, double, uint64_t > Metric::getValue() const
{
  // mValueMutex is declared mutable, so it can be locked in this const method
  std::lock_guard<std::mutex> lock(mValueMutex);
  return mValue;
}
6286

87+
/// Assigns a new value to the metric; name, timestamp and tags are left untouched.
/// The value mutex is held during the write because the copy operations of
/// this class read mValue under the same mutex.
/// \param value   new metric value
/// \return        reference to this metric
Metric& Metric::operator=(const boost::variant< int, std::string, double, uint64_t >& value) {
  std::lock_guard<std::mutex> lock(mValueMutex);
  mValue = value;
  return *this;
}
91+
6392
Metric&& Metric::addTags(std::vector<Tag>&& tags)
6493
{
6594
tagSet = std::move(tags);

src/Monitoring.cxx

Lines changed: 48 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ Monitoring::Monitoring()
3939
mProcessMonitor = std::make_unique<ProcessMonitor>();
4040
mDerivedHandler = std::make_unique<DerivedMetrics>();
4141
mBuffering = false;
42+
mProcessMonitoringInterval = 0;
43+
mAutoPushInterval = 0;
44+
mMonitorRunning = false;
4245
}
4346

4447
void Monitoring::enableBuffering(const unsigned int size)
@@ -59,13 +62,16 @@ void Monitoring::flushBuffer() {
5962
}
6063

6164
void Monitoring::enableProcessMonitoring(const unsigned int interval) {
62-
mMonitorRunning = true;
63-
mMonitorThread = std::thread(&Monitoring::processMonitorLoop, this, interval);
64-
#ifdef _OS_LINUX
65+
mProcessMonitoringInterval = interval;
66+
if (!mMonitorRunning) {
67+
mMonitorRunning = true;
68+
mMonitorThread = std::thread(&Monitoring::pushLoop, this);
69+
}
70+
#ifdef _OS_LINUX
6571
MonLogger::Get() << "Process Monitor : Automatic updates enabled" << MonLogger::End();
66-
#else
72+
#else
6773
MonLogger::Get() << "!! Process Monitor : Limited metrics available" << MonLogger::End();
68-
#endif
74+
#endif
6975
}
7076

7177
void Monitoring::startTimer(std::string name) {
@@ -115,21 +121,39 @@ Monitoring::~Monitoring()
115121
}
116122
}
117123

118-
void Monitoring::processMonitorLoop(int interval)
124+
void Monitoring::pushLoop()
119125
{
120-
// loopCount - no need to wait full sleep time to terminame the thread
121-
int loopCount = 0;
126+
unsigned int loopCount = 0;
127+
std::this_thread::sleep_for (std::chrono::milliseconds(100));
122128
while (mMonitorRunning) {
123-
std::this_thread::sleep_for (std::chrono::milliseconds(interval*10));
124-
if ((++loopCount % 100) != 0) continue;
125-
send(mProcessMonitor->getCpuAndContexts());
126-
#ifdef _OS_LINUX
127-
send(mProcessMonitor->getMemoryUsage());
128-
#endif
129-
loopCount = 0;
129+
if (mProcessMonitoringInterval != 0 && (loopCount % (mProcessMonitoringInterval*10)) == 0) {
130+
send(mProcessMonitor->getCpuAndContexts());
131+
#ifdef _OS_LINUX
132+
send(mProcessMonitor->getMemoryUsage());
133+
#endif
134+
}
135+
136+
if (mAutoPushInterval != 0 && (loopCount % (mAutoPushInterval*10)) == 0) {
137+
std::vector<Metric> metrics;
138+
for (auto& metric : mPushStore) {
139+
metrics.push_back(metric);
140+
}
141+
send(std::move(metrics));
142+
}
143+
std::this_thread::sleep_for (std::chrono::milliseconds(100));
144+
(loopCount >= 600) ? loopCount = 0 : loopCount++;
130145
}
131146
}
132147

148+
/// Creates a metric in the auto-push store and returns a reference to it.
/// Logs a warning when auto-push has not been enabled yet; the metric is
/// created regardless.
/// \param name   metric name
/// \return       reference to the newly stored metric
Metric& Monitoring::getAutoPushMetric(std::string name)
{
  using MetricValue = boost::variant< int, std::string, double, uint64_t >;

  const bool autoPushDisabled = (mAutoPushInterval == 0);
  if (autoPushDisabled) {
    MonLogger::Get() << "[WARN] AutoPush is not enabled" << MonLogger::End();
  }

  // mPushStore is a std::deque: appending at the back keeps references to
  // previously returned metrics valid.
  mPushStore.emplace_back(MetricValue {}, name);
  return mPushStore.back();
}
156+
133157
void Monitoring::sendGrouped(std::string measurement, std::vector<Metric>&& metrics)
134158
{
135159
for (auto& b: mBackends) {
@@ -153,6 +177,15 @@ void Monitoring::debug(Metric&& metric)
153177
}
154178
}
155179

180+
void Monitoring::enableAutoPush(unsigned int interval)
181+
{
182+
if (!mMonitorRunning) {
183+
mMonitorRunning = true;
184+
mMonitorThread = std::thread(&Monitoring::pushLoop, this);
185+
}
186+
mAutoPushInterval = interval;
187+
}
188+
156189
void Monitoring::pushToBackends(Metric&& metric)
157190
{
158191
if (mBuffering) {

src/ProcessMonitor.cxx

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ ProcessMonitor::ProcessMonitor()
2424
mPid = static_cast<unsigned int>(::getpid());
2525
getrusage(RUSAGE_SELF, &mPreviousGetrUsage);
2626
mTimeLastRun = std::chrono::high_resolution_clock::now();
27-
#ifdef _OS_LINUX
27+
#ifdef _OS_LINUX
2828
setTotalMemory();
29-
#endif
29+
#endif
3030
}
3131

3232
void ProcessMonitor::setTotalMemory()
@@ -62,8 +62,9 @@ std::vector<Metric> ProcessMonitor::getCpuAndContexts() {
6262
getrusage(RUSAGE_SELF, &currentUsage);
6363
auto timeNow = std::chrono::high_resolution_clock::now();
6464
double timePassed = std::chrono::duration_cast<std::chrono::microseconds>(timeNow - mTimeLastRun).count();
65-
if (timePassed < 950) { // do not run too often
66-
throw MonitoringInternalException("Process Monitor getrusage", "Do not invoke more often then 1ms");
65+
if (timePassed < 950) {
66+
MonLogger::Get() << "[WARN] Do not invoke Process Monitor more frequent then every 1s" << MonLogger::End();
67+
return {};
6768
}
6869
double fractionCpuUsed = (
6970
currentUsage.ru_utime.tv_sec*1000000.0 + currentUsage.ru_utime.tv_usec - (mPreviousGetrUsage.ru_utime.tv_sec*1000000.0 + mPreviousGetrUsage.ru_utime.tv_usec)

test/testMetric.cxx

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,19 @@ BOOST_AUTO_TEST_CASE(tags) {
7676
}
7777
}
7878

79+
/// Verifies that copying a Metric preserves its value, name and tags
BOOST_AUTO_TEST_CASE(customCopyConstructor) {
  o2::monitoring::Metric metric = o2::monitoring::Metric{10, "myMetric"}.addTags({{"tag1", "value1"}, {"tag2", "value2"}});
  auto copied = metric;
  BOOST_CHECK_EQUAL(boost::get<int>(copied.getValue()), 10);
  BOOST_CHECK_EQUAL(copied.getName(), "myMetric");

  std::vector<Tag> tags = copied.getTags();
  // Without this check the loop below passes vacuously if the copy lost its tags
  BOOST_CHECK_EQUAL(tags.size(), 2u);
  for (auto const& tag: tags) {
    BOOST_TEST(tag.name.find("tag") != std::string::npos);
    BOOST_TEST(tag.value.find("value") != std::string::npos);
  }
}
91+
7992

8093

8194
} // namespace Test

test/testMonitoring.cxx

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,18 @@ BOOST_AUTO_TEST_CASE(testTimer)
5050
monitoring->stopAndSendTimer("timer");
5151
}
5252

53+
/// Smoke test for auto-push: registers two metrics, lets the push loop run,
/// assigns new values and lets the push loop run again.
BOOST_AUTO_TEST_CASE(testPush)
{
  // Time left to the background push loop before and after the value update
  const std::chrono::milliseconds pause(1500);

  auto monitoring = Monitoring::Get("infologger://");
  monitoring->enableAutoPush();

  auto& qcMetric = monitoring->getAutoPushMetric("qcMetric");
  auto& qcMetric2 = monitoring->getAutoPushMetric("qcMetric2");
  std::this_thread::sleep_for(pause);

  qcMetric = 133 + 11 + 2.2;
  qcMetric2 = 133 - 11 - 2.2;
  std::this_thread::sleep_for(pause);
}
64+
5365
BOOST_AUTO_TEST_CASE(testSymbols)
5466
{
5567
BOOST_WARN_MESSAGE(!BOOST_IS_DEFINED( _WITH_APPMON ), "ApMon Backend disabled");

0 commit comments

Comments
 (0)