Skip to content

Commit b624c78

Browse files
authored
Merge pull request #208 from CESNET/ipfix-non-blocking-tcp
ipfix output - add option to use non-blocking tcp socket
2 parents bd14632 + 7c749c8 commit b624c78

File tree

4 files changed

+131
-15
lines changed

4 files changed

+131
-15
lines changed

init/ipfixprobed

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,20 @@ if [ -e "$CONFFILE" ]; then
8282
if [[ $UDP == "yes" ]]; then
8383
UDP_PARAM="udp";
8484
fi
85-
output="-o ipfix;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}"
85+
86+
NON_BLOCKING_TCP_PARAM=""
87+
if [[ $NON_BLOCKING_TCP == "yes" ]]; then
88+
NON_BLOCKING_TCP_PARAM="non-blocking-tcp";
89+
fi
90+
91+
output="-o ipfix;host=${HOST:-127.0.0.1};port=${PORT:-4739};id=${LINK:-0};dir=${DIR:-0};${UDP_PARAM};${NON_BLOCKING_TCP_PARAM};template=${TEMPLATE_REFRESH_RATE:-300}"
92+
93+
telemetry=""
94+
if [ "$USE_FUSE" = "1" ]; then
95+
telemetry="-t ${FUSE_MOUNT_POINT}"
96+
fi
97+
98+
exec /usr/bin/ipfixprobe "${dpdkinput[@]}" $input $storage $process $output $telemetry
8699

87100
telemetry=""
88101
if [ "$USE_FUSE" = "1" ]; then

init/link0.conf.example

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,9 @@ PORT=4739
183183
# Use UDP yes/no? (instead of TCP)
184184
UDP=yes
185185

186+
# Use non-blocking socket for TCP connection yes/no?
187+
NON_BLOCKING_TCP=no
188+
186189
# Export ipfix template every N seconds (UDP)
187190
TEMPLATE_REFRESH_RATE=300
188191

output/ipfix.cpp

Lines changed: 110 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@
5050
#include <assert.h>
5151
#include <endian.h>
5252
#include <memory>
53+
#include <csignal>
54+
#include <fcntl.h>
5355

5456
#define __STDC_FORMAT_MACROS
5557
#include <inttypes.h>
@@ -138,7 +140,7 @@ IPFIXExporter::IPFIXExporter() :
138140
sequenceNum(0), exportedPackets(0),
139141
fd(-1), addrinfo(nullptr),
140142
host(""), port(4739), protocol(IPPROTO_TCP),
141-
ip(AF_UNSPEC), flags(0),
143+
ip(AF_UNSPEC), flags(0), non_blocking_tcp(false),
142144
reconnectTimeout(RECONNECT_TIMEOUT), lastReconnect(0), odid(0),
143145
templateRefreshTime(TEMPLATE_REFRESH_TIME),
144146
templateRefreshPackets(TEMPLATE_REFRESH_PACKETS),
@@ -177,6 +179,10 @@ void IPFIXExporter::init(const char *params)
177179
protocol = IPPROTO_UDP;
178180
}
179181

182+
if (parser.m_non_blocking_tcp) {
183+
non_blocking_tcp = true;
184+
}
185+
180186
if (mtu <= IPFIX_HEADER_SIZE) {
181187
throw PluginError("IPFIX message MTU size should be at least " + std::to_string(IPFIX_HEADER_SIZE));
182188
}
@@ -194,6 +200,9 @@ void IPFIXExporter::init(const char *params)
194200
if (verbose) {
195201
fprintf(stderr, "VERBOSE: IPFIX export plugin init end\n");
196202
}
203+
204+
// ignore SIGPIPE signal and handle error by return value
205+
signal(SIGPIPE, SIG_IGN);
197206
}
198207

199208
void IPFIXExporter::init(const char *params, Plugins &plugins)
@@ -863,6 +872,10 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
863872

864873
/* Say that we should try to connect and send data again */
865874
return 1;
875+
case EAGAIN:
876+
// EAGAIN is returned when the socket is non-blocking and the send buffer is full
877+
// possible wait and stop flag check
878+
continue;
866879
default:
867880
/* Unknown error */
868881
if (verbose) {
@@ -890,6 +903,79 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
890903
return 0;
891904
}
892905

906+
static int connect_non_blocking(int fd, struct addrinfo* addr_info, bool verbose)
907+
{
908+
int flags = fcntl(fd, F_GETFL, 0);
909+
if (flags == -1) {
910+
if (verbose) {
911+
fprintf(stderr, "VERBOSE: Cannot get socket flags\n");
912+
}
913+
return -1;
914+
}
915+
916+
if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
917+
if (verbose) {
918+
fprintf(stderr, "VERBOSE: Cannot set socket to non-blocking mode\n");
919+
}
920+
return -1;
921+
}
922+
923+
const int connectResult = connect(fd, addr_info->ai_addr, addr_info->ai_addrlen);
924+
const int isTCPConnectInProgress = ((connectResult == -1) && (errno == EINPROGRESS));
925+
926+
if ((connectResult == -1) && (!isTCPConnectInProgress)) {
927+
if (verbose) {
928+
fprintf(stderr, "VERBOSE: Cannot connect to collector\n");
929+
}
930+
return -1;
931+
}
932+
933+
if ((connectResult == 0) && (isTCPConnectInProgress == 0)) {
934+
return 0;
935+
}
936+
937+
const std::size_t MAX_CONNECTION_TRY = 10;
938+
std::size_t connectionTry = 0;
939+
while (connectionTry < MAX_CONNECTION_TRY) {
940+
fd_set collectorSocket;
941+
FD_ZERO(&collectorSocket);
942+
FD_SET(fd, &collectorSocket);
943+
944+
struct timeval tv;
945+
tv.tv_sec = 0;
946+
tv.tv_usec = 10000;
947+
948+
const int selectResult = select(fd + 1, NULL, &collectorSocket, NULL, &tv);
949+
950+
if (selectResult < 0) {
951+
if (verbose) {
952+
perror("VERBOSE: select() failed");
953+
}
954+
return -1;
955+
}
956+
957+
if (FD_ISSET(fd, &collectorSocket)) {
958+
struct sockaddr_in junk;
959+
socklen_t length = sizeof(junk);
960+
memset(&junk, 0, sizeof(junk));
961+
if (getpeername(fd, (struct sockaddr*) &junk, &length) == 0) {
962+
return 0;
963+
} else {
964+
connectionTry++;
965+
continue;
966+
}
967+
} else {
968+
connectionTry++;
969+
}
970+
}
971+
972+
if (verbose) {
973+
perror("VERBOSE: Cannot connect to collector");
974+
}
975+
976+
return -1;
977+
}
978+
893979
/**
894980
* \brief Create connection to collector
895981
*
@@ -952,22 +1038,33 @@ int IPFIXExporter::connect_to_collector()
9521038
continue;
9531039
}
9541040

955-
/* connect to server with TCP and SCTP */
956-
if (protocol != IPPROTO_UDP &&
957-
connect(fd, tmp->ai_addr, tmp->ai_addrlen) == -1) {
958-
if (verbose) {
959-
perror("VERBOSE: Cannot connect to collector");
1041+
if (protocol == IPPROTO_UDP) {
1042+
break;
1043+
}
1044+
1045+
if (non_blocking_tcp) {
1046+
if (connect_non_blocking(fd, tmp, verbose) == -1) {
1047+
if (verbose) {
1048+
perror("VERBOSE: Cannot connect to collector");
1049+
}
1050+
::close(fd);
1051+
fd = -1;
1052+
continue;
1053+
}
1054+
} else {
1055+
if (connect(fd, tmp->ai_addr, tmp->ai_addrlen) == -1) {
1056+
if (verbose) {
1057+
perror("VERBOSE: Cannot connect to collector");
1058+
}
1059+
::close(fd);
1060+
fd = -1;
1061+
continue;
9601062
}
961-
::close(fd);
962-
fd = -1;
963-
continue;
9641063
}
9651064

9661065
/* Connected, meaningless for UDP */
967-
if (protocol != IPPROTO_UDP) {
968-
if (verbose) {
969-
fprintf(stderr, "VERBOSE: Successfully connected to collector\n");
970-
}
1066+
if (verbose) {
1067+
fprintf(stderr, "VERBOSE: Successfully connected to collector\n");
9711068
}
9721069
break;
9731070
}

output/ipfix.hpp

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,13 +73,14 @@ class IpfixOptParser : public OptionsParser
7373
uint16_t m_port;
7474
uint16_t m_mtu;
7575
bool m_udp;
76+
bool m_non_blocking_tcp;
7677
uint64_t m_id;
7778
uint32_t m_dir;
7879
uint32_t m_template_refresh_time;
7980
bool m_verbose;
8081

8182
IpfixOptParser() : OptionsParser("ipfix", "Output plugin for ipfix export"),
82-
m_host("127.0.0.1"), m_port(4739), m_mtu(DEFAULT_MTU), m_udp(false), m_id(DEFAULT_EXPORTER_ID), m_dir(0),
83+
m_host("127.0.0.1"), m_port(4739), m_mtu(DEFAULT_MTU), m_udp(false), m_non_blocking_tcp(false), m_id(DEFAULT_EXPORTER_ID), m_dir(0),
8384
m_template_refresh_time(TEMPLATE_REFRESH_TIME), m_verbose(false)
8485
{
8586
register_option("h", "host", "ADDR", "Remote collector address", [this](const char *arg){m_host = arg; return true;}, OptionFlags::RequiredArgument);
@@ -90,6 +91,7 @@ class IpfixOptParser : public OptionsParser
9091
[this](const char *arg){try {m_mtu = str2num<decltype(m_mtu)>(arg);} catch(std::invalid_argument &e) {return false;} return true;},
9192
OptionFlags::RequiredArgument);
9293
register_option("u", "udp", "", "Use UDP protocol", [this](const char *arg){m_udp = true; return true;}, OptionFlags::NoArgument);
94+
register_option("n", "non-blocking-tcp", "", "Use non-blocking socket for TCP protocol", [this](const char *arg){m_non_blocking_tcp = true; return true;}, OptionFlags::NoArgument);
9395
register_option("I", "id", "NUM", "Exporter identification",
9496
[this](const char *arg){try {m_id = str2num<decltype(m_id)>(arg);} catch(std::invalid_argument &e) {return false;} return true;},
9597
OptionFlags::RequiredArgument);
@@ -267,6 +269,7 @@ class IPFIXExporter : public OutputPlugin
267269
int protocol; /**< Collector connection protocol */
268270
int ip; /**< IP protocol version (AF_INET, ...) */
269271
int flags; /**< getaddrinfo flags */
272+
bool non_blocking_tcp;
270273

271274
uint32_t reconnectTimeout; /**< Timeout between connection retries */
272275
time_t lastReconnect; /**< Time in seconds of last connection retry */

0 commit comments

Comments
 (0)