Skip to content

Commit 36b1e9a

Browse files
author
Lukas Hutak
committed
TCP input: replace waiting for whole IPFIX Messages with continous buffering of its parts
The fixes situation when a connection with an exporter is unexpectly closed by the collector when a part of IPFIX message is sent over TCP but the rest is still on the way for long pariod of time. Previously the plugin waited up to half a second for the rest of the message. However, if no parts have been received, the connection was prematurely closed due to connection timeout. Moreover, waiting for IPFIX Message parts (i.e. blocking) could caused performance degredation if multiple exporters were connected to the collector at the same time. In this commit, input sockets of all expoters are always non-blocking and parts of IPFIX Messages are buffered until the whole IPFIX Message is received.
1 parent 66c1c6f commit 36b1e9a

File tree

1 file changed

+237
-57
lines changed
  • src/plugins/input/tcp

1 file changed

+237
-57
lines changed

src/plugins/input/tcp/tcp.c

Lines changed: 237 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include <arpa/inet.h>
4848
#include <netinet/in.h>
4949
#include <unistd.h>
50+
#include <fcntl.h>
5051

5152
#include <stdlib.h>
5253
#include <pthread.h>
@@ -62,10 +63,6 @@
6263
#define GETTER_TIMEOUT (10)
6364
/** Max sockets events processed in the getter - i.e. epoll_wait array size */
6465
#define GETTER_MAX_EVENTS (16)
65-
/** Timeout to read whole IPFIX Message after at least part has been received (in microseconds) */
66-
#define GETTER_RECV_TIMEOUT (500000)
67-
/** Default size of a buffer prepared for new IPFIX/NetFlow message (bytes) */
68-
#define DEF_MSG_SIZE (4096)
6966

7067
/** Plugin description */
7168
IPX_API struct ipx_plugin_info ipx_plugin_info = {
@@ -91,6 +88,13 @@ struct tcp_pair {
9188
struct ipx_session *session;
9289
/** No message has been received from the Session yet */
9390
bool new_connection;
91+
92+
/** Partly received message that waits for completition */
93+
uint8_t *msg;
94+
/** Allocated size of the partly received <em>msg</em> message */
95+
uint16_t msg_size;
96+
/** Already receive part of the <em>msg</em> message */
97+
uint16_t msg_offset;
9498
};
9599

96100
/** Instance data */
@@ -143,7 +147,7 @@ static int
143147
active_session_add(struct tcp_data *data, int sd, struct ipx_session *session)
144148
{
145149
// Create a new pair
146-
struct tcp_pair *pair = malloc(sizeof(*pair));
150+
struct tcp_pair *pair = calloc(1, sizeof(*pair));
147151
if (!pair) {
148152
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
149153
return IPX_ERR_NOMEM;
@@ -242,7 +246,11 @@ active_session_remove_aux(struct tcp_data *data, size_t idx)
242246
}
243247
}
244248

245-
// Close internal structures an remove it from the list (do NOT free SESSION)
249+
// Free internal structures and remove the pair from the list (do NOT free SESSION)
250+
if (pair->msg) {
251+
free(pair->msg);
252+
}
253+
246254
close(pair->fd);
247255
free(pair);
248256

@@ -309,14 +317,21 @@ listener_add_connection(struct tcp_data *data, int sd)
309317
assert(sd >= 0);
310318
const char *err_str;
311319

312-
// Set receive timeout (after data on the socket is ready)
313-
struct timeval rcv_timeout;
314-
rcv_timeout.tv_sec = GETTER_RECV_TIMEOUT / 1000000;
315-
rcv_timeout.tv_usec = GETTER_RECV_TIMEOUT % 1000000;
316-
if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &rcv_timeout, sizeof(rcv_timeout)) == -1) {
320+
// Set non-blocking mode on the socket
321+
int flags = fcntl(sd, F_GETFL, 0);
322+
if (flags == -1) {
317323
ipx_strerror(errno, err_str);
318-
IPX_CTX_WARNING(data->ctx, "Listener: Failed to specify receiving timeout of a socket: %s",
324+
IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
319325
err_str);
326+
return IPX_ERR_DENIED;
327+
}
328+
329+
flags |= O_NONBLOCK;
330+
if (fcntl(sd, F_SETFL, flags) == -1) {
331+
ipx_strerror(errno, err_str);
332+
IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
333+
err_str);
334+
return IPX_ERR_DENIED;
320335
}
321336

322337
// Get the description of the remove address
@@ -810,102 +825,267 @@ active_destroy(ipx_ctx_t *ctx, struct tcp_data *instance)
810825
}
811826

812827
/**
813-
* \brief Get an IPFIX message from a socket and pass it
828+
* \brief Try to read an IPFIX Message header
829+
*
830+
* The whole header might not be available right now. In that case, the function
831+
* will only read and store available part and it will successfully return. When
832+
* this happens, the allocated message is smaller that IPFIX Message header. The
833+
* next call of this function will tried to read the rest.
834+
*
835+
* If the whole header is available, the function will allocate message buffer
836+
* sufficient for the rest of the message.
814837
*
815838
* \param[in] ctx Instance data (necessary for passing messages)
816839
* \param[in] pair Connection pair (socket descriptor and session) to receive from
817840
* \return #IPX_OK on success
818-
* \return #IPX_ERR_EOF if the socket is closed
841+
* \return #IPX_ERR_EOF if the socket has been closed
819842
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
820843
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
821844
*/
822845
static int
823-
socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
846+
socket_process_receive_header(ipx_ctx_t *ctx, struct tcp_pair *pair)
824847
{
825-
const char *err_str;
826-
struct fds_ipfix_msg_hdr hdr;
827-
static_assert(sizeof(hdr) == FDS_IPFIX_MSG_HDR_LEN, "Invalid size of IPFIX Message header");
848+
struct fds_ipfix_msg_hdr hdr = {0};
849+
uint8_t *hdr_raw = (uint8_t *) &hdr;
828850

829-
// Get the message header (do not move pointer)
830-
ssize_t len = recv(pair->fd, &hdr, FDS_IPFIX_MSG_HDR_LEN, MSG_WAITALL | MSG_PEEK);
851+
uint16_t remains = FDS_IPFIX_MSG_HDR_LEN;
852+
uint16_t offset = 0;
853+
ssize_t len;
854+
855+
uint8_t *msg_buffer;
856+
uint16_t msg_version;
857+
uint16_t msg_size;
858+
859+
assert(!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN);
860+
861+
if (pair->msg) {
862+
// Fill the header with previously received data
863+
offset = pair->msg_offset;
864+
remains = FDS_IPFIX_MSG_HDR_LEN - offset;
865+
866+
memcpy(hdr_raw, pair->msg, offset);
867+
}
868+
869+
len = recv(pair->fd, &hdr_raw[offset], remains, 0);
831870
if (len == 0) {
832-
// Connection terminated
833-
IPX_CTX_INFO(ctx, "Connection from '%s' closed.", pair->session->ident);
834-
return IPX_ERR_EOF;
871+
// Connection has been closed
872+
if (offset > 0) {
873+
IPX_CTX_WARNING(ctx, "Connection with '%s' has been unexpectly closed",
874+
pair->session->ident);
875+
return IPX_ERR_FORMAT;
876+
} else {
877+
IPX_CTX_INFO(ctx, "Connection with '%s' closed.", pair->session->ident);
878+
return IPX_ERR_EOF;
879+
}
835880
}
836881

837-
if (len == -1 || len < FDS_IPFIX_MSG_HDR_LEN) {
838-
// Failed to read header -> close
839-
int error_code = (len == -1) ? errno : EINTR;
840-
ipx_strerror(error_code, err_str);
841-
IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to failure to receive "
842-
"an IPFIX Message header: %s", pair->session->ident, err_str);
882+
if (len < 0) {
883+
// Something went wrong
884+
const char *err_str;
885+
886+
if (errno == EWOULDBLOCK || errno == EAGAIN) {
887+
return IPX_OK;
888+
}
889+
890+
ipx_strerror(errno, err_str);
891+
IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
892+
pair->session->ident, err_str);
843893
return IPX_ERR_FORMAT;
844894
}
845-
assert(len == FDS_IPFIX_MSG_HDR_LEN);
846895

847-
// Check the header (version, size)
848-
uint16_t msg_version = ntohs(hdr.version);
849-
uint16_t msg_size = ntohs(hdr.length);
850-
uint32_t msg_odid = ntohl(hdr.odid);
896+
offset += len;
897+
898+
if (offset < FDS_IPFIX_MSG_HDR_LEN) {
899+
// We don't have whole IPFIX Message header
900+
uint8_t *ptr = realloc(pair->msg, offset);
901+
if (!ptr) {
902+
IPX_CTX_ERROR(ctx,
903+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
904+
pair->session->ident, __FILE__, __LINE__);
905+
return IPX_ERR_NOMEM;
906+
}
907+
908+
memcpy(ptr, hdr_raw, offset);
909+
pair->msg = ptr;
910+
pair->msg_offset = offset;
911+
pair->msg_size = offset;
912+
913+
return IPX_OK;
914+
}
915+
916+
// Check the IPFIX Message header
917+
msg_version = ntohs(hdr.version);
918+
msg_size = ntohs(hdr.length);
851919

852920
if (msg_version != FDS_IPFIX_VERSION || msg_size < FDS_IPFIX_MSG_HDR_LEN) {
853-
// Unsupported header version
854-
IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to the unsupported version of "
855-
"IPFIX/NetFlow.", pair->session->ident);
921+
// Invalid header version
922+
IPX_CTX_WARNING(ctx,
923+
"Connection with '%s' closed due to invalid IPFIX Message header.",
924+
pair->session->ident);
856925
return IPX_ERR_FORMAT;
857926
}
858927

859-
// Read the whole message
860-
uint8_t *buffer = malloc(msg_size * sizeof(uint8_t));
861-
if (!buffer) {
862-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation failure! (%s:%d).",
928+
// Preallocated buffer for the rest of the IPFIX Message body
929+
msg_buffer = realloc(pair->msg, msg_size);
930+
if (!msg_buffer) {
931+
IPX_CTX_ERROR(ctx,
932+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
863933
pair->session->ident, __FILE__, __LINE__);
864934
return IPX_ERR_NOMEM;
865935
}
866936

867-
len = recv(pair->fd, buffer, msg_size, MSG_WAITALL);
868-
if (len != msg_size) {
869-
int error_code = (len == -1) ? errno : ETIMEDOUT;
870-
ipx_strerror(error_code, err_str);
871-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to failure while reading from "
872-
"its socket: %s.", pair->session->ident, err_str);
873-
free(buffer);
937+
memcpy(msg_buffer, hdr_raw, FDS_IPFIX_MSG_HDR_LEN);
938+
pair->msg = msg_buffer;
939+
pair->msg_offset = FDS_IPFIX_MSG_HDR_LEN;
940+
pair->msg_size = msg_size;
941+
942+
return IPX_OK;
943+
}
944+
945+
/**
946+
* \brief Try to read an IPFIX Message body
947+
*
948+
* The whole body might not be available right now. In that case, the function
949+
* will only read and store available part and it will successfully return. When
950+
* this happens, the message offset is smaller that the message size. The next call
951+
* of this function will tried to read the rest.
952+
*
953+
* \param[in] ctx Instance data (necessary for passing messages)
954+
* \param[in] pair Connection pair (socket descriptor and session) to receive from
955+
* \return #IPX_OK on success
956+
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
957+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
958+
*/
959+
static int
960+
socket_process_receive_body(ipx_ctx_t *ctx, struct tcp_pair *pair)
961+
{
962+
const uint16_t remains = pair->msg_size - pair->msg_offset;
963+
assert(pair->msg && pair->msg_offset >= FDS_IPFIX_MSG_HDR_LEN);
964+
965+
if (remains == 0) {
966+
// This is an IPFIX Message without body...
967+
return IPX_OK;
968+
}
969+
970+
ssize_t len = recv(pair->fd, &pair->msg[pair->msg_offset], remains, 0);
971+
if (len == 0) {
972+
// Connection closed
973+
IPX_CTX_WARNING(ctx, "Connection from '%s' has been unexpectly closed",
974+
pair->session->ident);
874975
return IPX_ERR_FORMAT;
875976
}
876977

978+
if (len < 0) {
979+
// Something went wrong
980+
const char *err_str;
981+
982+
if (errno == EWOULDBLOCK || errno == EAGAIN) {
983+
return IPX_OK;
984+
}
985+
986+
ipx_strerror(errno, err_str);
987+
IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
988+
pair->session->ident, err_str);
989+
return IPX_ERR_FORMAT;
990+
}
991+
992+
pair->msg_offset += (uint16_t) len;
993+
return IPX_OK;
994+
}
995+
996+
/**
997+
* \brief Try to pass fully received IPFIX Message to the collector.
998+
*
999+
* \param[in] ctx Instance data (necessary for passing messages)
1000+
* \param[in] pair Connection pair (socket descriptor and session)
1001+
* \return #IPX_OK on success
1002+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
1003+
*/
1004+
static int
1005+
socket_process_pass_msg(ipx_ctx_t *ctx, struct tcp_pair *pair)
1006+
{
1007+
assert(pair->msg && pair->msg_offset == pair->msg_size && "Partly received message");
1008+
8771009
if (pair->new_connection) {
8781010
// Send information about the new Transport Session
879-
pair->new_connection = false;
8801011
ipx_msg_session_t *msg = ipx_msg_session_create(pair->session, IPX_MSG_SESSION_OPEN);
8811012
if (!msg) {
882-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
883-
"failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
884-
free(buffer);
1013+
IPX_CTX_ERROR(ctx,
1014+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
1015+
pair->session->ident, __FILE__, __LINE__);
8851016
return IPX_ERR_NOMEM;
8861017
}
8871018

8881019
ipx_ctx_msg_pass(ctx, ipx_msg_session2base(msg));
1020+
pair->new_connection = false;
8891021
}
8901022

8911023
// Create a message wrapper and pass the message
8921024
struct ipx_msg_ctx msg_ctx;
8931025
msg_ctx.session = pair->session;
894-
msg_ctx.odid = msg_odid;
1026+
msg_ctx.odid = ntohl(((struct fds_ipfix_msg_hdr *) pair->msg)->odid);
8951027
msg_ctx.stream = 0; // Streams are not supported over TCP
8961028

897-
ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, buffer, msg_size);
1029+
ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, pair->msg, pair->msg_size);
8981030
if (!msg) {
899-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
900-
"failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
901-
free(buffer);
1031+
IPX_CTX_ERROR(ctx,
1032+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
1033+
pair->session->ident, __FILE__, __LINE__);
9021034
return IPX_ERR_NOMEM;
9031035
}
9041036

9051037
ipx_ctx_msg_pass(ctx, ipx_msg_ipfix2base(msg));
1038+
1039+
pair->msg = NULL;
1040+
pair->msg_offset = 0;
1041+
pair->msg_size = 0;
1042+
9061043
return IPX_OK;
9071044
}
9081045

1046+
/**
1047+
* \brief Get an IPFIX message from a socket and pass it
1048+
*
1049+
* \param[in] ctx Instance data (necessary for passing messages)
1050+
* \param[in] pair Connection pair (socket descriptor and session) to receive from
1051+
* \return #IPX_OK on success
1052+
* \return #IPX_ERR_EOF if the socket is closed
1053+
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
1054+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
1055+
*/
1056+
static int
1057+
socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
1058+
{
1059+
int ret;
1060+
1061+
if (!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
1062+
// Try to receive IPFIX Message header first
1063+
ret = socket_process_receive_header(ctx, pair);
1064+
if (ret != IPX_OK) {
1065+
return ret;
1066+
}
1067+
1068+
if (pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
1069+
// Incomplete IPFIX Message header, read the rest later...
1070+
return IPX_OK;
1071+
}
1072+
}
1073+
1074+
// Receive rest of the message body
1075+
ret = socket_process_receive_body(ctx, pair);
1076+
if (ret != IPX_OK) {
1077+
return ret;
1078+
}
1079+
1080+
if (pair->msg_offset < pair->msg_size) {
1081+
// Incomplete IPFIX Message body, read the rest later...
1082+
return IPX_OK;
1083+
}
1084+
1085+
// Pass the message
1086+
return socket_process_pass_msg(ctx, pair);
1087+
}
1088+
9091089
// -------------------------------------------------------------------------------------------------
9101090

9111091
int

0 commit comments

Comments
 (0)