Skip to content

Commit cde6533

Browse files
authored
[OMON-517] Add WebSocket backend to the library (#273)
1 parent 7b38395 commit cde6533

File tree

5 files changed

+143
-4
lines changed

5 files changed

+143
-4
lines changed

CMakeLists.txt

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ endif()
3131

3232
# Define project
3333
project(Monitoring
34-
VERSION 3.9.0
34+
VERSION 3.10.0
3535
DESCRIPTION "O2 Monitoring library"
3636
LANGUAGES CXX
3737
)
@@ -73,7 +73,7 @@ endif()
7373

7474
list(APPEND CMAKE_MODULE_PATH "${CMAKE_CURRENT_LIST_DIR}/cmake")
7575

76-
find_package(Boost REQUIRED COMPONENTS unit_test_framework program_options system)
76+
find_package(Boost 1.70 REQUIRED COMPONENTS unit_test_framework program_options system)
7777
find_package(ApMon MODULE)
7878
find_package(CURL MODULE)
7979
find_package(RdKafka CONFIG)
@@ -128,6 +128,7 @@ add_library(Monitoring SHARED
128128
src/Transports/TCP.cxx
129129
src/Transports/Unix.cxx
130130
src/Transports/StdOut.cxx
131+
src/Transports/WebSocket.cxx
131132
src/Exceptions/MonitoringException.cxx
132133
$<$<BOOL:${ApMon_FOUND}>:src/Backends/ApMonBackend.cxx>
133134
$<$<BOOL:${RdKafka_FOUND}>:src/Transports/Kafka.cxx>

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,14 @@ std::unique_ptr<Monitoring> monitoring = MonitoringFactory::Get("backend[-protoc
4141
4242
See the table below to find `URI`s for supported backends:
4343
44-
| Backend name | Transport | URI backend[-protocol] | URI query | Default verbosity |
44+
| Format | Transport | URI backend[-protocol] | URI query | Default verbosity |
4545
| ------------ |:-----------:|:----------------------:|:-----------:| -----------------:|
46-
| No-op | - | `no-op` | - | - |
46+
| - | - | `no-op` | - | - |
4747
| InfluxDB | UDP | `influxdb-udp` | - | `info` |
4848
| InfluxDB | Unix socket | `influxdb-unix` | - | `info` |
4949
| InfluxDB | StdOut | `influxdb-stdout` | - | `info` |
5050
| InfluxDB | Kafka | `influxdb-kafka` | Kafka topic | `info` |
51+
| InfluxDB | WebSocket | `influxdb-ws` | `token=TOKEN` | `info` |
5152
| InfluxDB 2.x | HTTP | `influxdbv2` | `org=ORG&bucket=BUCKET&token=TOKEN` | `info` |
5253
| ApMon | UDP | `apmon` | - | `info` |
5354
| StdOut | - | `stdout`, `infologger` | [Prefix] | `debug` |

src/MonitoringFactory.cxx

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include "Transports/UDP.h"
2929
#include "Transports/Unix.h"
3030
#include "Transports/StdOut.h"
31+
#include "Transports/WebSocket.h"
3132

3233
#ifdef O2_MONITORING_WITH_APPMON
3334
#include "Backends/ApMonBackend.h"
@@ -116,6 +117,17 @@ std::unique_ptr<Backend> getInfluxDb(http::url uri)
116117
auto transport = std::make_unique<transports::StdOut>();
117118
return std::make_unique<backends::InfluxDB>(std::move(transport));
118119
}
120+
if (uri.protocol == "ws") {
121+
std::string tokenLabel = "token=";
122+
auto tokenSearch = uri.search.find(tokenLabel);
123+
uri.path.erase(std::remove(uri.path.begin(), uri.path.end(), '/'), uri.path.end());
124+
if (tokenSearch == std::string::npos) {
125+
throw MonitoringException("Factory", "Grafana token is required for WebSocket backend");
126+
}
127+
std::string token = uri.search.substr(tokenSearch + tokenLabel.length());
128+
auto transport = std::make_unique<transports::WebSocket>(uri.host, uri.port, token, uri.path);
129+
return std::make_unique<backends::InfluxDB>(std::move(transport));
130+
}
119131
if (uri.protocol == "kafka") {
120132
#ifdef O2_MONITORING_WITH_KAFKA
121133
auto transport = std::make_unique<transports::Kafka>(uri.host, uri.port, uri.search);
@@ -165,6 +177,7 @@ std::unique_ptr<Backend> MonitoringFactory::GetBackend(std::string& url)
165177
{"influxdb-unix", getInfluxDb},
166178
{"influxdb-stdout", getInfluxDb},
167179
{"influxdb-kafka", getInfluxDb},
180+
{"influxdb-ws", getInfluxDb},
168181
{"influxdbv2", getInfluxDbv2},
169182
{"apmon", getApMon},
170183
{"no-op", getNoop}

src/Transports/WebSocket.cxx

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
///
13+
/// \file WebSocket.cxx
14+
/// \author Adam Wegrzynek <[email protected]>
15+
///
16+
17+
#include "WebSocket.h"
18+
#include "Exceptions/MonitoringException.h"
19+
#include "../MonLogger.h"
20+
21+
namespace o2
22+
{
23+
/// ALICE O2 Monitoring system
24+
namespace monitoring
25+
{
26+
/// Monitoring transports
27+
namespace transports
28+
{
29+
30+
WebSocket::WebSocket(const std::string& hostname, int port, const std::string& token, const std::string& stream) : mWebSocket(mIoContext)
31+
{
32+
boost::asio::ip::tcp::resolver resolver{mIoContext};
33+
auto const results = resolver.resolve(hostname, std::to_string(port));
34+
boost::asio::connect(mWebSocket.next_layer(), results.begin(), results.end());
35+
mWebSocket.set_option(beast::websocket::stream_base::decorator(
36+
[&token](beast::websocket::request_type& req) {
37+
req.set(beast::http::field::authorization, "Bearer " + token);
38+
}
39+
));
40+
mWebSocket.handshake(hostname, "/api/live/push/" + stream);
41+
}
42+
43+
WebSocket::~WebSocket()
44+
{
45+
mWebSocket.close(beast::websocket::close_code::normal);
46+
}
47+
48+
void WebSocket::send(std::string&& message)
49+
{
50+
mWebSocket.write(boost::asio::buffer(message));
51+
}
52+
53+
} // namespace transports
54+
} // namespace monitoring
55+
} // namespace o2

src/Transports/WebSocket.h

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
2+
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
3+
// All rights not expressly granted are reserved.
4+
//
5+
// This software is distributed under the terms of the GNU General Public
6+
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
7+
//
8+
// In applying this license CERN does not waive the privileges and immunities
9+
// granted to it by virtue of its status as an Intergovernmental Organization
10+
// or submit itself to any jurisdiction.
11+
12+
///
13+
/// \file WebSocket.h
14+
/// \author Adam Wegrzynek <[email protected]>
15+
///
16+
17+
#ifndef ALICEO2_MONITORING_TRANSPORTS_WEBSOCKET_H
18+
#define ALICEO2_MONITORING_TRANSPORTS_WEBSOCKET_H
19+
20+
#include "TransportInterface.h"
21+
22+
#include <boost/beast/core.hpp>
23+
#include <boost/beast/websocket.hpp>
24+
#include <boost/asio/connect.hpp>
25+
#include <boost/asio/ip/tcp.hpp>
26+
#include <string>
27+
28+
namespace beast = boost::beast;
29+
30+
namespace o2
31+
{
32+
/// ALICE O2 Monitoring system
33+
namespace monitoring
34+
{
35+
/// Monitoring transports
36+
namespace transports
37+
{
38+
39+
/// \brief Transport that sends string formatted metrics via WebSocket
40+
class WebSocket : public TransportInterface
41+
{
42+
public:
43+
/// Constructor
44+
/// \param hostname Grafana host
45+
/// \param port Grafana port
46+
/// \param token Grafana API token
47+
/// \param stream Name of WebSocket stream
48+
WebSocket(const std::string& hostname, int port, const std::string& token, const std::string& stream = "alice_o2");
49+
50+
/// Gracefull disconnect
51+
~WebSocket();
52+
53+
/// Sends metric via UDP
54+
/// \param message r-value string formated
55+
void send(std::string&& message) override;
56+
57+
private:
58+
/// IO context
59+
boost::asio::io_context mIoContext;
60+
61+
/// Websocket stream
62+
beast::websocket::stream<boost::asio::ip::tcp::socket> mWebSocket;
63+
};
64+
65+
} // namespace transports
66+
} // namespace monitoring
67+
} // namespace o2
68+
69+
#endif // ALICEO2_MONITORING_TRANSPORTS_WEBSOCKET_H

0 commit comments

Comments
 (0)