Skip to content

Commit e5cfcdf

Browse files
authored
Merge pull request #55 from CESNET/hutak-tcp-input-remove-read-timeout
TCP input: replace waiting for whole IPFIX Messages with continous buffering of its parts
2 parents 66c1c6f + 36b1e9a commit e5cfcdf

File tree

1 file changed

+237
-57
lines changed
  • src/plugins/input/tcp

1 file changed

+237
-57
lines changed

src/plugins/input/tcp/tcp.c

Lines changed: 237 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
#include <arpa/inet.h>
4848
#include <netinet/in.h>
4949
#include <unistd.h>
50+
#include <fcntl.h>
5051

5152
#include <stdlib.h>
5253
#include <pthread.h>
@@ -62,10 +63,6 @@
6263
#define GETTER_TIMEOUT (10)
6364
/** Max sockets events processed in the getter - i.e. epoll_wait array size */
6465
#define GETTER_MAX_EVENTS (16)
65-
/** Timeout to read whole IPFIX Message after at least part has been received (in microseconds) */
66-
#define GETTER_RECV_TIMEOUT (500000)
67-
/** Default size of a buffer prepared for new IPFIX/NetFlow message (bytes) */
68-
#define DEF_MSG_SIZE (4096)
6966

7067
/** Plugin description */
7168
IPX_API struct ipx_plugin_info ipx_plugin_info = {
@@ -91,6 +88,13 @@ struct tcp_pair {
9188
struct ipx_session *session;
9289
/** No message has been received from the Session yet */
9390
bool new_connection;
91+
92+
/** Partly received message that waits for completition */
93+
uint8_t *msg;
94+
/** Allocated size of the partly received <em>msg</em> message */
95+
uint16_t msg_size;
96+
/** Already receive part of the <em>msg</em> message */
97+
uint16_t msg_offset;
9498
};
9599

96100
/** Instance data */
@@ -143,7 +147,7 @@ static int
143147
active_session_add(struct tcp_data *data, int sd, struct ipx_session *session)
144148
{
145149
// Create a new pair
146-
struct tcp_pair *pair = malloc(sizeof(*pair));
150+
struct tcp_pair *pair = calloc(1, sizeof(*pair));
147151
if (!pair) {
148152
IPX_CTX_ERROR(data->ctx, "Memory allocation failed! (%s:%d)", __FILE__, __LINE__);
149153
return IPX_ERR_NOMEM;
@@ -242,7 +246,11 @@ active_session_remove_aux(struct tcp_data *data, size_t idx)
242246
}
243247
}
244248

245-
// Close internal structures an remove it from the list (do NOT free SESSION)
249+
// Free internal structures and remove the pair from the list (do NOT free SESSION)
250+
if (pair->msg) {
251+
free(pair->msg);
252+
}
253+
246254
close(pair->fd);
247255
free(pair);
248256

@@ -309,14 +317,21 @@ listener_add_connection(struct tcp_data *data, int sd)
309317
assert(sd >= 0);
310318
const char *err_str;
311319

312-
// Set receive timeout (after data on the socket is ready)
313-
struct timeval rcv_timeout;
314-
rcv_timeout.tv_sec = GETTER_RECV_TIMEOUT / 1000000;
315-
rcv_timeout.tv_usec = GETTER_RECV_TIMEOUT % 1000000;
316-
if (setsockopt(sd, SOL_SOCKET, SO_RCVTIMEO, &rcv_timeout, sizeof(rcv_timeout)) == -1) {
320+
// Set non-blocking mode on the socket
321+
int flags = fcntl(sd, F_GETFL, 0);
322+
if (flags == -1) {
317323
ipx_strerror(errno, err_str);
318-
IPX_CTX_WARNING(data->ctx, "Listener: Failed to specify receiving timeout of a socket: %s",
324+
IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
319325
err_str);
326+
return IPX_ERR_DENIED;
327+
}
328+
329+
flags |= O_NONBLOCK;
330+
if (fcntl(sd, F_SETFL, flags) == -1) {
331+
ipx_strerror(errno, err_str);
332+
IPX_CTX_WARNING(data->ctx, "Listener: Failed to set non-blocking mode: fcntl() failed: %s",
333+
err_str);
334+
return IPX_ERR_DENIED;
320335
}
321336

322337
// Get the description of the remove address
@@ -810,102 +825,267 @@ active_destroy(ipx_ctx_t *ctx, struct tcp_data *instance)
810825
}
811826

812827
/**
813-
* \brief Get an IPFIX message from a socket and pass it
828+
* \brief Try to read an IPFIX Message header
829+
*
830+
* The whole header might not be available right now. In that case, the function
831+
* will only read and store available part and it will successfully return. When
832+
* this happens, the allocated message is smaller that IPFIX Message header. The
833+
* next call of this function will tried to read the rest.
834+
*
835+
* If the whole header is available, the function will allocate message buffer
836+
* sufficient for the rest of the message.
814837
*
815838
* \param[in] ctx Instance data (necessary for passing messages)
816839
* \param[in] pair Connection pair (socket descriptor and session) to receive from
817840
* \return #IPX_OK on success
818-
* \return #IPX_ERR_EOF if the socket is closed
841+
* \return #IPX_ERR_EOF if the socket has been closed
819842
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
820843
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
821844
*/
822845
static int
823-
socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
846+
socket_process_receive_header(ipx_ctx_t *ctx, struct tcp_pair *pair)
824847
{
825-
const char *err_str;
826-
struct fds_ipfix_msg_hdr hdr;
827-
static_assert(sizeof(hdr) == FDS_IPFIX_MSG_HDR_LEN, "Invalid size of IPFIX Message header");
848+
struct fds_ipfix_msg_hdr hdr = {0};
849+
uint8_t *hdr_raw = (uint8_t *) &hdr;
828850

829-
// Get the message header (do not move pointer)
830-
ssize_t len = recv(pair->fd, &hdr, FDS_IPFIX_MSG_HDR_LEN, MSG_WAITALL | MSG_PEEK);
851+
uint16_t remains = FDS_IPFIX_MSG_HDR_LEN;
852+
uint16_t offset = 0;
853+
ssize_t len;
854+
855+
uint8_t *msg_buffer;
856+
uint16_t msg_version;
857+
uint16_t msg_size;
858+
859+
assert(!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN);
860+
861+
if (pair->msg) {
862+
// Fill the header with previously received data
863+
offset = pair->msg_offset;
864+
remains = FDS_IPFIX_MSG_HDR_LEN - offset;
865+
866+
memcpy(hdr_raw, pair->msg, offset);
867+
}
868+
869+
len = recv(pair->fd, &hdr_raw[offset], remains, 0);
831870
if (len == 0) {
832-
// Connection terminated
833-
IPX_CTX_INFO(ctx, "Connection from '%s' closed.", pair->session->ident);
834-
return IPX_ERR_EOF;
871+
// Connection has been closed
872+
if (offset > 0) {
873+
IPX_CTX_WARNING(ctx, "Connection with '%s' has been unexpectly closed",
874+
pair->session->ident);
875+
return IPX_ERR_FORMAT;
876+
} else {
877+
IPX_CTX_INFO(ctx, "Connection with '%s' closed.", pair->session->ident);
878+
return IPX_ERR_EOF;
879+
}
835880
}
836881

837-
if (len == -1 || len < FDS_IPFIX_MSG_HDR_LEN) {
838-
// Failed to read header -> close
839-
int error_code = (len == -1) ? errno : EINTR;
840-
ipx_strerror(error_code, err_str);
841-
IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to failure to receive "
842-
"an IPFIX Message header: %s", pair->session->ident, err_str);
882+
if (len < 0) {
883+
// Something went wrong
884+
const char *err_str;
885+
886+
if (errno == EWOULDBLOCK || errno == EAGAIN) {
887+
return IPX_OK;
888+
}
889+
890+
ipx_strerror(errno, err_str);
891+
IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
892+
pair->session->ident, err_str);
843893
return IPX_ERR_FORMAT;
844894
}
845-
assert(len == FDS_IPFIX_MSG_HDR_LEN);
846895

847-
// Check the header (version, size)
848-
uint16_t msg_version = ntohs(hdr.version);
849-
uint16_t msg_size = ntohs(hdr.length);
850-
uint32_t msg_odid = ntohl(hdr.odid);
896+
offset += len;
897+
898+
if (offset < FDS_IPFIX_MSG_HDR_LEN) {
899+
// We don't have whole IPFIX Message header
900+
uint8_t *ptr = realloc(pair->msg, offset);
901+
if (!ptr) {
902+
IPX_CTX_ERROR(ctx,
903+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
904+
pair->session->ident, __FILE__, __LINE__);
905+
return IPX_ERR_NOMEM;
906+
}
907+
908+
memcpy(ptr, hdr_raw, offset);
909+
pair->msg = ptr;
910+
pair->msg_offset = offset;
911+
pair->msg_size = offset;
912+
913+
return IPX_OK;
914+
}
915+
916+
// Check the IPFIX Message header
917+
msg_version = ntohs(hdr.version);
918+
msg_size = ntohs(hdr.length);
851919

852920
if (msg_version != FDS_IPFIX_VERSION || msg_size < FDS_IPFIX_MSG_HDR_LEN) {
853-
// Unsupported header version
854-
IPX_CTX_WARNING(ctx, "Connection from '%s' closed due to the unsupported version of "
855-
"IPFIX/NetFlow.", pair->session->ident);
921+
// Invalid header version
922+
IPX_CTX_WARNING(ctx,
923+
"Connection with '%s' closed due to invalid IPFIX Message header.",
924+
pair->session->ident);
856925
return IPX_ERR_FORMAT;
857926
}
858927

859-
// Read the whole message
860-
uint8_t *buffer = malloc(msg_size * sizeof(uint8_t));
861-
if (!buffer) {
862-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation failure! (%s:%d).",
928+
// Preallocated buffer for the rest of the IPFIX Message body
929+
msg_buffer = realloc(pair->msg, msg_size);
930+
if (!msg_buffer) {
931+
IPX_CTX_ERROR(ctx,
932+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
863933
pair->session->ident, __FILE__, __LINE__);
864934
return IPX_ERR_NOMEM;
865935
}
866936

867-
len = recv(pair->fd, buffer, msg_size, MSG_WAITALL);
868-
if (len != msg_size) {
869-
int error_code = (len == -1) ? errno : ETIMEDOUT;
870-
ipx_strerror(error_code, err_str);
871-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to failure while reading from "
872-
"its socket: %s.", pair->session->ident, err_str);
873-
free(buffer);
937+
memcpy(msg_buffer, hdr_raw, FDS_IPFIX_MSG_HDR_LEN);
938+
pair->msg = msg_buffer;
939+
pair->msg_offset = FDS_IPFIX_MSG_HDR_LEN;
940+
pair->msg_size = msg_size;
941+
942+
return IPX_OK;
943+
}
944+
945+
/**
946+
* \brief Try to read an IPFIX Message body
947+
*
948+
* The whole body might not be available right now. In that case, the function
949+
* will only read and store available part and it will successfully return. When
950+
* this happens, the message offset is smaller that the message size. The next call
951+
* of this function will tried to read the rest.
952+
*
953+
* \param[in] ctx Instance data (necessary for passing messages)
954+
* \param[in] pair Connection pair (socket descriptor and session) to receive from
955+
* \return #IPX_OK on success
956+
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
957+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
958+
*/
959+
static int
960+
socket_process_receive_body(ipx_ctx_t *ctx, struct tcp_pair *pair)
961+
{
962+
const uint16_t remains = pair->msg_size - pair->msg_offset;
963+
assert(pair->msg && pair->msg_offset >= FDS_IPFIX_MSG_HDR_LEN);
964+
965+
if (remains == 0) {
966+
// This is an IPFIX Message without body...
967+
return IPX_OK;
968+
}
969+
970+
ssize_t len = recv(pair->fd, &pair->msg[pair->msg_offset], remains, 0);
971+
if (len == 0) {
972+
// Connection closed
973+
IPX_CTX_WARNING(ctx, "Connection from '%s' has been unexpectly closed",
974+
pair->session->ident);
874975
return IPX_ERR_FORMAT;
875976
}
876977

978+
if (len < 0) {
979+
// Something went wrong
980+
const char *err_str;
981+
982+
if (errno == EWOULDBLOCK || errno == EAGAIN) {
983+
return IPX_OK;
984+
}
985+
986+
ipx_strerror(errno, err_str);
987+
IPX_CTX_WARNING(ctx, "Connection with '%s' failed: %s",
988+
pair->session->ident, err_str);
989+
return IPX_ERR_FORMAT;
990+
}
991+
992+
pair->msg_offset += (uint16_t) len;
993+
return IPX_OK;
994+
}
995+
996+
/**
997+
* \brief Try to pass fully received IPFIX Message to the collector.
998+
*
999+
* \param[in] ctx Instance data (necessary for passing messages)
1000+
* \param[in] pair Connection pair (socket descriptor and session)
1001+
* \return #IPX_OK on success
1002+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
1003+
*/
1004+
static int
1005+
socket_process_pass_msg(ipx_ctx_t *ctx, struct tcp_pair *pair)
1006+
{
1007+
assert(pair->msg && pair->msg_offset == pair->msg_size && "Partly received message");
1008+
8771009
if (pair->new_connection) {
8781010
// Send information about the new Transport Session
879-
pair->new_connection = false;
8801011
ipx_msg_session_t *msg = ipx_msg_session_create(pair->session, IPX_MSG_SESSION_OPEN);
8811012
if (!msg) {
882-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
883-
"failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
884-
free(buffer);
1013+
IPX_CTX_ERROR(ctx,
1014+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
1015+
pair->session->ident, __FILE__, __LINE__);
8851016
return IPX_ERR_NOMEM;
8861017
}
8871018

8881019
ipx_ctx_msg_pass(ctx, ipx_msg_session2base(msg));
1020+
pair->new_connection = false;
8891021
}
8901022

8911023
// Create a message wrapper and pass the message
8921024
struct ipx_msg_ctx msg_ctx;
8931025
msg_ctx.session = pair->session;
894-
msg_ctx.odid = msg_odid;
1026+
msg_ctx.odid = ntohl(((struct fds_ipfix_msg_hdr *) pair->msg)->odid);
8951027
msg_ctx.stream = 0; // Streams are not supported over TCP
8961028

897-
ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, buffer, msg_size);
1029+
ipx_msg_ipfix_t *msg = ipx_msg_ipfix_create(ctx, &msg_ctx, pair->msg, pair->msg_size);
8981030
if (!msg) {
899-
IPX_CTX_ERROR(ctx, "Connection from '%s' closed due to memory allocation "
900-
"failure! (%s:%d).", pair->session->ident, __FILE__, __LINE__);
901-
free(buffer);
1031+
IPX_CTX_ERROR(ctx,
1032+
"Connection with '%s' closed due to memory allocation failure! (%s:%d).",
1033+
pair->session->ident, __FILE__, __LINE__);
9021034
return IPX_ERR_NOMEM;
9031035
}
9041036

9051037
ipx_ctx_msg_pass(ctx, ipx_msg_ipfix2base(msg));
1038+
1039+
pair->msg = NULL;
1040+
pair->msg_offset = 0;
1041+
pair->msg_size = 0;
1042+
9061043
return IPX_OK;
9071044
}
9081045

1046+
/**
1047+
* \brief Get an IPFIX message from a socket and pass it
1048+
*
1049+
* \param[in] ctx Instance data (necessary for passing messages)
1050+
* \param[in] pair Connection pair (socket descriptor and session) to receive from
1051+
* \return #IPX_OK on success
1052+
* \return #IPX_ERR_EOF if the socket is closed
1053+
* \return #IPX_ERR_FORMAT if the message (or stream) is malformed and the connection MUST be closed
1054+
* \return #IPX_ERR_NOMEM on a memory allocation error and the connection MUST be closed
1055+
*/
1056+
static int
1057+
socket_process(ipx_ctx_t *ctx, struct tcp_pair *pair)
1058+
{
1059+
int ret;
1060+
1061+
if (!pair->msg || pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
1062+
// Try to receive IPFIX Message header first
1063+
ret = socket_process_receive_header(ctx, pair);
1064+
if (ret != IPX_OK) {
1065+
return ret;
1066+
}
1067+
1068+
if (pair->msg_offset < FDS_IPFIX_MSG_HDR_LEN) {
1069+
// Incomplete IPFIX Message header, read the rest later...
1070+
return IPX_OK;
1071+
}
1072+
}
1073+
1074+
// Receive rest of the message body
1075+
ret = socket_process_receive_body(ctx, pair);
1076+
if (ret != IPX_OK) {
1077+
return ret;
1078+
}
1079+
1080+
if (pair->msg_offset < pair->msg_size) {
1081+
// Incomplete IPFIX Message body, read the rest later...
1082+
return IPX_OK;
1083+
}
1084+
1085+
// Pass the message
1086+
return socket_process_pass_msg(ctx, pair);
1087+
}
1088+
9091089
// -------------------------------------------------------------------------------------------------
9101090

9111091
int

0 commit comments

Comments
 (0)