Skip to content

Commit 68ec5a8

Browse files
committed
Forwarder: clear buffers on connection lost
Unfinished transfers would remain in buffers when we lost connection. In case of TCP, this is a problem. A message might be half-way send when the connection drops, resulting in the other half being sent after the TCP connection reestabilishes. The collector would interpret this as an entirely new message, resulting in bogus data being decoded.
1 parent 30bb2c8 commit 68ec5a8

File tree

2 files changed

+39
-19
lines changed

2 files changed

+39
-19
lines changed

src/plugins/output/forwarder/src/Connection.cpp

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,7 @@ Connection::forward_message(ipx_msg_ipfix_t *msg)
8787
sender.process_message(msg);
8888

8989
} catch (const ConnectionError &err) {
90-
// In case connection was lost, we have to resend templates when it reconnects
91-
sender.clear_templates();
90+
on_connection_lost();
9291
throw err;
9392
}
9493
}
@@ -107,31 +106,36 @@ Connection::advance_transfers()
107106

108107
IPX_CTX_DEBUG(m_log_ctx, "Waiting transfers on connection %s: %zu", m_ident.c_str(), m_transfers.size());
109108

110-
for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
109+
try {
110+
for (auto it = m_transfers.begin(); it != m_transfers.end(); ) {
111111

112-
Transfer &transfer = *it;
112+
Transfer &transfer = *it;
113113

114-
assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
114+
assert(transfer.data.size() <= UINT16_MAX); // The transfer consists of one IPFIX message which cannot be larger
115115

116-
ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
117-
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
116+
ssize_t ret = send(m_sockfd.get(), &transfer.data[transfer.offset],
117+
transfer.data.size() - transfer.offset, MSG_DONTWAIT | MSG_NOSIGNAL);
118118

119-
check_socket_error(ret);
119+
check_socket_error(ret);
120120

121-
size_t sent = std::max<ssize_t>(0, ret);
122-
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
121+
size_t sent = std::max<ssize_t>(0, ret);
122+
IPX_CTX_DEBUG(m_log_ctx, "Sent %zu/%zu B to %s", sent, transfer.data.size(), m_ident.c_str());
123123

124-
// Is the transfer done?
125-
if (transfer.offset + sent == transfer.data.size()) {
126-
it = m_transfers.erase(it);
127-
// Remove the transfer and continue with the next one
124+
// Is the transfer done?
125+
if (transfer.offset + sent == transfer.data.size()) {
126+
it = m_transfers.erase(it);
127+
// Remove the transfer and continue with the next one
128128

129-
} else {
130-
transfer.offset += sent;
129+
} else {
130+
transfer.offset += sent;
131131

132-
// Finish, cannot advance next transfer before the one before it is fully sent
133-
break;
132+
// Finish, cannot advance next transfer before the one before it is fully sent
133+
break;
134+
}
134135
}
136+
} catch (ConnectionError& err) {
137+
on_connection_lost();
138+
throw err;
135139
}
136140
}
137141

@@ -261,3 +265,16 @@ Connection::check_socket_error(ssize_t sock_ret)
261265
throw ConnectionError(errbuf);
262266
}
263267
}
268+
269+
void
270+
Connection::on_connection_lost()
271+
{
272+
for (auto& p : m_senders) {
273+
// In case connection was lost, we have to resend templates when it reconnects
274+
Sender& sender = *p.second.get();
275+
sender.clear_templates();
276+
}
277+
278+
// Do not continue any of the transfers that haven't been finished so we don't end up in the middle of a message
279+
m_transfers.clear();
280+
}

src/plugins/output/forwarder/src/Connection.h

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,4 +181,7 @@ class Connection {
181181

182182
void
183183
check_socket_error(ssize_t sock_ret);
184-
};
184+
185+
void
186+
on_connection_lost();
187+
};

0 commit comments

Comments
 (0)