Skip to content

Commit fcfe5f8

Browse files
committed
JSON output: flush outputs to allow for more timely appearance of records in file output.
1 parent 737a1f4 commit fcfe5f8

File tree

4 files changed

+29
-3
lines changed

4 files changed

+29
-3
lines changed

src/plugins/output/json/src/File.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,14 @@ File::process(const char *str, size_t len)
220220
return IPX_OK;
221221
}
222222

223+
void
224+
File::flush(void)
225+
{
226+
if (_file) {
227+
fflush(_file);
228+
}
229+
}
230+
223231
/**
224232
* \brief Get a directory path for a time window
225233
* \param[in] tm Time window

src/plugins/output/json/src/File.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ class File : public Output {
6161

6262
// Store a record to the file
6363
int process(const char *str, size_t len);
64+
65+
void flush();
6466
private:
6567
/** Minimal window size */
6668
const unsigned int _WINDOW_MIN_SIZE = 60; // seconds

src/plugins/output/json/src/Storage.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
149149
{
150150
// Process all data records
151151
const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
152+
bool flush = false;
153+
int ret = IPX_OK;
152154

153155
// Message header
154156
auto hdr = (fds_ipfix_msg_hdr*) ipx_msg_ipfix_get_packet(msg);
@@ -162,13 +164,16 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
162164
continue;
163165
}
164166

167+
flush = true;
168+
165169
// Convert the record
166170
convert(ipfix_rec->rec, iemgr, hdr, false);
167171

168172
// Store it
169173
for (Output *output : m_outputs) {
170174
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
171-
return IPX_ERR_DENIED;
175+
ret = IPX_ERR_DENIED;
176+
goto endloop;
172177
}
173178
}
174179

@@ -183,12 +188,20 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
183188
// Store it
184189
for (Output *output : m_outputs) {
185190
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
186-
return IPX_ERR_DENIED;
191+
ret = IPX_ERR_DENIED;
192+
goto endloop;
187193
}
188194
}
189195
}
190196

191-
return IPX_OK;
197+
endloop:
198+
if (flush) {
199+
for (Output *output : m_outputs) {
200+
output->flush();
201+
}
202+
}
203+
204+
return ret;
192205
}
193206

194207
/**

src/plugins/output/json/src/Storage.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class Output {
7474
*/
7575
virtual int
7676
process(const char *str, size_t len) = 0;
77+
78+
virtual void
79+
flush() {};
7780
};
7881

7982
/** JSON converter and output manager */

0 commit comments

Comments
 (0)