Skip to content

Commit 936b1b6

Browse files
authored
Merge pull request #47 from sedmicha/forwarder
Forwarder output: define new output module
2 parents 9bf092f + 13cebfe commit 936b1b6

17 files changed

+2219
-0
lines changed

src/plugins/output/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ add_subdirectory(json-kafka)
66
add_subdirectory(timecheck)
77
add_subdirectory(viewer)
88
add_subdirectory(ipfix)
9+
add_subdirectory(forwarder)
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
# Create a linkable module
2+
add_library(forwarder-output MODULE
3+
src/main.cpp
4+
src/config.h
5+
src/Forwarder.h
6+
src/ConnectionManager.h
7+
src/ConnectionManager.cpp
8+
src/ConnectionParams.h
9+
src/Connection.h
10+
src/Connection.cpp
11+
src/ConnectionBuffer.h
12+
src/SyncPipe.h
13+
src/IPFIXMessage.h
14+
src/MessageBuilder.h
15+
)
16+
17+
install(
18+
TARGETS forwarder-output
19+
LIBRARY DESTINATION "${INSTALL_DIR_LIB}/ipfixcol2/"
20+
)
21+
22+
if (ENABLE_DOC_MANPAGE)
23+
# Build a manual page
24+
set(SRC_FILE "${CMAKE_CURRENT_SOURCE_DIR}/doc/ipfixcol2-forwarder-output.7.rst")
25+
set(DST_FILE "${CMAKE_CURRENT_BINARY_DIR}/ipfixcol2-forwarder-output.7")
26+
27+
add_custom_command(TARGET forwarder-output PRE_BUILD
28+
COMMAND ${RST2MAN_EXECUTABLE} --syntax-highlight=none ${SRC_FILE} ${DST_FILE}
29+
DEPENDS ${SRC_FILE}
30+
VERBATIM
31+
)
32+
33+
install(
34+
FILES "${DST_FILE}"
35+
DESTINATION "${INSTALL_DIR_MAN}/man7"
36+
)
37+
endif()
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
Forwarder (output plugin)
2+
==========================
3+
4+
This plugin allows forwarding incoming IPFIX messages to other collector in various modes.
5+
6+
It can be used to broadcast messages to multiple collectors (e.g. a main and a backup collector),
7+
or to distribute messages across multiple collectors (e.g. for load balancing).
8+
9+
Example configuration
10+
---------------------
11+
12+
.. code-block:: xml
13+
14+
<output>
15+
<name>Forwarder</name>
16+
<plugin>forwarder</plugin>
17+
<params>
18+
<mode>roundrobin</mode>
19+
<protocol>tcp</protocol>
20+
<hosts>
21+
<host>
22+
<name>Subcollector 1</name>
23+
<address>127.0.0.1</address>
24+
<port>4751</port>
25+
</host>
26+
<host>
27+
<name>Subcollector 2</name>
28+
<address>localhost</address>
29+
<port>4752</port>
30+
</host>
31+
</hosts>
32+
</params>
33+
</output>
34+
35+
Parameters
36+
----------
37+
38+
:``mode``:
39+
The forwarding mode; round robin (messages are sent to one host at time and hosts are cycled through) or all (messages are broadcasted to all hosts)
40+
[values: RoundRobin/All]
41+
42+
:``protocol``:
43+
The transport protocol to use
44+
[values: TCP/UDP]
45+
46+
:``connectionBufferSize``:
47+
Size of the buffer of each connection (Warning: number of connections = number of input exporters * number of hosts)
48+
[value: number of bytes, default: 4194304]
49+
50+
:``templateRefreshIntervalSecs``:
51+
Send templates again every N seconds (UDP only)
52+
[value: number of seconds, default: 600]
53+
54+
:``templateRefreshIntervalBytes``:
55+
Send templates again every N bytes (UDP only)
56+
[value: number of bytes, default: 5000000]
57+
58+
:``reconnectIntervalSecs``:
59+
Attempt to reconnect every N seconds in case the connection drops (TCP only)
60+
[value: number of seconds, default: 10]
61+
62+
:``hosts``:
63+
The receiving hosts
64+
65+
:``host``:
66+
:``name``:
67+
Optional identification of the host
68+
[value: string, default: <address>:<port>]
69+
70+
:``address``:
71+
The address of the host
72+
[value: IPv4/IPv6 address or a hostname]
73+
74+
:``port``:
75+
The port to connect to
76+
[value: port number]
77+
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
* Template withdrawals
2+
* More effective way of handling template changes - currently all the templates are being sent again every time any change in templates is detected
3+
* Message MTU
4+
* Possible bug: when testing, a small number of data records seems to be lost (something like 20 out of 1,000,000)
5+
* Connection buffer size
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
============================
2+
ipfixcol2-forwarder-output
3+
============================
4+
5+
--------------------------
6+
Forwarder (output plugin)
7+
--------------------------
8+
9+
:Author: Michal Sedlak ([email protected])
10+
:Date: 2021-01-28
11+
:Copyright: Copyright © 2021 CESNET, z.s.p.o.
12+
:Version: 1.0
13+
:Manual section: 7
14+
:Manual group: IPFIXcol collector
15+
16+
Description
17+
-----------
18+
19+
.. include:: ../README.rst
20+
:start-line: 3
Lines changed: 130 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
/**
2+
* \file src/plugins/output/forwarder/src/Connection.cpp
3+
* \author Michal Sedlak <[email protected]>
4+
* \brief Buffered socket connection
5+
* \date 2021
6+
*/
7+
8+
/* Copyright (C) 2021 CESNET, z.s.p.o.
9+
*
10+
* Redistribution and use in source and binary forms, with or without
11+
* modification, are permitted provided that the following conditions
12+
* are met:
13+
* 1. Redistributions of source code must retain the above copyright
14+
* notice, this list of conditions and the following disclaimer.
15+
* 2. Redistributions in binary form must reproduce the above copyright
16+
* notice, this list of conditions and the following disclaimer in
17+
* the documentation and/or other materials provided with the
18+
* distribution.
19+
* 3. Neither the name of the Company nor the names of its contributors
20+
* may be used to endorse or promote products derived from this
21+
* software without specific prior written permission.
22+
*
23+
* ALTERNATIVELY, provided that this notice is retained in full, this
24+
* product may be distributed under the terms of the GNU General Public
25+
* License (GPL) version 2 or later, in which case the provisions
26+
* of the GPL apply INSTEAD OF those given above.
27+
*
28+
* This software is provided ``as is'', and any express or implied
29+
* warranties, including, but not limited to, the implied warranties of
30+
* merchantability and fitness for a particular purpose are disclaimed.
31+
* In no event shall the company or contributors be liable for any
32+
* direct, indirect, incidental, special, exemplary, or consequential
33+
* damages (including, but not limited to, procurement of substitute
34+
* goods or services; loss of use, data, or profits; or business
35+
* interruption) however caused and on any theory of liability, whether
36+
* in contract, strict liability, or tort (including negligence or
37+
* otherwise) arising in any way out of the use of this software, even
38+
* if advised of the possibility of such damage.
39+
*
40+
*/
41+
42+
#include "Connection.h"
43+
44+
#include <libfds.h>
45+
46+
Connection::Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size)
47+
: manager(manager)
48+
, params(params)
49+
, buffer(buffer_size)
50+
{
51+
}
52+
53+
bool
54+
Connection::connect()
55+
{
56+
if (sockfd >= 0) {
57+
::close(sockfd);
58+
}
59+
sockfd = params.make_socket();
60+
return sockfd >= 0;
61+
}
62+
63+
std::unique_lock<std::mutex>
64+
Connection::begin_write()
65+
{
66+
return std::unique_lock<std::mutex>(buffer_mutex);
67+
}
68+
69+
bool
70+
Connection::write(void *data, long length)
71+
{
72+
return buffer.write((uint8_t *)data, length);
73+
}
74+
75+
void
76+
Connection::rollback_write()
77+
{
78+
buffer.rollback();
79+
}
80+
81+
long
82+
Connection::writeable()
83+
{
84+
return buffer.writeable();
85+
}
86+
87+
void
88+
Connection::commit_write()
89+
{
90+
buffer.commit();
91+
manager.pipe.notify();
92+
has_data_to_send = buffer.readable();
93+
}
94+
95+
bool
96+
Connection::send_some()
97+
{
98+
if (params.protocol == TransProto::Udp) {
99+
while (1) {
100+
fds_ipfix_msg_hdr ipfix_header;
101+
if (!buffer.peek(ipfix_header)) {
102+
return true;
103+
}
104+
auto message_length = ntohs(ipfix_header.length);
105+
int ret = buffer.send_data(sockfd, message_length);
106+
if (ret == 0 || !buffer.readable()) {
107+
return true;
108+
} else if (ret < 0) {
109+
return false;
110+
}
111+
}
112+
return true;
113+
} else {
114+
return buffer.send_data(sockfd) >= 0;
115+
}
116+
}
117+
118+
void
119+
Connection::close()
120+
{
121+
close_flag = true;
122+
manager.pipe.notify();
123+
}
124+
125+
Connection::~Connection()
126+
{
127+
if (sockfd >= 0) {
128+
::close(sockfd);
129+
}
130+
}
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/**
2+
* \file src/plugins/output/forwarder/src/Connection.h
3+
* \author Michal Sedlak <[email protected]>
4+
* \brief Buffered socket connection
5+
* \date 2021
6+
*/
7+
8+
/* Copyright (C) 2021 CESNET, z.s.p.o.
9+
*
10+
* Redistribution and use in source and binary forms, with or without
11+
* modification, are permitted provided that the following conditions
12+
* are met:
13+
* 1. Redistributions of source code must retain the above copyright
14+
* notice, this list of conditions and the following disclaimer.
15+
* 2. Redistributions in binary form must reproduce the above copyright
16+
* notice, this list of conditions and the following disclaimer in
17+
* the documentation and/or other materials provided with the
18+
* distribution.
19+
* 3. Neither the name of the Company nor the names of its contributors
20+
* may be used to endorse or promote products derived from this
21+
* software without specific prior written permission.
22+
*
23+
* ALTERNATIVELY, provided that this notice is retained in full, this
24+
* product may be distributed under the terms of the GNU General Public
25+
* License (GPL) version 2 or later, in which case the provisions
26+
* of the GPL apply INSTEAD OF those given above.
27+
*
28+
* This software is provided ``as is'', and any express or implied
29+
* warranties, including, but not limited to, the implied warranties of
30+
* merchantability and fitness for a particular purpose are disclaimed.
31+
* In no event shall the company or contributors be liable for any
32+
* direct, indirect, incidental, special, exemplary, or consequential
33+
* damages (including, but not limited to, procurement of substitute
34+
* goods or services; loss of use, data, or profits; or business
35+
* interruption) however caused and on any theory of liability, whether
36+
* in contract, strict liability, or tort (including negligence or
37+
* otherwise) arising in any way out of the use of this software, even
38+
* if advised of the possibility of such damage.
39+
*
40+
*/
41+
42+
#pragma once
43+
44+
#include "ConnectionManager.h"
45+
#include "ConnectionParams.h"
46+
#include "ConnectionBuffer.h"
47+
48+
#include <sys/socket.h>
49+
#include <netinet/in.h>
50+
#include <arpa/inet.h>
51+
#include <unistd.h>
52+
53+
#include <atomic>
54+
#include <mutex>
55+
#include <cstdint>
56+
57+
class ConnectionManager;
58+
59+
class Connection
60+
{
61+
friend class ConnectionManager;
62+
63+
public:
64+
/// Flag indicating that the connection was lost and the forwarder needs to resend templates etc.
65+
/// The flag won't be reset when the connection is reestablished!
66+
std::atomic<bool> connection_lost_flag { false };
67+
68+
Connection(ConnectionManager &manager, ConnectionParams params, long buffer_size);
69+
70+
bool
71+
connect();
72+
73+
std::unique_lock<std::mutex>
74+
begin_write();
75+
76+
bool
77+
write(void *data, long length);
78+
79+
bool
80+
send_some();
81+
82+
void
83+
commit_write();
84+
85+
void
86+
rollback_write();
87+
88+
long
89+
writeable();
90+
91+
void
92+
close();
93+
94+
~Connection();
95+
96+
private:
97+
/// The manager managing this connection
98+
ConnectionManager &manager;
99+
100+
/// The parameters to estabilish the connection
101+
ConnectionParams params;
102+
103+
/// The connection socket
104+
int sockfd = -1;
105+
106+
/// Buffer for the data to send and a mutex guarding it
107+
/// (buffer will be accessed from sender thread and writer thread)
108+
std::mutex buffer_mutex;
109+
ConnectionBuffer buffer;
110+
111+
/// Flag indicating whether the buffer has any data to send so we don't have to lock the mutex every time
112+
/// (doesn't need to be atomic because we only set it while holding the mutex)
113+
bool has_data_to_send = false;
114+
115+
/// Flag indicating that the connection has been closed and can be disposed of after the data is sent
116+
std::atomic<bool> close_flag { false };
117+
};

0 commit comments

Comments
 (0)