Skip to content

Commit 3ad9ede

Browse files
authored
Add InfluxDB over Unix datagram socket backend (#115)
* Add unix socket transport * fix verbosities * Use datagram unix sockets
1 parent dc428c7 commit 3ad9ede

File tree

7 files changed

+116
-7
lines changed

7 files changed

+116
-7
lines changed

CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ set(SRCS
7878
src/Transports/UDP.cxx
7979
src/Transports/TCP.cxx
8080
src/Transports/HTTP.cxx
81+
src/Transports/Unix.cxx
8182
src/Exceptions/MonitoringException.cxx
8283
)
8384

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ See table below to find out how to create `URI` for each backend:
7171
| ------------ |:---------:|:----------------------:|:----------:| -----------------:|
7272
| InfluxDB | HTTP | `influxdb-http` | `?db=<db>` | `info` |
7373
| InfluxDB | UDP | `influxdb-udp` | - | `info` |
74+
| InfluxDB | Unix datagram | `influxdb-unix` | - | `info` |
7475
| ApMon | UDP | `apmon` | - | `info` |
7576
| StdOut | - | `stdout`, `infologger` | - | `debug` |
7677
| Flume | UDP | `flume` | - | `info` |

src/Backends/InfluxDB.cxx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
#include <string>
99
#include "../Transports/UDP.h"
1010
#include "../Transports/HTTP.h"
11+
#include "../Transports/Unix.h"
1112
#include "../Exceptions/MonitoringException.h"
1213

1314
namespace o2
@@ -35,6 +36,12 @@ InfluxDB::InfluxDB(const std::string& host, unsigned int port, const std::string
3536
<< ":" << std::to_string(port) << "/write?" << search << ")" << MonLogger::End();
3637
}
3738

39+
InfluxDB::InfluxDB(const std::string& socketPath)
40+
{
41+
transport = std::make_unique<transports::Unix>(socketPath);
42+
MonLogger::Get() << "InfluxDB/Unix backend initialized (" << socketPath << ")" << MonLogger::End();
43+
}
44+
3845
inline unsigned long InfluxDB::convertTimestamp(const std::chrono::time_point<std::chrono::system_clock>& timestamp)
3946
{
4047
return std::chrono::duration_cast <std::chrono::nanoseconds>(

src/Backends/InfluxDB.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,8 @@ class InfluxDB final : public Backend
3838
/// \param path Query search providing database name
3939
InfluxDB(const std::string& host, unsigned int port, const std::string& search);
4040

41+
InfluxDB(const std::string& socketPath);
42+
4143
/// Default destructor
4244
~InfluxDB() = default;
4345

src/MonitoringFactory.cxx

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ namespace o2
2727
namespace monitoring
2828
{
2929

30+
static const std::map<std::string, Verbosity> verbosities = {
31+
{"/prod", Verbosity::Prod},
32+
{"/info", Verbosity::Info},
33+
{"/debug", Verbosity::Debug}
34+
};
35+
3036
std::unique_ptr<Backend> getStdOut(http::url) {
3137
return std::make_unique<backends::StdOut>();
3238
}
@@ -42,6 +48,16 @@ std::unique_ptr<Backend> getInfluxDb(http::url uri) {
4248
if (uri.protocol == "http") {
4349
return std::make_unique<backends::InfluxDB>(uri.host, uri.port, uri.search);
4450
}
51+
if (uri.protocol == "unix") {
52+
std::string path = uri.path;;
53+
auto found = std::find_if(begin(verbosities), end(verbosities),
54+
[&](const auto& s)
55+
{return path.find(s.first) != std::string::npos; });
56+
if (found != end(verbosities)) {
57+
path.erase(path.rfind('/'));
58+
}
59+
return std::make_unique<backends::InfluxDB>(path);
60+
}
4561
throw std::runtime_error("InfluxDB transport protocol not supported");
4662
}
4763

@@ -64,12 +80,6 @@ std::unique_ptr<Backend> getFlume(http::url uri) {
6480
}
6581

6682
void MonitoringFactory::SetVerbosity(std::string selected, std::unique_ptr<Backend>& backend) {
67-
static const std::map<std::string, Verbosity> verbosities = {
68-
{"/prod", Verbosity::Prod},
69-
{"/info", Verbosity::Info},
70-
{"/debug", Verbosity::Debug}
71-
};
72-
7383
auto found = verbosities.find(selected);
7484
if (found == verbosities.end()) {
7585
throw std::runtime_error("Unrecognised verbosity");
@@ -86,6 +96,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url) {
8696
{"stdout", getStdOut},
8797
{"influxdb-udp", getInfluxDb},
8898
{"influxdb-http", getInfluxDb},
99+
{"influxdb-unix", getInfluxDb},
89100
{"apmon", getApMon},
90101
{"flume", getFlume},
91102
{"no-op", getNoop}
@@ -103,7 +114,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url) {
103114

104115
auto backend = iterator->second(parsedUrl);
105116
if (!parsedUrl.path.empty() && parsedUrl.path != "/") {
106-
SetVerbosity(parsedUrl.path, backend);
117+
SetVerbosity(parsedUrl.path.substr(parsedUrl.path.rfind("/")), backend);
107118
}
108119
return backend;
109120
}

src/Transports/Unix.cxx

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
///
2+
/// \file Unix.cxx
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#include "Unix.h"
7+
#include <string>
8+
9+
namespace o2
10+
{
11+
/// ALICE O2 Monitoring system
12+
namespace monitoring
13+
{
14+
/// Monitoring transports
15+
namespace transports
16+
{
17+
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
18+
Unix::Unix(const std::string &socketPath) :
19+
mSocket(mIoService), mEndpoint(socketPath)
20+
{
21+
mSocket.open();
22+
}
23+
24+
void Unix::send(std::string&& message)
25+
{
26+
mSocket.send_to(boost::asio::buffer(message, message.size()), mEndpoint);
27+
}
28+
#endif // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
29+
} // namespace transports
30+
} // namespace monitoring
31+
} // namespace o2

src/Transports/Unix.h

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
///
2+
/// \file Unix.h
3+
/// \author Adam Wegrzynek <[email protected]>
4+
///
5+
6+
#ifndef ALICEO2_MONITORING_TRANSPORTS_UNIX_H
7+
#define ALICEO2_MONITORING_TRANSPORTS_UNIX_H
8+
9+
#include "TransportInterface.h"
10+
11+
#include <boost/array.hpp>
12+
#include <boost/asio.hpp>
13+
#include <boost/algorithm/string.hpp>
14+
#include <chrono>
15+
#include <string>
16+
17+
namespace o2
18+
{
19+
/// ALICE O2 Monitoring system
20+
namespace monitoring
21+
{
22+
/// Monitoring transports
23+
namespace transports
24+
{
25+
26+
/// \brief Transport that sends string formatted metrics via Unix datagram socket
27+
class Unix : public TransportInterface
28+
{
29+
public:
30+
/// \param hostname
31+
/// \param port
32+
Unix(const std::string &socketPath);
33+
34+
/// Default destructor
35+
~Unix() = default;
36+
37+
/// \param message r-value string formated
38+
void send(std::string&& message) override;
39+
40+
private:
41+
/// Boost Asio I/O functionality
42+
boost::asio::io_service mIoService;
43+
#if defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
44+
/// Unix socket
45+
boost::asio::local::datagram_protocol::socket mSocket;
46+
47+
/// Unix endpoint
48+
boost::asio::local::datagram_protocol::endpoint mEndpoint;
49+
#endif // defined(BOOST_ASIO_HAS_LOCAL_SOCKETS)
50+
};
51+
52+
} // namespace transports
53+
} // namespace monitoring
54+
} // namespace o2
55+
56+
#endif // ALICEO2_MONITORING_TRANSPORTS_UNIX_H

0 commit comments

Comments
 (0)