@@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg)
8787 sender.process_message (msg);
8888
8989 } catch (const ConnectionError &err) {
90- // In case connection was lost, we have to resend templates when it reconnects
91- sender.clear_templates ();
90+ on_connection_lost ();
9291 throw err;
9392 }
9493}
@@ -107,31 +106,36 @@ Connection::advance_transfers()
107106
108107 IPX_CTX_DEBUG (m_log_ctx, " Waiting transfers on connection %s: %zu" , m_ident.c_str (), m_transfers.size ());
109108
110- for (auto it = m_transfers.begin (); it != m_transfers.end (); ) {
109+ try {
110+ for (auto it = m_transfers.begin (); it != m_transfers.end (); ) {
111111
112- Transfer &transfer = *it;
112+ Transfer &transfer = *it;
113113
114- assert (transfer.data .size () <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
114+ assert (transfer.data .size () <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
115115
116- ssize_t ret = send (m_sockfd.get (), &transfer.data [transfer.offset ],
117- transfer.data .size () - transfer.offset , MSG_DONTWAIT | MSG_NOSIGNAL);
116+ ssize_t ret = send (m_sockfd.get (), &transfer.data [transfer.offset ],
117+ transfer.data .size () - transfer.offset , MSG_DONTWAIT | MSG_NOSIGNAL);
118118
119- check_socket_error (ret);
119+ check_socket_error (ret);
120120
121- size_t sent = std::max<ssize_t >(0 , ret);
122- IPX_CTX_DEBUG (m_log_ctx, " Sent %zu/%zu B to %s" , sent, transfer.data .size (), m_ident.c_str ());
121+ size_t sent = std::max<ssize_t >(0 , ret);
122+ IPX_CTX_DEBUG (m_log_ctx, " Sent %zu/%zu B to %s" , sent, transfer.data .size (), m_ident.c_str ());
123123
124- // Is the transfer done?
125- if (transfer.offset + sent == transfer.data .size ()) {
126- it = m_transfers.erase (it);
127- // Remove the transfer and continue with the next one
124+ // Is the transfer done?
125+ if (transfer.offset + sent == transfer.data .size ()) {
126+ it = m_transfers.erase (it);
127+ // Remove the transfer and continue with the next one
128128
129- } else {
130- transfer.offset += sent;
129+ } else {
130+ transfer.offset += sent;
131131
132- // Finish, cannot advance next transfer before the one before it is fully sent
133- break ;
132+ // Finish, cannot advance next transfer before the one before it is fully sent
133+ break ;
134+ }
134135 }
136+ } catch (ConnectionError& err) {
137+ on_connection_lost ();
138+ throw err;
135139 }
136140}
137141
@@ -261,3 +265,18 @@ Connection::check_socket_error(ssize_t sock_ret)
261265 throw ConnectionError (errbuf);
262266 }
263267}
268+
269+ void
270+ Connection::on_connection_lost ()
271+ {
272+ if (m_con_params.protocol == Protocol::TCP) {
273+ for (auto & p : m_senders) {
274+ // In case connection was lost, we have to resend templates when it reconnects
275+ Sender& sender = *p.second .get ();
276+ sender.clear_templates ();
277+ }
278+
279+ // Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message
280+ m_transfers.clear ();
281+ }
282+ }
0 commit comments