Skip to content

Commit dde3b11

Browse files
BonnyAD9sedmicha
authored andcommitted
TCP - Add plugin class
1 parent 91861ae commit dde3b11

File tree

3 files changed

+132
-0
lines changed

3 files changed

+132
-0
lines changed

src/plugins/input/tcp/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ add_library(tcp-input MODULE
1111
src/Lz4Decoder.cpp
1212
src/DecoderFactory.cpp
1313
src/Acceptor.cpp
14+
src/Plugin.cpp
1415
)
1516

1617
if (CMAKE_HOST_SYSTEM_NAME STREQUAL "FreeBSD" OR CMAKE_HOST_SYSTEM_NAME STREQUAL "OpenBSD")
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Tcp input plugin for ipfixcol2 (source file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#include "Plugin.hpp"
12+
13+
#include <array> // array
14+
#include <stdexcept> // exception
15+
#include <cstddef> // size_t
16+
17+
#include <ipfixcol2.h> // ipx_ctx_t, IPX_CTX_ERROR, IPX_CTX_INFO, IPX_CTX_WARNING, ipx_session
18+
19+
#include "Config.hpp" // Config
20+
#include "DecoderFactory.hpp" // DecoderFactory
21+
#include "Connection.hpp" // Connection
22+
23+
namespace tcp_in {
24+
25+
Plugin::Plugin(ipx_ctx_t *ctx, Config &config) :
26+
m_ctx(ctx),
27+
m_clients(ctx),
28+
m_acceptor(m_clients, DecoderFactory(), ctx)
29+
{
30+
m_acceptor.bind_addresses(config);
31+
m_acceptor.start();
32+
}
33+
34+
void Plugin::get() {
35+
constexpr int MAX_CONNECTIONS = 16;
36+
std::array<Connection *, MAX_CONNECTIONS> connections{};
37+
38+
auto count = m_clients.wait_for_connections(connections.begin(), connections.size());
39+
40+
for (size_t i = 0; i < count; ++i) {
41+
try {
42+
if (!connections[i]->receive(m_ctx)) {
43+
// EOF reached
44+
auto session = connections[i]->get_session();
45+
IPX_CTX_INFO(m_ctx, "Closing %s", session->ident);
46+
m_clients.close_connection(session);
47+
}
48+
} catch (std::exception &ex) {
49+
IPX_CTX_ERROR(m_ctx, "%s", ex.what());
50+
auto session = connections[i]->get_session();
51+
IPX_CTX_INFO(m_ctx, "Closing %s", session->ident);
52+
m_clients.close_connection(session);
53+
}
54+
}
55+
}
56+
57+
void Plugin::close_session(const ipx_session *session) noexcept {
58+
m_clients.close_connection(session);
59+
}
60+
61+
Plugin::~Plugin() {
62+
try {
63+
m_acceptor.stop();
64+
m_clients.close_all_connections();
65+
} catch (std::exception &ex) {
66+
IPX_CTX_WARNING(m_ctx, "%s", ex.what());
67+
}
68+
}
69+
70+
} // namespace tcp_in
71+
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* \file
3+
* \author Jakub Antonín Štigler <[email protected]>
4+
* \brief Tcp input plugin for ipfixcol2 (header file)
5+
* \date 2024
6+
*
7+
* Copyright: (C) 2023 CESNET, z.s.p.o.
8+
* SPDX-License-Identifier: BSD-3-Clause
9+
*/
10+
11+
#pragma once
12+
13+
#include <ipfixcol2.h> // ipx_ctx_t, ipx_session
14+
15+
#include "Config.hpp" // Config
16+
#include "ClientManager.hpp" // ClientManager
17+
#include "Acceptor.hpp" // Acceptor
18+
19+
namespace tcp_in {
20+
21+
/** TCP input plugin for ipfixcol2. */
22+
class Plugin {
23+
public:
24+
/**
25+
* @brief Creates new TCP plugin instance.
26+
* @param[in] ctx
27+
* @param[in] config configuration of the plugin
28+
* @throws when fails to init acceptor
29+
* @throws when fails to bind to addresses from config
30+
*/
31+
Plugin(ipx_ctx_t *ctx, Config &config);
32+
33+
// force that Plugin stays in its original memory (so that reference to `m_clients` in acceptor
34+
// stays valid)
35+
Plugin(const Plugin &) = delete;
36+
Plugin(Plugin &&) = delete;
37+
38+
/**
39+
* @brief Wait for the next tcp message and process all received messages.
40+
* @throws when fails to wait for connections
41+
* @throws when fails to receive
42+
*/
43+
void get();
44+
45+
/**
46+
* @brief Close the given session.
47+
* @param session Session to close.
48+
*/
49+
void close_session(const ipx_session *session) noexcept;
50+
51+
~Plugin();
52+
private:
53+
ipx_ctx_t *m_ctx;
54+
ClientManager m_clients;
55+
/** Acceptor thread. */
56+
Acceptor m_acceptor;
57+
};
58+
59+
} // namespace tcp_in
60+

0 commit comments

Comments
 (0)