Skip to content

Commit a686900

Browse files
authored
Zabbix backend init (#10)
1 parent 7f6ec44 commit a686900

File tree

9 files changed

+326
-0
lines changed

9 files changed

+326
-0
lines changed

CMakeLists.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,12 +71,14 @@ set(SRCS
7171
src/Collector.cxx
7272
src/Metric.cxx
7373
src/Backends/InfoLoggerBackend.cxx
74+
src/Backends/Zabbix.cxx
7475
src/Backends/Flume.cxx
7576
src/DerivedMetrics.cxx
7677
src/ProcessMonitor.cxx
7778
src/ProcessDetails.cxx
7879
src/MonitoringFactory.cxx
7980
src/Transports/UDP.cxx
81+
src/Transports/TCP.cxx
8082
src/Transports/HTTP.cxx
8183
src/Exceptions/MonitoringException.cxx
8284
src/Exceptions/MonitoringInternalException.cxx
@@ -143,6 +145,7 @@ set(TEST_SRCS
143145
test/testDerived.cxx
144146
test/testFlume.cxx
145147
test/testMetric.cxx
148+
test/testZabbix.cxx
146149
test/testProcessDetails.cxx
147150
test/testProcessMonitor.cxx
148151
)

README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ Metrics are pushed to one or multiple backends. The module currently supports th
150150
| ApMonBackend | MonALISA Serivce | UDP | ApMon | Default tags concatenated with entity; Metric tags concatenated with name |
151151
| InfoLoggerBackned| O2 Logging module | - | (as log message) | Added to the end of message |
152152
| Flume | Collects, aggragate monitoring data | UDP (JSON) | boost asio | In Flume Event header |
153+
| Zabbix | Via Zabbix trapper item | TCP (Zabbix protocol) | boost asio | Not supported |
153154

154155
Instruction how to install and configure server-sides backends are available in [Server-side backend installation and configuration](#server-side-backend-installation-and-configuration) section.
155156

@@ -169,6 +170,10 @@ Instruction how to install and configure server-sides backends are available in
169170
+ enable - enable Flume HTTP backend
170171
+ port
171172
+ hostname
173+
+ Zabbix
174+
+ enable - enable Zabbix backend
175+
+ port
176+
+ hostname
172177
+ ProcessMonitor
173178
+ enable - enable process monitor
174179
+ interval - updates interval[]

examples/config-default.ini

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ enable=0
1717
hostname=<hostname>
1818
port=4445
1919

20+
[Zabbix]
21+
enable=0
22+
hostname=<hostname>
23+
port=10051
24+
2025
[ProcessMonitor]
2126
enable=1
2227
interval=5

src/Backends/Zabbix.cxx

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
///
2+
/// \file Zabbix.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "Zabbix.h"
7+
#include <boost/lexical_cast.hpp>
8+
#include <boost/property_tree/json_parser.hpp>
9+
#include <string>
10+
#include "../Transports/TCP.h"
11+
#include "../Exceptions/MonitoringInternalException.h"
12+
13+
namespace AliceO2
14+
{
15+
/// ALICE O2 Monitoring system
16+
namespace Monitoring
17+
{
18+
/// Monitoring backends
19+
namespace Backends
20+
{
21+
22+
Zabbix::Zabbix(const std::string &hostname, int port)
23+
{
24+
transport = std::make_unique<Transports::TCP>(hostname, port);
25+
MonLogger::Get() << "Zabbix/TCP backend initialized"
26+
<< " ("<< hostname << ":" << port << ")" << MonLogger::End();
27+
}
28+
29+
inline std::string Zabbix::convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp)
30+
{
31+
std::string converted = std::to_string(
32+
std::chrono::duration_cast <std::chrono::nanoseconds>(
33+
timestamp.time_since_epoch()
34+
).count());
35+
converted.erase(converted.begin()+10, converted.end());
36+
return converted;
37+
}
38+
39+
std::string Zabbix::metricToZabbix(const Metric& metric)
40+
{
41+
// create JSON payload
42+
boost::property_tree::ptree request, dataArray, data;
43+
data.put<std::string>("host", hostname);
44+
data.put<std::string>("key", metric.getName());
45+
data.put<std::string>("value", boost::lexical_cast<std::string>(metric.getValue()));
46+
data.put<std::string>("clock", convertTimestamp(metric.getTimestamp()));
47+
48+
dataArray.push_back(std::make_pair("", data));
49+
request.put<std::string>("request", "sender data");
50+
request.add_child("data", dataArray);
51+
52+
std::stringstream ss;
53+
write_json(ss, request);
54+
55+
std::string noWhiteSpaces = ss.str();
56+
noWhiteSpaces.erase(std::remove_if( noWhiteSpaces.begin(), noWhiteSpaces.end(),
57+
[](char c){ return (c =='\r' || c =='\t' || c == ' ' || c == '\n');}), noWhiteSpaces.end() );
58+
noWhiteSpaces.insert(18, " ");
59+
60+
// prepare Zabbix header
61+
uint32_t length = noWhiteSpaces.length();
62+
std::vector<unsigned char> header = {
63+
'Z', 'B', 'X', 'D', '\x01',
64+
static_cast<unsigned char>(length & 0xFF),
65+
static_cast<unsigned char>((length >> 8) & 0x00FF),
66+
static_cast<unsigned char>((length >> 16) & 0x0000FF),
67+
static_cast<unsigned char>((length >> 24) & 0x000000FF),
68+
'\x00','\x00','\x00','\x00'
69+
};
70+
71+
return std::string(header.begin(), header.end()) + noWhiteSpaces;
72+
}
73+
74+
void Zabbix::send(const Metric& metric) {
75+
try {
76+
transport->send(metricToZabbix(metric));
77+
} catch (MonitoringInternalException&) {
78+
}
79+
}
80+
81+
void Zabbix::addGlobalTag(std::string name, std::string value)
82+
{
83+
if (name == "hostname") {
84+
hostname = value;
85+
}
86+
}
87+
88+
} // namespace Backends
89+
} // namespace Monitoring
90+
} // namespace AliceO2

src/Backends/Zabbix.h

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
///
2+
/// \file Zabbix.h
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#ifndef ALICEO2_MONITORING_BACKENDS_ZABBIX_H
7+
#define ALICEO2_MONITORING_BACKENDS_ZABBIX_H
8+
9+
#include "Monitoring/Backend.h"
10+
#include "../Transports/TCP.h"
11+
#include "../MonLogger.h"
12+
#include <chrono>
13+
#include <string>
14+
15+
namespace AliceO2
16+
{
17+
/// ALICE O2 Monitoring system
18+
namespace Monitoring
19+
{
20+
/// Monitoring backends
21+
namespace Backends
22+
{
23+
24+
/// Backend that sends metrics to InfluxDB time-series databse
25+
///
26+
/// Metrics are converted into Influx Line protocol and then sent via one of available transports
27+
class Zabbix final : public Backend
28+
{
29+
public:
30+
/// Constructor, uses UDP transport
31+
/// \param hostname InfluxDB UDP endpoint hostname
32+
/// \param port InfluxDB UDP endpoint port number
33+
Zabbix(const std::string &hostname, int port);
34+
35+
/// Default destructor
36+
~Zabbix() = default;
37+
38+
/// Sends metric to InfluxDB instance via one transport
39+
/// \param metric reference to metric object
40+
void send(const Metric& metric) override;
41+
42+
/// Adds tag
43+
/// \param name tag name
44+
/// \param value tag value
45+
void addGlobalTag(std::string name, std::string value) override;
46+
47+
private:
48+
/// TCP transport
49+
std::unique_ptr<Transports::TCP> transport;
50+
51+
/// Hostname as required by Zabbix protocol
52+
std::string hostname;
53+
54+
/// Prepares Zabbix protocol message
55+
std::string metricToZabbix(const Metric& metric);
56+
57+
/// Converts timestamp into unix format
58+
/// \param timestamp chrono system_clock timestamp
59+
/// \return unix timestamp
60+
std::string convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp);
61+
};
62+
63+
} // namespace Backends
64+
} // namespace Monitoring
65+
} // namespace AliceO2
66+
67+
#endif // ALICEO2_MONITORING_BACKENDS_ZABBIX_H

src/Collector.cxx

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "Exceptions/MonitoringInternalException.h"
1919
#include "Backends/InfoLoggerBackend.h"
2020
#include "Backends/Flume.h"
21+
#include "Backends/Zabbix.h"
2122

2223
#ifdef _WITH_APPMON
2324
#include "Backends/ApMonBackend.h"
@@ -89,6 +90,16 @@ Collector::Collector(const std::string& configPath)
8990
MonLogger::Get() << "Flume backend disabled" << MonLogger::End();
9091
}
9192

93+
if (configFile->get<int>("Zabbix/enable").value_or(0) == 1) {
94+
mBackends.emplace_back(std::make_unique<Backends::Zabbix>(
95+
configFile->get<std::string>("Zabbix/hostname").value(),
96+
configFile->get<int>("Zabbix/port").value()
97+
));
98+
}
99+
else {
100+
MonLogger::Get() << "Zabbix backend disabled" << MonLogger::End();
101+
}
102+
92103
#ifdef _OS_LINUX
93104
mProcessMonitor = std::make_unique<ProcessMonitor>();
94105
if (configFile->get<int>("ProcessMonitor/enable").value_or(0) == 1) {

src/Transports/TCP.cxx

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
///
2+
/// \file TCP.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "TCP.h"
7+
#include <string>
8+
#include <iostream>
9+
#include "Exceptions/MonitoringInternalException.h"
10+
11+
namespace AliceO2
12+
{
13+
/// ALICE O2 Monitoring system
14+
namespace Monitoring
15+
{
16+
/// Monitoring transports
17+
namespace Transports
18+
{
19+
20+
TCP::TCP(const std::string &hostname, int port) :
21+
mSocket(mIoService)
22+
{
23+
boost::asio::ip::tcp::resolver resolver(mIoService);
24+
boost::asio::ip::tcp::resolver::query query(hostname, std::to_string(port));
25+
boost::asio::ip::tcp::resolver::iterator resolverIterator = resolver.resolve(query);
26+
27+
boost::asio::ip::tcp::resolver::iterator end;
28+
boost::system::error_code error = boost::asio::error::host_not_found;
29+
while (error && resolverIterator != end) {
30+
mSocket.close();
31+
mSocket.connect(*resolverIterator++, error);
32+
}
33+
if (error) {
34+
throw MonitoringInternalException("TCP connection", error.message());
35+
}
36+
}
37+
38+
void TCP::send(std::string&& message)
39+
{
40+
try {
41+
mSocket.send(boost::asio::buffer(message));
42+
} catch(const boost::system::system_error& e) {
43+
throw MonitoringInternalException("TCP send", e.what());
44+
}
45+
}
46+
47+
void TCP::read() {
48+
for (;;) {
49+
boost::system::error_code error;
50+
boost::array<char, 128> buf;
51+
size_t len = mSocket.read_some(boost::asio::buffer(buf), error);
52+
if (error == boost::asio::error::eof) {
53+
break;
54+
}
55+
std::cout.write(buf.data(), len);
56+
}
57+
}
58+
59+
} // namespace Transports
60+
} // namespace Monitoring
61+
} // namespace AliceO2

src/Transports/TCP.h

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
///
2+
/// \file TCP.h
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#ifndef ALICEO2_MONITORING_TRANSPORTS_TCP_H
7+
#define ALICEO2_MONITORING_TRANSPORTS_TCP_H
8+
9+
#include "TransportInterface.h"
10+
11+
#include <boost/array.hpp>
12+
#include <boost/asio.hpp>
13+
#include <boost/algorithm/string.hpp>
14+
#include <chrono>
15+
#include <string>
16+
17+
namespace AliceO2
18+
{
19+
/// ALICE O2 Monitoring system
20+
namespace Monitoring
21+
{
22+
/// Monitoring transports
23+
namespace Transports
24+
{
25+
26+
/// Transport that sends string formatted metrics via UDP
27+
class TCP : public TransportInterface
28+
{
29+
public:
30+
/// Constructor
31+
/// \param hostname InfluxDB instance hostname
32+
// \param port InfluxDB instance port number
33+
TCP(const std::string &hostname, int port);
34+
35+
/// Default destructor
36+
~TCP() = default;
37+
38+
/// Sends metric via UDP
39+
/// \param lineMessage r-value string formated
40+
void send(std::string&& message) override;
41+
42+
/// Dummy read method - Forwards read out buffer to cout
43+
void read();
44+
45+
private:
46+
/// Boost Asio I/O functionality
47+
boost::asio::io_service mIoService;
48+
49+
/// TCP socket
50+
boost::asio::ip::tcp::socket mSocket;
51+
52+
/// TCP endpoint interator
53+
boost::asio::ip::tcp::resolver::iterator mEndpoint;
54+
55+
};
56+
57+
} // namespace Transports
58+
} // namespace Monitoring
59+
} // namespace AliceO2
60+
61+
#endif // ALICEO2_MONITORING_TRANSPORTS_TCP_H

test/testZabbix.cxx

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include "../src/Backends/Zabbix.h"
2+
#include "../src/Exceptions/MonitoringInternalException.h"
3+
4+
#define BOOST_TEST_MODULE testZabbix
5+
#include <boost/test/included/unit_test.hpp>
6+
7+
8+
namespace AliceO2 {
9+
namespace Monitoring {
10+
namespace Test {
11+
12+
BOOST_AUTO_TEST_CASE(crateZabbixInstance)
13+
{
14+
try {
15+
AliceO2::Monitoring::Backends::Zabbix zabbixBackend("localhost", 1000);
16+
} catch (MonitoringInternalException &e) {
17+
BOOST_CHECK_EQUAL(std::string(e.what()), "Connection refused");
18+
}
19+
}
20+
21+
} // namespace Test
22+
} // namespace Monitoring
23+
} // namespace AliceO2

0 commit comments

Comments
 (0)