Skip to content

Commit b0ea0f2

Browse files
authored
Allow sending vector of metrics (#40)
1 parent 6561612 commit b0ea0f2

File tree

14 files changed

+159
-25
lines changed

14 files changed

+159
-25
lines changed

examples/5-Benchmark.cxx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ int main(int argc, char *argv[]) {
5757
}
5858
} else {
5959
for (int i = 0; i <= count; i += add) {
60-
monitoring->send("benchmarkMeasurement",{
60+
monitoring->sendGrouped("benchmarkMeasurement",{
6161
{"string" + std::to_string(intDist(mt)), "stringMetric"},
6262
{doubleDist(mt), "doubleMetric"},
6363
{intDist(mt), "intMetric"}

examples/8-Multiple.cxx

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,8 @@ using Monitoring = o2::monitoring::MonitoringFactory;
1010
int main() {
1111
// Configure monitoring
1212
// Pass string with list of URLs as parameter
13-
auto monitoring = Monitoring::Get("infologger://");
13+
auto monitoring = Monitoring::Get("flume://pcald03.cern.ch:8092");
1414

15-
monitoring->send("measurementName", {{20, "myMetricIntMultiple"}, {20.30, "myMetricFloatMultple"}});
15+
//monitoring->sendGrouped("measurementName", {{20, "myMetricIntMultiple"}, {20.30, "myMetricFloatMultple"}});
16+
monitoring->send({{201, "myMetricIntMultiple"}, {2.34, "myMetricFloatMultple"}});
1617
}

include/Monitoring/Backend.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,11 @@ class Backend
3232
/// Sends metric via backend
3333
virtual void send(const Metric& metric) = 0;
3434

35-
/// Sends multiple metrics (if supported), otherwise falls back into sending single metrics
35+
/// Sends multiple metrics not related to each other
36+
virtual void send(std::vector<Metric>&& metrics) = 0;
37+
38+
/// Sends multiple related to each other metrics under a common name
39+
/// If not supported by backend, falls back into sending single metrics
3640
virtual void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) = 0;
3741

3842
/// Sets a tag

include/Monitoring/Monitoring.h

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,16 @@ class Monitoring
5454
/// \param metric r-value to metric object
5555
void send(Metric&& metric, DerivedMetricMode mode = DerivedMetricMode::NONE);
5656

57-
/// Sends multiple metrics to as a single measurement
58-
/// If it's not supported by backend it fallbacks into sending multiple metrics
57+
/// Sends multiple not related to each other metrics
58+
/// \@param metrics vector of metrics
59+
void send(std::vector<Metric>&& metrics);
60+
61+
/// Sends multiple realated to each other metrics under a common measurement name
62+
/// You can imagine it as a data point with multiple values
63+
/// If it's not supported by a backend it fallbacks into sending one by one
5964
/// \param name measurement name
6065
/// \param metrics list of metrics
61-
void send(std::string name, std::vector<Metric>&& metrics);
66+
void sendGrouped(std::string name, std::vector<Metric>&& metrics);
6267

6368
/// Enables process monitoring
6469
/// \param interval refresh interval

src/Backends/ApMonBackend.cxx

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,56 @@ void ApMonBackend::addGlobalTag(std::string, std::string value)
4040
entity += value;
4141
}
4242

43+
void ApMonBackend::send(std::vector<Metric>&& metrics)
44+
{
45+
int noMetrics = metrics.size();
46+
char **paramNames, **paramValues;
47+
int *valueTypes;
48+
paramNames = (char **)std::malloc(noMetrics * sizeof(char *));
49+
paramValues = (char **)std::malloc(noMetrics * sizeof(char *));
50+
valueTypes = (int *)std::malloc(noMetrics * sizeof(int));
51+
// the scope of values must be the same as sendTimedParameters method
52+
int intValue;
53+
double doubleValue;
54+
std::string stringValue;
55+
56+
for (int i = 0; i < noMetrics; i++) {
57+
paramNames[i] = const_cast<char*>(metrics[i].getName().c_str());
58+
switch(metrics[i].getType()) {
59+
case MetricType::INT :
60+
{
61+
valueTypes[i] = XDR_INT32;
62+
intValue = boost::get<int>(metrics[i].getValue());
63+
paramValues[i] = reinterpret_cast<char*>(&intValue);
64+
}
65+
break;
66+
67+
case MetricType::STRING :
68+
{
69+
valueTypes[i] = XDR_STRING;
70+
stringValue = boost::get<std::string>(metrics[i].getValue());
71+
paramValues[i] = const_cast<char*>(stringValue.c_str());
72+
}
73+
break;
74+
75+
case MetricType::DOUBLE :
76+
{
77+
valueTypes[i] = XDR_REAL64;
78+
doubleValue = boost::get<double>(metrics[i].getValue());
79+
paramValues[i] = reinterpret_cast<char*>(&doubleValue);
80+
}
81+
break;
82+
}
83+
}
84+
85+
mApMon->sendTimedParameters(const_cast<char*>(entity.c_str()), const_cast<char*>(entity.c_str()),
86+
noMetrics, paramNames, valueTypes, paramValues, convertTimestamp(metrics[0].getTimestamp()));
87+
88+
std::free(paramNames);
89+
std::free(paramValues);
90+
std::free(valueTypes);
91+
}
92+
4393
void ApMonBackend::sendMultiple(std::string, std::vector<Metric>&& metrics)
4494
{
4595
int noMetrics = metrics.size();

src/Backends/ApMonBackend.h

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,16 @@ class ApMonBackend final : public Backend
3737
/// Default destructor
3838
~ApMonBackend() = default;
3939

40+
/// Sends multiple metrics not related to each other
41+
/// \@param metrics vector of metrics
42+
void send(std::vector<Metric>&& metrics) override;
43+
4044
/// Sends metric via MonALISA
4145
/// ApMonBackend's intances is type-aware therefore cast of metric value is needed
4246
/// \param metric reference to metric object:
4347
void send(const Metric& metric) override;
4448

45-
/// Sends multiple metric in single packet
49+
/// Sends grouped metrics under common measuremet name
4650
/// \param name measurement name
4751
/// \param metrics list of metrics
4852
void sendMultiple(std::string measurement, std::vector<Metric>&& metrics) override;

src/Backends/Flume.cxx

Lines changed: 35 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@ std::string Flume::metricToJson(const Metric& metric)
3131
boost::property_tree::ptree header = globalHeader;
3232
header.put<std::string>("timestamp", std::to_string(convertTimestamp(metric.getTimestamp())));
3333
header.put<std::string>("name", metric.getName());
34-
header.put<std::string>("value", boost::lexical_cast<std::string>(metric.getValue()));
34+
header.put<std::string>("value_value", boost::lexical_cast<std::string>(metric.getValue()));
3535
for (const auto& tag : metric.getTags()) {
36-
header.put<std::string>(tag.name, tag.value);
36+
header.put<std::string>("tag_" + tag.name, tag.value);
3737
}
3838
event.push_back(std::make_pair("headers", header));
39-
event.put<std::string>("body", boost::lexical_cast<std::string>(metric.getValue()));
39+
event.put<std::string>("body", "");
4040
std::stringstream ss;
4141
write_json(ss, event);
4242
std::string s = ss.str();
@@ -45,14 +45,41 @@ std::string Flume::metricToJson(const Metric& metric)
4545
return s;
4646
}
4747

48+
void Flume::send(std::vector<Metric>&& metrics)
49+
{
50+
std::string flumeMetrics = "";
51+
for (auto& metric : metrics) {
52+
flumeMetrics += metricToJson(metric);
53+
flumeMetrics += "\n";
54+
}
55+
mTransport->send(std::move(flumeMetrics));
56+
}
57+
4858
void Flume::sendMultiple(std::string measurement, std::vector<Metric>&& metrics)
4959
{
50-
for (auto& m : metrics) {
51-
std::string tempName = m.getName();
52-
m.setName(measurement + "-" + m.getName());
53-
send(m);
54-
m.setName(tempName);
60+
mTransport->send(metricsToJson(measurement, std::move(metrics)));
61+
}
62+
63+
std::string Flume::metricsToJson(std::string measurement, std::vector<Metric>&& metrics)
64+
{
65+
boost::property_tree::ptree event;
66+
boost::property_tree::ptree header = globalHeader;
67+
header.put<std::string>("timestamp", std::to_string(convertTimestamp(metrics.front().getTimestamp())));
68+
header.put<std::string>("name", measurement);
69+
for (const auto& tag : metrics.front().getTags()) {
70+
header.put<std::string>("tag_" + tag.name, tag.value);
71+
}
72+
for (auto& metric : metrics) {
73+
header.put<std::string>("value_" + metric.getName(), boost::lexical_cast<std::string>(metric.getValue()));
5574
}
75+
event.push_back(std::make_pair("headers", header));
76+
event.put<std::string>("body", "");
77+
std::stringstream ss;
78+
write_json(ss, event);
79+
std::string s = ss.str();
80+
s.erase(std::remove_if( s.begin(), s.end(),
81+
[](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), s.end() );
82+
return s;
5683
}
5784

5885
void Flume::send(const Metric& metric)

src/Backends/Flume.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,10 @@ class Flume final : public Backend
4646
/// \param metric reference to metric object
4747
void send(const Metric& metric) override;
4848

49+
/// Sends multiple metrics not related to each other
50+
/// \@param metrics vector of metrics
51+
void send(std::vector<Metric>&& metrics) override;
52+
4953
/// Sends multiple metric in single packet
5054
/// Not supported by the backend therefore it falls back to sending metric one by one
5155
/// TODO: changed required in Flume Source
@@ -68,7 +72,8 @@ class Flume final : public Backend
6872
/// Serializes metric object to JSON
6973
/// \param metric
7074
/// \return JSON serializes metric
71-
std::string metricToJson(const Metric& metric);
75+
std::string metricToJson(const Metric& metric);
76+
std::string metricsToJson(std::string measurement, std::vector<Metric>&& metrics);
7277
};
7378

7479
} // namespace backends

src/Backends/InfluxDB.cxx

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -69,8 +69,29 @@ void InfluxDB::sendMultiple(std::string measurement, std::vector<Metric>&& metri
6969
}
7070
}
7171

72+
void InfluxDB::send(std::vector<Metric>&& metrics) {
73+
std::string influxMetrics = "";
74+
for (const auto& metric : metrics) {
75+
influxMetrics += toInfluxLineProtocol(metric);
76+
influxMetrics += "\n";
77+
}
78+
79+
try {
80+
transport->send(std::move(influxMetrics));
81+
} catch (MonitoringInternalException&) {
82+
}
83+
84+
}
85+
7286
void InfluxDB::send(const Metric& metric)
7387
{
88+
try {
89+
transport->send(toInfluxLineProtocol(metric));
90+
} catch (MonitoringInternalException&) {
91+
}
92+
}
93+
94+
std::string InfluxDB::toInfluxLineProtocol(const Metric& metric) {
7495
std::string metricTags{};
7596
for (const auto& tag : metric.getTags()) {
7697
metricTags += "," + tag.name + "=" + tag.value;
@@ -83,11 +104,7 @@ void InfluxDB::send(const Metric& metric)
83104

84105
std::stringstream convert;
85106
convert << name << "," << tagSet << metricTags << " value=" << value << " " << convertTimestamp(metric.getTimestamp());
86-
87-
try {
88-
transport->send(convert.str());
89-
} catch (MonitoringInternalException&) {
90-
}
107+
return convert.str();
91108
}
92109

93110
void InfluxDB::prepareValue(std::string& value, int type)

src/Backends/InfluxDB.h

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,10 @@ class InfluxDB final : public Backend
5050
/// \param metric reference to metric object
5151
void send(const Metric& metric) override;
5252

53+
/// Sends multiple metrics not related to each other
54+
/// \@param metrics vector of metrics
55+
void send(std::vector<Metric>&& metrics) override;
56+
5357
/// Sends multiple values in single measurement
5458
/// \param name measurement name
5559
/// \param metrics list of metrics
@@ -61,7 +65,7 @@ class InfluxDB final : public Backend
6165
void addGlobalTag(std::string name, std::string value) override;
6266

6367
private:
64-
std::unique_ptr<transports::TransportInterface> transport;
68+
std::unique_ptr<transports::TransportInterface> transport; ///< InfluxDB transport
6569
std::string tagSet; ///< Global tagset (common for each metric)
6670

6771
/// Escapes " ", "," and "=" characters
@@ -72,6 +76,7 @@ class InfluxDB final : public Backend
7276
/// \param value reference to value
7377
/// \param type type of the metric
7478
void prepareValue(std::string& value, int type);
79+
std::string toInfluxLineProtocol(const Metric& metric);
7580
};
7681

7782
} // namespace backends

0 commit comments

Comments
 (0)