Skip to content

Commit 30bb2c8

Browse files
committed
Forwarder: handle periodic message
Adds support for newly introduced "periodic message". Without this change, unfinished data transfers might never finish if no input data is coming.
1 parent 5cc2857 commit 30bb2c8

File tree

5 files changed

+40
-3
lines changed

5 files changed

+40
-3
lines changed

src/plugins/output/forwarder/src/Forwarder.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,14 @@ void Forwarder::handle_ipfix_message(ipx_msg_ipfix_t *msg)
105105
}
106106
}
107107

108+
void
109+
Forwarder::handle_periodic_message(ipx_msg_periodic_t *msg)
110+
{
111+
for (auto &host : m_hosts) {
112+
host->advance_transfers();
113+
}
114+
}
115+
108116
void
109117
Forwarder::forward_to_all(ipx_msg_ipfix_t *msg)
110118
{

src/plugins/output/forwarder/src/Forwarder.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,13 @@ class Forwarder {
8080
void
8181
handle_ipfix_message(ipx_msg_ipfix_t *msg);
8282

83+
/**
84+
* \brief Handle a periodic message
85+
* \param msg The periodic message
86+
*/
87+
void
88+
handle_periodic_message(ipx_msg_periodic_t *msg);
89+
8390
/**
8491
* \brief The destructor - finalize the forwarder
8592
*/
@@ -101,4 +108,5 @@ class Forwarder {
101108

102109
void
103110
forward_round_robin(ipx_msg_ipfix_t *msg);
104-
};
111+
112+
};

src/plugins/output/forwarder/src/Host.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,17 @@ Host::forward_message(ipx_msg_ipfix_t *msg)
142142
return true;
143143
}
144144

145+
void
146+
Host::advance_transfers()
147+
{
148+
for (auto &p : m_session_to_connection) {
149+
Connection &connection = *p.second.get();
150+
if (connection.check_connected()) {
151+
connection.advance_transfers();
152+
}
153+
}
154+
}
155+
145156
Host::~Host()
146157
{
147158
for (auto &p : m_session_to_connection) {

src/plugins/output/forwarder/src/Host.h

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,12 @@ class Host {
100100
bool
101101
forward_message(ipx_msg_ipfix_t *msg);
102102

103+
/**
104+
* \brief Advance the unfinished transfers
105+
*/
106+
void
107+
advance_transfers();
108+
103109
private:
104110
const std::string &m_ident;
105111

@@ -116,4 +122,4 @@ class Host {
116122
Connector &m_connector;
117123

118124
std::unordered_map<const ipx_session *, std::unique_ptr<Connection>> m_session_to_connection;
119-
};
125+
};

src/plugins/output/forwarder/src/main.cpp

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ ipx_plugin_init(ipx_ctx_t *ctx, const char *xml_config)
8888
return IPX_ERR_DENIED;
8989
}
9090

91-
ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION;
91+
ipx_msg_mask_t mask = IPX_MSG_IPFIX | IPX_MSG_SESSION | IPX_MSG_PERIODIC;
9292
ipx_ctx_subscribe(ctx, &mask, NULL);
9393
ipx_ctx_private_set(ctx, forwarder.release());
9494

@@ -121,6 +121,10 @@ ipx_plugin_process(ipx_ctx_t *ctx, void *priv, ipx_msg_t *msg)
121121
forwarder.handle_session_message(ipx_msg_base2session(msg));
122122
break;
123123

124+
case IPX_MSG_PERIODIC:
125+
forwarder.handle_periodic_message(ipx_msg_base2periodic(msg));
126+
break;
127+
124128
default: assert(0);
125129
}
126130

0 commit comments

Comments
 (0)