Skip to content

Commit 1d91313

Browse files
Leonardo Alminanaedsiper
authored andcommitted
core: introduced batch flush result processing
Additionally, when shutting downflb_input_exit_all was moved to be the last step to prevent it from unmapping chunks that are still in use until flb_output_exit is executed. Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 7432bec commit 1d91313

File tree

2 files changed

+54
-14
lines changed

2 files changed

+54
-14
lines changed

include/fluent-bit/flb_engine.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
#include <fluent-bit/flb_output.h>
2727
#include <fluent-bit/flb_thread_storage.h>
2828

29+
#define FLB_ENGINE_OUTPUT_EVENT_BATCH_SIZE 1
30+
2931
int flb_engine_start(struct flb_config *config);
3032
int flb_engine_failed(struct flb_config *config);
3133
int flb_engine_flush(struct flb_config *config,

src/flb_engine.c

Lines changed: 52 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
* limitations under the License.
1818
*/
1919

20+
#include <math.h>
2021
#include <stdio.h>
2122
#include <stdlib.h>
2223

@@ -206,29 +207,22 @@ static inline int handle_input_event(flb_pipefd_t fd, uint64_t ts,
206207
return 0;
207208
}
208209

209-
static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
210-
struct flb_config *config)
210+
static inline int handle_output_event(uint64_t ts,
211+
struct flb_config *config,
212+
uint64_t val)
211213
{
212214
int ret;
213-
int bytes;
214215
int task_id;
215216
int out_id;
216217
int retries;
217218
int retry_seconds;
218219
uint32_t type;
219220
uint32_t key;
220-
uint64_t val;
221221
char *name;
222222
struct flb_task *task;
223223
struct flb_task_retry *retry;
224224
struct flb_output_instance *ins;
225225

226-
bytes = flb_pipe_r(fd, &val, sizeof(val));
227-
if (bytes == -1) {
228-
flb_errno();
229-
return -1;
230-
}
231-
232226
/* Get type and key */
233227
type = FLB_BITS_U64_HIGH(val);
234228
key = FLB_BITS_U64_LOW(val);
@@ -441,6 +435,52 @@ static inline int handle_output_event(flb_pipefd_t fd, uint64_t ts,
441435
return 0;
442436
}
443437

438+
static inline int handle_output_events(flb_pipefd_t fd,
439+
struct flb_config *config)
440+
{
441+
uint64_t values[FLB_ENGINE_OUTPUT_EVENT_BATCH_SIZE];
442+
int result;
443+
int bytes;
444+
size_t limit;
445+
size_t index;
446+
uint64_t ts;
447+
448+
memset(&values, 0, sizeof(values));
449+
450+
bytes = flb_pipe_r(fd, &values, sizeof(values));
451+
452+
if (bytes == -1) {
453+
flb_errno();
454+
return -1;
455+
}
456+
457+
limit = floor(bytes / sizeof(uint64_t));
458+
459+
ts = cfl_time_now();
460+
461+
for (index = 0 ;
462+
index < limit &&
463+
index < (sizeof(values) / sizeof(values[0])) ;
464+
index++) {
465+
if (values[index] == 0) {
466+
break;
467+
}
468+
469+
result = handle_output_event(ts, config, values[index]);
470+
}
471+
472+
/* This is wrong, in one hand, if handle_output_event_ fails we should
473+
* stop, on the other, we have already consumed the signals from the pipe
474+
* so we have to do whatever we can with them.
475+
*
476+
* And a side effect is that since we have N results but we are not aborting
477+
* as soon as we get an error there could be N results to this function which
478+
* not only are we not ready to handle but is not even checked at the moment.
479+
*/
480+
481+
return result;
482+
}
483+
444484
static inline int flb_engine_manager(flb_pipefd_t fd, struct flb_config *config)
445485
{
446486
int bytes;
@@ -982,13 +1022,11 @@ int flb_engine_start(struct flb_config *config)
9821022
}
9831023
}
9841024
else if (event->type == FLB_ENGINE_EV_OUTPUT) {
985-
ts = cfl_time_now();
986-
9871025
/*
9881026
* Event originated by an output plugin. likely a Task return
9891027
* status.
9901028
*/
991-
handle_output_event(event->fd, ts, config);
1029+
handle_output_events(event->fd, config);
9921030
}
9931031
else if (event->type == FLB_ENGINE_EV_INPUT) {
9941032
ts = cfl_time_now();
@@ -1042,10 +1080,10 @@ int flb_engine_shutdown(struct flb_config *config)
10421080
flb_router_exit(config);
10431081

10441082
/* cleanup plugins */
1045-
flb_input_exit_all(config);
10461083
flb_filter_exit(config);
10471084
flb_output_exit(config);
10481085
flb_custom_exit(config);
1086+
flb_input_exit_all(config);
10491087

10501088
/* Destroy the storage context */
10511089
flb_storage_destroy(config);

0 commit comments

Comments
 (0)