Skip to content

Commit 2f7c94f

Browse files
authored
Merge pull request #5 from norrisjeremy/devel
JSON output: flush outputs to allow for more timely appearance of rec…
2 parents 737a1f4 + e374ae9 commit 2f7c94f

File tree

5 files changed

+74
-52
lines changed

5 files changed

+74
-52
lines changed

src/plugins/output/json/src/File.cpp

Lines changed: 46 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
6262
{
6363
// Prepare a configuration of the thread for changing time windows
6464
_thread = new thread_ctx_t;
65-
_thread->new_file = nullptr;
66-
_thread->new_file_ready = false;
65+
_thread->file = nullptr;
6766
_thread->stop = false;
6867

6968
_thread->ctx = ctx;
@@ -96,17 +95,33 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
9695
throw std::runtime_error("(File output) Failed to create a time window file.");
9796
}
9897

99-
_file = new_file;
98+
_thread->file = new_file;
10099

101-
if (pthread_mutex_init(&_thread->mutex, NULL) != 0) {
102-
fclose(_file);
100+
pthread_rwlockattr_t attr;
101+
if (pthread_rwlockattr_init(&attr) != 0) {
102+
fclose(_thread->file);
103103
delete _thread;
104-
throw std::runtime_error("(File output) Mutex initialization failed!");
104+
throw std::runtime_error("(File output) Rwlockattr initialization failed!");
105105
}
106106

107+
if (pthread_rwlockattr_setkind_np(&attr, PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP) != 0) {
108+
fclose(_thread->file);
109+
pthread_rwlockattr_destroy(&attr);
110+
delete _thread;
111+
throw std::runtime_error("(File output) Rwlockattr setkind failed!");
112+
}
113+
114+
if (pthread_rwlock_init(&_thread->rwlock, &attr) != 0) {
115+
fclose(_thread->file);
116+
pthread_rwlockattr_destroy(&attr);
117+
delete _thread;
118+
throw std::runtime_error("(File output) Rwlock initialization failed!");
119+
}
120+
121+
pthread_rwlockattr_destroy(&attr);
107122
if (pthread_create(&_thread->thread, NULL, &File::thread_window, _thread) != 0) {
108-
fclose(_file);
109-
pthread_mutex_destroy(&_thread->mutex);
123+
fclose(_thread->file);
124+
pthread_rwlock_destroy(&_thread->rwlock);
110125
delete _thread;
111126
throw std::runtime_error("(File output) Failed to start a thread for changing time "
112127
"windows.");
@@ -120,17 +135,13 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
120135
*/
121136
File::~File()
122137
{
123-
if (_file) {
124-
fclose(_file);
125-
}
126-
127138
if (_thread) {
128139
_thread->stop = true;
129140
pthread_join(_thread->thread, NULL);
130-
pthread_mutex_destroy(&_thread->mutex);
141+
pthread_rwlock_destroy(&_thread->rwlock);
131142

132-
if (_thread->new_file) {
133-
fclose(_thread->new_file);
143+
if (_thread->file) {
144+
fclose(_thread->file);
134145
}
135146

136147
delete _thread;
@@ -164,10 +175,10 @@ File::thread_window(void *context)
164175
}
165176

166177
// New time window
167-
pthread_mutex_lock(&data->mutex);
168-
if (data->new_file) {
169-
fclose(data->new_file);
170-
data->new_file = NULL;
178+
pthread_rwlock_wrlock(&data->rwlock);
179+
if (data->file) {
180+
fclose(data->file);
181+
data->file = nullptr;
171182
}
172183

173184
data->window_time += data->window_size;
@@ -177,9 +188,8 @@ File::thread_window(void *context)
177188
}
178189

179190
// Null pointer is also valid...
180-
data->new_file = file;
181-
data->new_file_ready = true;
182-
pthread_mutex_unlock(&data->mutex);
191+
data->file = file;
192+
pthread_rwlock_unlock(&data->rwlock);
183193
}
184194

185195
IPX_CTX_DEBUG(data->ctx, "(File output) Thread terminated.", '\0');
@@ -196,28 +206,23 @@ File::thread_window(void *context)
196206
int
197207
File::process(const char *str, size_t len)
198208
{
199-
// Should we change a time window
200-
if (_thread->new_file_ready) {
201-
// Close old time window
202-
if (_file) {
203-
fclose(_file);
204-
}
205-
206-
// Get new time window
207-
pthread_mutex_lock(&_thread->mutex);
208-
_file = _thread->new_file;
209-
_thread->new_file = nullptr;
210-
_thread->new_file_ready = false;
211-
pthread_mutex_unlock(&_thread->mutex);
209+
pthread_rwlock_rdlock(&_thread->rwlock);
210+
if (_thread->file) {
211+
// Store the record
212+
fwrite(str, len, 1, _thread->file);
212213
}
214+
pthread_rwlock_unlock(&_thread->rwlock);
215+
return IPX_OK;
216+
}
213217

214-
if (!_file) {
215-
return IPX_OK;
218+
void
219+
File::flush()
220+
{
221+
pthread_rwlock_rdlock(&_thread->rwlock);
222+
if (_thread->file) {
223+
fflush(_thread->file);
216224
}
217-
218-
// Store the record
219-
fwrite(str, len, 1, _file);
220-
return IPX_OK;
225+
pthread_rwlock_unlock(&_thread->rwlock);
221226
}
222227

223228
/**

src/plugins/output/json/src/File.hpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#ifndef JSON_FILE_H
4343
#define JSON_FILE_H
4444

45+
#include <atomic>
4546
#include <string>
4647
#include <ctime>
4748
#include <cstdio>
@@ -61,6 +62,8 @@ class File : public Output {
6162

6263
// Store a record to the file
6364
int process(const char *str, size_t len);
65+
66+
void flush();
6467
private:
6568
/** Minimal window size */
6669
const unsigned int _WINDOW_MIN_SIZE = 60; // seconds
@@ -69,20 +72,17 @@ class File : public Output {
6972
typedef struct thread_ctx_s {
7073
ipx_ctx_t *ctx; /**< Plugin instance context */
7174
pthread_t thread; /**< Thread */
72-
pthread_mutex_t mutex; /**< Data mutex */
73-
bool stop; /**< Stop flag for termination */
75+
pthread_rwlock_t rwlock; /**< Data rwlock */
76+
std::atomic<bool> stop; /**< Stop flag for termination */
7477

7578
unsigned int window_size; /**< Size of a time window */
7679
time_t window_time; /**< Current time window */
7780
std::string storage_path; /**< Storage path (template) */
7881
std::string file_prefix; /**< File prefix */
7982

80-
FILE *new_file; /**< New file */
81-
bool new_file_ready; /**< New file flag */
83+
FILE *file; /**< File descriptor */
8284
} thread_ctx_t;
8385

84-
/** File descriptor */
85-
FILE *_file;
8686
/** Thread for changing time windows */
8787
thread_ctx_t *_thread;
8888

src/plugins/output/json/src/Server.hpp

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
#ifndef JSON_SERVER_H
4141
#define JSON_SERVER_H
4242

43+
#include <atomic>
4344
#include <string>
4445
#include <vector>
4546
#include <pthread.h>
@@ -78,10 +79,10 @@ class Server : public Output
7879
ipx_ctx_t *ctx; /**< Instance context (for log only ) */
7980
pthread_t thread; /**< Thread */
8081
pthread_mutex_t mutex; /**< Mutex for the array */
81-
bool stop; /**< Stop flag for terminating */
82+
std::atomic<bool> stop; /**< Stop flag for terminating */
8283

8384
int socket_fd; /**< Server socket */
84-
bool new_clients_ready; /**< New clients flag */
85+
std::atomic<bool> new_clients_ready; /**< New clients flag */
8586
std::vector<client_t> new_clients; /**< Array of new clients */
8687
} acceptor_t;
8788

src/plugins/output/json/src/Storage.cpp

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,8 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
149149
{
150150
// Process all data records
151151
const uint32_t rec_cnt = ipx_msg_ipfix_get_drec_cnt(msg);
152+
bool flush = false;
153+
int ret = IPX_OK;
152154

153155
// Message header
154156
auto hdr = (fds_ipfix_msg_hdr*) ipx_msg_ipfix_get_packet(msg);
@@ -162,13 +164,16 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
162164
continue;
163165
}
164166

167+
flush = true;
168+
165169
// Convert the record
166170
convert(ipfix_rec->rec, iemgr, hdr, false);
167171

168172
// Store it
169173
for (Output *output : m_outputs) {
170174
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
171-
return IPX_ERR_DENIED;
175+
ret = IPX_ERR_DENIED;
176+
goto endloop;
172177
}
173178
}
174179

@@ -183,12 +188,20 @@ Storage::records_store(ipx_msg_ipfix_t *msg, const fds_iemgr_t *iemgr)
183188
// Store it
184189
for (Output *output : m_outputs) {
185190
if (output->process(m_record.buffer, m_record.size_used) != IPX_OK) {
186-
return IPX_ERR_DENIED;
191+
ret = IPX_ERR_DENIED;
192+
goto endloop;
187193
}
188194
}
189195
}
190196

191-
return IPX_OK;
197+
endloop:
198+
if (flush) {
199+
for (Output *output : m_outputs) {
200+
output->flush();
201+
}
202+
}
203+
204+
return ret;
192205
}
193206

194207
/**

src/plugins/output/json/src/Storage.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@ class Output {
7474
*/
7575
virtual int
7676
process(const char *str, size_t len) = 0;
77+
78+
virtual void
79+
flush() {};
7780
};
7881

7982
/** JSON converter and output manager */

0 commit comments

Comments
 (0)