Skip to content

Commit c9d1473

Browse files
committed
JSON output: use read/write locks and a single file descriptor for file output.
This prevents file output from keeping file descriptor open indefinitely if no new records are received.
1 parent fcfe5f8 commit c9d1473

File tree

2 files changed

+27
-49
lines changed

2 files changed

+27
-49
lines changed

src/plugins/output/json/src/File.cpp

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,7 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
6262
{
6363
// Prepare a configuration of the thread for changing time windows
6464
_thread = new thread_ctx_t;
65-
_thread->new_file = nullptr;
66-
_thread->new_file_ready = false;
65+
_thread->file = nullptr;
6766
_thread->stop = false;
6867

6968
_thread->ctx = ctx;
@@ -96,17 +95,17 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
9695
throw std::runtime_error("(File output) Failed to create a time window file.");
9796
}
9897

99-
_file = new_file;
98+
_thread->file = new_file;
10099

101-
if (pthread_mutex_init(&_thread->mutex, NULL) != 0) {
102-
fclose(_file);
100+
if (pthread_rwlock_init(&_thread->rwlock, NULL) != 0) {
101+
fclose(_thread->file);
103102
delete _thread;
104103
throw std::runtime_error("(File output) Mutex initialization failed!");
105104
}
106105

107106
if (pthread_create(&_thread->thread, NULL, &File::thread_window, _thread) != 0) {
108-
fclose(_file);
109-
pthread_mutex_destroy(&_thread->mutex);
107+
fclose(_thread->file);
108+
pthread_rwlock_destroy(&_thread->rwlock);
110109
delete _thread;
111110
throw std::runtime_error("(File output) Failed to start a thread for changing time "
112111
"windows.");
@@ -120,17 +119,13 @@ File::File(const struct cfg_file &cfg, ipx_ctx_t *ctx) : Output(cfg.name, ctx)
120119
*/
121120
File::~File()
122121
{
123-
if (_file) {
124-
fclose(_file);
125-
}
126-
127122
if (_thread) {
128123
_thread->stop = true;
129124
pthread_join(_thread->thread, NULL);
130-
pthread_mutex_destroy(&_thread->mutex);
125+
pthread_rwlock_destroy(&_thread->rwlock);
131126

132-
if (_thread->new_file) {
133-
fclose(_thread->new_file);
127+
if (_thread->file) {
128+
fclose(_thread->file);
134129
}
135130

136131
delete _thread;
@@ -164,10 +159,10 @@ File::thread_window(void *context)
164159
}
165160

166161
// New time window
167-
pthread_mutex_lock(&data->mutex);
168-
if (data->new_file) {
169-
fclose(data->new_file);
170-
data->new_file = NULL;
162+
pthread_rwlock_wrlock(&data->rwlock);
163+
if (data->file) {
164+
fclose(data->file);
165+
data->file = nullptr;
171166
}
172167

173168
data->window_time += data->window_size;
@@ -177,9 +172,8 @@ File::thread_window(void *context)
177172
}
178173

179174
// Null pointer is also valid...
180-
data->new_file = file;
181-
data->new_file_ready = true;
182-
pthread_mutex_unlock(&data->mutex);
175+
data->file = file;
176+
pthread_rwlock_unlock(&data->rwlock);
183177
}
184178

185179
IPX_CTX_DEBUG(data->ctx, "(File output) Thread terminated.", '\0');
@@ -196,36 +190,23 @@ File::thread_window(void *context)
196190
int
197191
File::process(const char *str, size_t len)
198192
{
199-
// Should we change a time window
200-
if (_thread->new_file_ready) {
201-
// Close old time window
202-
if (_file) {
203-
fclose(_file);
204-
}
205-
206-
// Get new time window
207-
pthread_mutex_lock(&_thread->mutex);
208-
_file = _thread->new_file;
209-
_thread->new_file = nullptr;
210-
_thread->new_file_ready = false;
211-
pthread_mutex_unlock(&_thread->mutex);
193+
pthread_rwlock_rdlock(&_thread->rwlock);
194+
if (_thread->file) {
195+
// Store the record
196+
fwrite(str, len, 1, _thread->file);
212197
}
213-
214-
if (!_file) {
215-
return IPX_OK;
216-
}
217-
218-
// Store the record
219-
fwrite(str, len, 1, _file);
198+
pthread_rwlock_unlock(&_thread->rwlock);
220199
return IPX_OK;
221200
}
222201

223202
void
224-
File::flush(void)
203+
File::flush()
225204
{
226-
if (_file) {
227-
fflush(_file);
205+
pthread_rwlock_rdlock(&_thread->rwlock);
206+
if (_thread->file) {
207+
fflush(_thread->file);
228208
}
209+
pthread_rwlock_unlock(&_thread->rwlock);
229210
}
230211

231212
/**

src/plugins/output/json/src/File.hpp

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,20 +71,17 @@ class File : public Output {
7171
typedef struct thread_ctx_s {
7272
ipx_ctx_t *ctx; /**< Plugin instance context */
7373
pthread_t thread; /**< Thread */
74-
pthread_mutex_t mutex; /**< Data mutex */
74+
pthread_rwlock_t rwlock; /**< Data rwlock */
7575
bool stop; /**< Stop flag for termination */
7676

7777
unsigned int window_size; /**< Size of a time window */
7878
time_t window_time; /**< Current time window */
7979
std::string storage_path; /**< Storage path (template) */
8080
std::string file_prefix; /**< File prefix */
8181

82-
FILE *new_file; /**< New file */
83-
bool new_file_ready; /**< New file flag */
82+
FILE *file; /**< File descriptor */
8483
} thread_ctx_t;
8584

86-
/** File descriptor */
87-
FILE *_file;
8885
/** Thread for changing time windows */
8986
thread_ctx_t *_thread;
9087

0 commit comments

Comments
 (0)