5050#include < assert.h>
5151#include < endian.h>
5252#include < memory>
53+ #include < csignal>
54+ #include < fcntl.h>
5355
5456#define __STDC_FORMAT_MACROS
5557#include < inttypes.h>
@@ -138,7 +140,7 @@ IPFIXExporter::IPFIXExporter() :
138140 sequenceNum (0 ), exportedPackets(0 ),
139141 fd (-1 ), addrinfo(nullptr ),
140142 host (" " ), port(4739 ), protocol(IPPROTO_TCP),
141- ip (AF_UNSPEC), flags(0 ),
143+ ip (AF_UNSPEC), flags(0 ), non_blocking_tcp( false ),
142144 reconnectTimeout (RECONNECT_TIMEOUT), lastReconnect(0 ), odid(0 ),
143145 templateRefreshTime (TEMPLATE_REFRESH_TIME),
144146 templateRefreshPackets (TEMPLATE_REFRESH_PACKETS),
@@ -177,6 +179,10 @@ void IPFIXExporter::init(const char *params)
177179 protocol = IPPROTO_UDP;
178180 }
179181
182+ if (parser.m_non_blocking_tcp ) {
183+ non_blocking_tcp = true ;
184+ }
185+
180186 if (mtu <= IPFIX_HEADER_SIZE) {
181187 throw PluginError (" IPFIX message MTU size should be at least " + std::to_string (IPFIX_HEADER_SIZE));
182188 }
@@ -194,6 +200,9 @@ void IPFIXExporter::init(const char *params)
194200 if (verbose) {
195201 fprintf (stderr, " VERBOSE: IPFIX export plugin init end\n " );
196202 }
203+
204+ // ignore SIGPIPE signal and handle error by return value
205+ signal (SIGPIPE, SIG_IGN);
197206}
198207
199208void IPFIXExporter::init (const char *params, Plugins &plugins)
@@ -863,6 +872,10 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
863872
864873 /* Say that we should try to connect and send data again */
865874 return 1 ;
875+ case EAGAIN:
876+ // EAGAIN is returned when the socket is non-blocking and the send buffer is full
877+ // possible wait and stop flag check
878+ continue ;
866879 default :
867880 /* Unknown error */
868881 if (verbose) {
@@ -890,6 +903,79 @@ int IPFIXExporter::send_packet(ipfix_packet_t *packet)
890903 return 0 ;
891904}
892905
906+ static int connect_non_blocking (int fd, struct addrinfo * addr_info, bool verbose)
907+ {
908+ int flags = fcntl (fd, F_GETFL, 0 );
909+ if (flags == -1 ) {
910+ if (verbose) {
911+ fprintf (stderr, " VERBOSE: Cannot get socket flags\n " );
912+ }
913+ return -1 ;
914+ }
915+
916+ if (fcntl (fd, F_SETFL, flags | O_NONBLOCK) == -1 ) {
917+ if (verbose) {
918+ fprintf (stderr, " VERBOSE: Cannot set socket to non-blocking mode\n " );
919+ }
920+ return -1 ;
921+ }
922+
923+ const int connectResult = connect (fd, addr_info->ai_addr , addr_info->ai_addrlen );
924+ const int isTCPConnectInProgress = ((connectResult == -1 ) && (errno == EINPROGRESS));
925+
926+ if ((connectResult == -1 ) && (!isTCPConnectInProgress)) {
927+ if (verbose) {
928+ fprintf (stderr, " VERBOSE: Cannot connect to collector\n " );
929+ }
930+ return -1 ;
931+ }
932+
933+ if ((connectResult == 0 ) && (isTCPConnectInProgress == 0 )) {
934+ return 0 ;
935+ }
936+
937+ const std::size_t MAX_CONNECTION_TRY = 10 ;
938+ std::size_t connectionTry = 0 ;
939+ while (connectionTry < MAX_CONNECTION_TRY) {
940+ fd_set collectorSocket;
941+ FD_ZERO (&collectorSocket);
942+ FD_SET (fd, &collectorSocket);
943+
944+ struct timeval tv;
945+ tv.tv_sec = 0 ;
946+ tv.tv_usec = 10000 ;
947+
948+ const int selectResult = select (fd + 1 , NULL , &collectorSocket, NULL , &tv);
949+
950+ if (selectResult < 0 ) {
951+ if (verbose) {
952+ perror (" VERBOSE: select() failed" );
953+ }
954+ return -1 ;
955+ }
956+
957+ if (FD_ISSET (fd, &collectorSocket)) {
958+ struct sockaddr_in junk;
959+ socklen_t length = sizeof (junk);
960+ memset (&junk, 0 , sizeof (junk));
961+ if (getpeername (fd, (struct sockaddr *) &junk, &length) == 0 ) {
962+ return 0 ;
963+ } else {
964+ connectionTry++;
965+ continue ;
966+ }
967+ } else {
968+ connectionTry++;
969+ }
970+ }
971+
972+ if (verbose) {
973+ perror (" VERBOSE: Cannot connect to collector" );
974+ }
975+
976+ return -1 ;
977+ }
978+
893979/* *
894980 * \brief Create connection to collector
895981 *
@@ -952,22 +1038,33 @@ int IPFIXExporter::connect_to_collector()
9521038 continue ;
9531039 }
9541040
955- /* connect to server with TCP and SCTP */
956- if (protocol != IPPROTO_UDP &&
957- connect (fd, tmp->ai_addr , tmp->ai_addrlen ) == -1 ) {
958- if (verbose) {
959- perror (" VERBOSE: Cannot connect to collector" );
1041+ if (protocol == IPPROTO_UDP) {
1042+ break ;
1043+ }
1044+
1045+ if (non_blocking_tcp) {
1046+ if (connect_non_blocking (fd, tmp, verbose) == -1 ) {
1047+ if (verbose) {
1048+ perror (" VERBOSE: Cannot connect to collector" );
1049+ }
1050+ ::close (fd);
1051+ fd = -1 ;
1052+ continue ;
1053+ }
1054+ } else {
1055+ if (connect (fd, tmp->ai_addr , tmp->ai_addrlen ) == -1 ) {
1056+ if (verbose) {
1057+ perror (" VERBOSE: Cannot connect to collector" );
1058+ }
1059+ ::close (fd);
1060+ fd = -1 ;
1061+ continue ;
9601062 }
961- ::close (fd);
962- fd = -1 ;
963- continue ;
9641063 }
9651064
9661065 /* Connected, meaningless for UDP */
967- if (protocol != IPPROTO_UDP) {
968- if (verbose) {
969- fprintf (stderr, " VERBOSE: Successfully connected to collector\n " );
970- }
1066+ if (verbose) {
1067+ fprintf (stderr, " VERBOSE: Successfully connected to collector\n " );
9711068 }
9721069 break ;
9731070 }
0 commit comments