Skip to content

Commit 91861ae

Browse files
BonnyAD9sedmicha
authored andcommitted
TCP - Add acceptor
1 parent b90cfe5 commit 91861ae

File tree

3 files changed

+330
-0
lines changed

3 files changed

+330
-0
lines changed

src/plugins/input/tcp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ add_library(tcp-input MODULE
1010
src/IpfixDecoder.cpp
1111
src/Lz4Decoder.cpp
1212
src/DecoderFactory.cpp
13+
src/Acceptor.cpp
1314
)
1415

1516
if (CMAKE_HOST_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_HOST_SYSTEM_NAME STREQUAL "OpenBSD")
Lines changed: 242 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,242 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Acceptor thread for TCP clients (header file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "Acceptor.hpp"
12+
13+
#include <array> // array
14+
#include <stdexcept> // runtime_error
15+
#include <cerrno> // errno, EINTR
16+
#include <string> // string
17+
#include <cstdint> // uint16_t
18+
#include <cinttypes> // PRIu16
19+
#include <cstddef> // size_t
20+
21+
#include <unistd.h> // pipe, write
22+
#include <sys/socket.h> // AF_INET, AF_INET6, SOCK_STREAM, sockaddr, socket, setsockopt, SOL_SOCKET,
23+
// SO_REUSEADDR, IPPROTO_IPV6, IPV6_V6ONLY, SOMAXCONN
24+
#include <netinet/in.h> // INET6_ADDRSTRLEN, sockaddr_in, sockaddr_in6, inet_ntop, in6addr_any
25+
26+
#include <ipfixcol2.h> // ipx_ctx_t, ipx_strerror, IPX_CTX_WARNING
27+
28+
#include "ClientManager.hpp" // ClientManager
29+
#include "DecoderFactory.hpp" // DecoderFactory
30+
#include "Config.hpp" // Config
31+
#include "UniqueFd.hpp" // UniqueFd
32+
#include "IpAddress.hpp" // IpAddress, IpVersion
33+
34+
namespace tcp_in {
35+
36+
Acceptor::Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx) :
37+
m_epoll(),
38+
m_sockets(),
39+
m_pipe_in(),
40+
m_pipe_out(),
41+
m_clients(clients),
42+
m_factory(std::move(factory)),
43+
m_thread(),
44+
m_ctx(ctx)
45+
{
46+
std::array<int, 2> pipe_fd{};
47+
if (pipe(pipe_fd.begin()) != 0) {
48+
const char *err_str;
49+
ipx_strerror(errno, err_str);
50+
throw std::runtime_error("Failed to create pipe: " + std::string(err_str));
51+
}
52+
53+
UniqueFd pipe_in(pipe_fd[1]);
54+
UniqueFd pipe_out(pipe_fd[0]);
55+
m_pipe_in.swap(pipe_in);
56+
m_pipe_out.swap(pipe_out);
57+
58+
m_epoll.add(m_pipe_out.get(), nullptr);
59+
}
60+
61+
void Acceptor::bind_addresses(Config &config) {
62+
if (config.local_addrs.size() == 0) {
63+
IpAddress addr(in6addr_any);
64+
add_address(addr, config.local_port, false);
65+
}
66+
67+
for (auto &addr : config.local_addrs) {
68+
add_address(addr, config.local_port, true);
69+
}
70+
}
71+
72+
void Acceptor::add_address(IpAddress &adr, uint16_t port, bool ipv6_only) {
73+
auto sd = bind_address(adr, port, ipv6_only);
74+
m_epoll.add(sd.get(), nullptr);
75+
m_sockets.push_back(std::move(sd));
76+
}
77+
78+
UniqueFd Acceptor::bind_address(IpAddress &addr, uint16_t port, bool ipv6_only) {
79+
sockaddr saddr{};
80+
auto v4 = reinterpret_cast<sockaddr_in *>(&saddr);
81+
auto v6 = reinterpret_cast<sockaddr_in6 *>(&saddr);
82+
size_t addr_len;
83+
84+
if (addr.version == IpVersion::IP4) {
85+
v4->sin_family = AF_INET;
86+
v4->sin_port = htons(port);
87+
v4->sin_addr = addr.v4;
88+
addr_len = sizeof(*v4);
89+
} else {
90+
v6->sin6_scope_id = 0;
91+
v6->sin6_family = AF_INET6;
92+
v6->sin6_port = htons(port);
93+
v6->sin6_addr = addr.v6;
94+
addr_len = sizeof(*v6);
95+
}
96+
97+
const char *err_str;
98+
99+
UniqueFd sd(socket(saddr.sa_family, SOCK_STREAM, 0));
100+
if (!sd) {
101+
ipx_strerror(errno, err_str);
102+
throw std::runtime_error("Failed to create socket: " + std::string(err_str));
103+
}
104+
105+
int on = 1;
106+
if (setsockopt(sd.get(), SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) == -1) {
107+
ipx_strerror(errno, err_str);
108+
IPX_CTX_WARNING(
109+
m_ctx,
110+
"Cannot turn on socket reuse option. It may take a while before the port can be used "
111+
"again: %s",
112+
err_str
113+
);
114+
}
115+
116+
if (addr.version == IpVersion::IP6) {
117+
int is_on = ipv6_only;
118+
if (setsockopt(sd.get(), IPPROTO_IPV6, IPV6_V6ONLY, &is_on, sizeof(is_on)) == -1) {
119+
ipx_strerror(errno, err_str);
120+
IPX_CTX_WARNING(
121+
m_ctx,
122+
"Failed to turn %s socket option IPV6_VONLY. Plugin may %s connections: %s",
123+
ipv6_only ? "on" : "off",
124+
ipv6_only ? "accept IPV6" : "not accept IPV4",
125+
err_str
126+
)
127+
}
128+
}
129+
130+
std::array<char, INET6_ADDRSTRLEN> addr_str{};
131+
inet_ntop(static_cast<int>(addr.version), &addr.v6, addr_str.begin(), INET6_ADDRSTRLEN);
132+
133+
if (bind(sd.get(), &saddr, addr_len) == -1) {
134+
ipx_strerror(errno, err_str);
135+
throw std::runtime_error(
136+
"Failed to bind to socket (local IP: "
137+
+ std::string(addr_str.begin())
138+
+ ", port: "
139+
+ std::to_string(port)
140+
+ "): "
141+
+ err_str
142+
);
143+
}
144+
145+
if (listen(sd.get(), SOMAXCONN) == -1) {
146+
ipx_strerror(errno, err_str);
147+
throw std::runtime_error(
148+
"Failed to listen on a socket (local IP: "
149+
+ std::string(addr_str.begin())
150+
+ ", port: "
151+
+ std::to_string(port)
152+
+ "): "
153+
+ err_str
154+
);
155+
}
156+
157+
IPX_CTX_INFO(m_ctx, "Listening on %s (port %" PRIu16 ")", addr_str.begin(), port);
158+
return sd;
159+
}
160+
161+
void Acceptor::start() {
162+
if (m_thread.joinable()) {
163+
throw std::runtime_error("Cannot start acceptor, it is already running.");
164+
}
165+
std::thread acceptor([=](){ mainloop(); });
166+
m_thread.swap(acceptor);
167+
}
168+
169+
void Acceptor::stop() {
170+
if (!m_thread.joinable()) {
171+
return;
172+
}
173+
174+
if (write(m_pipe_in.get(), "x", 1) != 1) {
175+
const char *err_str;
176+
ipx_strerror(errno, err_str);
177+
throw std::runtime_error(
178+
"Faidled to notify acceptor thread to exit by writing to pipe: "
179+
+ std::string(err_str)
180+
);
181+
}
182+
183+
m_thread.join();
184+
}
185+
186+
void Acceptor::mainloop() {
187+
epoll_event ev;
188+
const char *err_str;
189+
190+
while (true) {
191+
int ret = m_epoll.wait(&ev, 1);
192+
if (ret == -1) {
193+
ipx_strerror(errno, err_str);
194+
IPX_CTX_ERROR(m_ctx, "Acceptor: failed to wait for new connections: %s", err_str);
195+
return;
196+
}
197+
198+
if (ret != 1) {
199+
continue;
200+
}
201+
202+
if (ev.data.fd == m_pipe_out.get()) {
203+
char b;
204+
int ret = read(m_pipe_out.get(), &b, 1);
205+
if (ret == -1) {
206+
ipx_strerror(errno, err_str);
207+
IPX_CTX_ERROR(m_ctx, "Acceptor: Failed to read command from pipe: %s", err_str);
208+
return;
209+
}
210+
211+
// the other end of pipe was closed
212+
if (ret == 0) {
213+
IPX_CTX_INFO(m_ctx, "Acceptor: Command pipe was closed. Exiting.");
214+
return;
215+
}
216+
217+
// exit command was sent
218+
if (b == 'x') {
219+
IPX_CTX_INFO(m_ctx, "Acceptor: Exit command received. Exiting.");
220+
return;
221+
}
222+
223+
IPX_CTX_WARNING(m_ctx, "Acceptor: Received unknown command: '%c'", b);
224+
}
225+
226+
UniqueFd new_sd(accept(ev.data.fd, nullptr, nullptr));
227+
if (!new_sd) {
228+
ipx_strerror(errno, err_str);
229+
IPX_CTX_ERROR(m_ctx, "Acceptor: Failed to accept a new connection: ");
230+
continue;
231+
}
232+
233+
try {
234+
auto decoder = m_factory.detect_decoder(new_sd.get());
235+
m_clients.add_connection(std::move(new_sd), std::move(decoder));
236+
} catch (std::exception &ex) {
237+
IPX_CTX_ERROR(m_ctx, "Acceptor: %s", ex.what());
238+
}
239+
}
240+
}
241+
242+
} // namespace tcp_in
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Acceptor thread for TCP clients (header file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <thread> // std::thread
14+
#include <vector> // std::vector
15+
#include <cstdint> // uint16_t
16+
17+
#include <ipfixcol2.h> // ipx_ctx_t
18+
19+
#include "ClientManager.hpp" // ClientManager
20+
#include "DecoderFactory.hpp" // DecoderFactory
21+
#include "Config.hpp" // Config
22+
#include "IpAddress.hpp" // IpAddress
23+
#include "UniqueFd.hpp" // UniqueFd
24+
#include "Epoll.hpp" // Epoll
25+
26+
namespace tcp_in {
27+
28+
/** Acceptor thread for TCP clients. */
29+
class Acceptor {
30+
public:
31+
/**
32+
* @brief Creates the acceptor thread.
33+
*
34+
* @param clients Reference to client manager.
35+
* @param factory Initialized decoder factory.
36+
* @param config File configuration.
37+
* @param ctx The plugin context.
38+
*/
39+
Acceptor(ClientManager &clients, DecoderFactory factory, ipx_ctx_t *ctx);
40+
41+
// force that acceptor stays in its original memory (so that `this` pointer stays valid on the
42+
// other thread)
43+
Acceptor(const Acceptor &) = delete;
44+
Acceptor(Acceptor &&) = delete;
45+
46+
/**
47+
* @brief Creates sockets for each address. If there are no addresses, listens on all
48+
* interfaces.
49+
* @param config Configuration with the addresses and port.
50+
*/
51+
void bind_addresses(Config &config);
52+
53+
/**
54+
* @brief Starts the acceptor thread.
55+
* @throws When the acceptor thread is already started.
56+
*/
57+
void start();
58+
59+
/** Stops the acceptor thread. */
60+
void stop();
61+
62+
private:
63+
void add_address(IpAddress &adr, uint16_t port, bool ipv6_only);
64+
65+
UniqueFd bind_address(IpAddress &adr, uint16_t port, bool ipv6_only);
66+
67+
/** Runs on the other therad */
68+
void mainloop();
69+
70+
/** File descriptor of epoll for accepting connections. */
71+
Epoll m_epoll;
72+
/** Sockets listened to by epoll. */
73+
std::vector<UniqueFd> m_sockets;
74+
75+
/** Write 'x' to this to gracefully exit the thread. */
76+
UniqueFd m_pipe_in;
77+
/** Epoll listens to this, when it activates the acceptor thread will gracefuly exit. */
78+
UniqueFd m_pipe_out;
79+
80+
/** Accepted clients. */
81+
ClientManager &m_clients;
82+
DecoderFactory m_factory;
83+
std::thread m_thread;
84+
ipx_ctx_t *m_ctx;
85+
};
86+
87+
} // namespace tcp_in

0 commit comments

Comments
 (0)