Skip to content

Commit 71387e9

Browse files
BonnyAD9sedmicha
authored andcommitted
TCP - Add ClientManager
1 parent ac0e5aa commit 71387e9

File tree

3 files changed

+233
-0
lines changed

3 files changed

+233
-0
lines changed

src/plugins/input/tcp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ add_library(tcp-input MODULE
66
src/DecodeBuffer.cpp
77
src/Connection.cpp
88
src/Epoll.cpp
9+
src/ClientManager.cpp
910
)
1011

1112
if (CMAKE_HOST_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_HOST_SYSTEM_NAME STREQUAL "OpenBSD")
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Manages TCP connection (source file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "ClientManager.hpp"
12+
13+
#include <stdexcept> // runtime_error, exception
14+
#include <string> // string
15+
#include <memory> // unique_ptr
16+
#include <cerrno> // errno, EINTR
17+
#include <array> // array
18+
#include <cstddef> // size_t
19+
#include <mutex> // mutex, lock_guard
20+
#include <vector> // vector
21+
22+
#include <fcntl.h> // fcntl, F_GETFL, F_SETFL, O_NONBLOCK
23+
#include <netinet/in.h> // INET6_ADDRSTRLEN
24+
25+
#include <ipfixcol2.h> // ipx_strerror, ipx_ctx_t, ipx_session, IPX_CTX_INFO, IPX_CTX_WARNING
26+
27+
#include "Connection.hpp" // Connection
28+
#include "Decoder.hpp" // Decoder
29+
#include "UniqueFd.hpp" // UniqueFd
30+
31+
namespace tcp_in {
32+
33+
ClientManager::ClientManager(ipx_ctx_t *ctx) :
34+
m_ctx(ctx),
35+
m_epoll(),
36+
m_mutex(),
37+
m_connections()
38+
{}
39+
40+
void ClientManager::add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder) {
41+
const char *err_str;
42+
43+
// get the flags and set it to non-blocking mode
44+
int flags = fcntl(fd.get(), F_GETFL, 0);
45+
if (flags == -1) {
46+
ipx_strerror(errno, err_str);
47+
throw std::runtime_error(
48+
"Failed to get flags from file descriptor: " + std::string(err_str)
49+
);
50+
}
51+
52+
flags |= O_NONBLOCK;
53+
if (fcntl(fd.get(), F_SETFL, flags) == -1) {
54+
ipx_strerror(errno, err_str);
55+
throw std::runtime_error("Failed to set non-blocking mode: " + std::string(err_str));
56+
}
57+
58+
int borrowed_fd = fd.get();
59+
60+
std::unique_ptr<Connection> connection(new Connection(std::move(fd), std::move(decoder)));
61+
62+
auto net = &connection->get_session()->tcp.net;
63+
std::array<char, INET6_ADDRSTRLEN> src_addr_str{};
64+
inet_ntop(net->l3_proto, &net->addr_src, src_addr_str.begin(), src_addr_str.size());
65+
IPX_CTX_INFO(m_ctx, "New exporter connected from '%s'.", src_addr_str.begin());
66+
IPX_CTX_INFO(
67+
m_ctx,
68+
"Using %s Decoder for the new connection",
69+
connection->get_decoder().get_name()
70+
);
71+
72+
std::lock_guard<std::mutex> lock(m_mutex);
73+
74+
auto con_ptr = connection.get();
75+
m_connections.push_back(std::move(connection));
76+
77+
m_epoll.add(borrowed_fd, con_ptr);
78+
}
79+
80+
void ClientManager::close_connection(const ipx_session *session) {
81+
std::lock_guard<std::mutex> lock(m_mutex);
82+
83+
size_t i;
84+
for (i = 0; i < m_connections.size(); ++i) {
85+
if (m_connections[i]->get_session() == session) {
86+
break;
87+
}
88+
}
89+
90+
if (i == m_connections.size()) {
91+
return;
92+
}
93+
94+
close_connection_internal(i);
95+
}
96+
97+
98+
size_t ClientManager::wait_for_connections(Connection **connections, int max_connections) {
99+
// timeout for waiting for new data (in milliseconds, -1 => infinite)
100+
constexpr int GETTER_TIMEOUT = 10;
101+
102+
std::vector<epoll_event> events(max_connections);
103+
104+
int ev_valid = m_epoll.wait(events.data(), max_connections, GETTER_TIMEOUT);
105+
if (ev_valid == -1) {
106+
const char *err_str;
107+
ipx_strerror(errno, err_str);
108+
throw std::runtime_error("Failed to wait for new data: " + std::string(err_str));
109+
}
110+
111+
events.resize(ev_valid);
112+
113+
for (size_t i = 0; i < events.size(); ++i) {
114+
connections[i] = reinterpret_cast<Connection *>(events[i].data.ptr);
115+
}
116+
117+
return events.size();
118+
}
119+
120+
void ClientManager::close_all_connections() {
121+
std::lock_guard<std::mutex> lock(m_mutex);
122+
123+
while (m_connections.size() != 0) {
124+
close_connection_internal(m_connections.size() - 1);
125+
}
126+
}
127+
128+
void ClientManager::close_connection_internal(
129+
size_t connection_idx
130+
) noexcept {
131+
if (connection_idx != m_connections.size() - 1) {
132+
m_connections[connection_idx].swap(m_connections[m_connections.size() - 1]);
133+
}
134+
135+
auto con = m_connections[m_connections.size() - 1].get();
136+
137+
if (!m_epoll.remove(con->get_fd())) {
138+
const char *err_str;
139+
ipx_strerror(errno, err_str);
140+
IPX_CTX_WARNING(
141+
m_ctx,
142+
"Failed to deregister the session %s from epoll: %s",
143+
con->get_session()->ident,
144+
err_str
145+
)
146+
}
147+
148+
try {
149+
con->close(m_ctx);
150+
} catch (std::exception &ex) {
151+
IPX_CTX_WARNING(m_ctx, "%s", ex.what());
152+
}
153+
154+
m_connections.pop_back();
155+
}
156+
157+
} // namespace tcp_in
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Manages TCP connection (header file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <vector> // std::vector
14+
#include <memory> // std::unique_ptr
15+
#include <mutex> // std::mutex
16+
#include <cstddef> // size_t
17+
18+
#include <ipfixcol2.h> // ipx_session, ipx_ctx_t
19+
20+
#include "Connection.hpp" // Connection
21+
#include "Decoder.hpp" // Decoder
22+
#include "UniqueFd.hpp" // UniqueFd
23+
#include "Epoll.hpp" // Epoll
24+
25+
namespace tcp_in {
26+
27+
/** Manager for TCP connections */
28+
class ClientManager {
29+
public:
30+
/**
31+
* @brief Creates client manager with no clients.
32+
* @throws when fails to create epoll
33+
*/
34+
ClientManager(ipx_ctx_t *ctx);
35+
36+
/**
37+
* @brief Adds connection to the vector and epoll.
38+
* @param fd file descriptor of the new tcp connection.
39+
* @param decoder decoder for the connection.
40+
* @throws when fails to add the new connection to epoll or when fails to create new session.
41+
*/
42+
void add_connection(UniqueFd fd, std::unique_ptr<Decoder> decoder);
43+
44+
/**
45+
* @brief Removes connection from the vector based on its session. This is safe only for the
46+
* main thread (not the acceptor thread).
47+
* @param ctx Context used for closing sessions.
48+
* @param session session of the connection to remove.
49+
*/
50+
void close_connection(const ipx_session *session);
51+
52+
/**
53+
* @brief Waits for new data on at least one connection and gets all the connections with new
54+
* data.
55+
* @param connectons Where to put the connections.
56+
* @param max_connections Max number of connections to set to `connections`
57+
* @return number of connections set to `connectinos`
58+
* @throws when fails to wait
59+
*/
60+
size_t wait_for_connections(Connection **connections, int max_connections);
61+
62+
/** Closes all connections. */
63+
void close_all_connections();
64+
private:
65+
/** Closes connection at the given index. DOES NOT SYNCHRONIZE */
66+
void close_connection_internal(size_t connection_idx) noexcept;
67+
68+
ipx_ctx_t *m_ctx;
69+
Epoll m_epoll;
70+
std::mutex m_mutex;
71+
std::vector<std::unique_ptr<Connection>> m_connections;
72+
};
73+
74+
} // namespace tcp_in
75+

0 commit comments

Comments
 (0)