Skip to content

Commit 5c3dcfe

Browse files
Leonardo Alminanaedsiper
authored andcommitted
output: added a flush abortion mechanism
This change introduces an additional check in output_pre_cb_flush to ensure that chunks can be dropped if they were scheduled to be flushed but the flush coroutine has not entered the plugins entry point yet. Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 488424a commit 5c3dcfe

File tree

1 file changed

+31
-0
lines changed

1 file changed

+31
-0
lines changed

include/fluent-bit/flb_output.h

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,12 @@ struct flb_out_flush_params {
498498

499499
extern FLB_TLS_DEFINE(struct flb_out_flush_params, out_flush_params);
500500

501+
#define FLB_OUTPUT_RETURN(x) \
502+
flb_output_return_do(x); \
503+
return
504+
505+
static inline void flb_output_return_do(int x);
506+
501507
static FLB_INLINE void output_params_set(struct flb_output_flush *out_flush,
502508
struct flb_coro *coro,
503509
struct flb_task *task,
@@ -537,6 +543,7 @@ static FLB_INLINE void output_params_set(struct flb_output_flush *out_flush,
537543

538544
static FLB_INLINE void output_pre_cb_flush(void)
539545
{
546+
int route_status;
540547
struct flb_coro *coro;
541548
struct flb_output_plugin *out_p;
542549
struct flb_out_flush_params *params;
@@ -557,11 +564,29 @@ static FLB_INLINE void output_pre_cb_flush(void)
557564
*/
558565
coro = params->coro;
559566
persisted_params = *params;
567+
560568
co_switch(coro->caller);
561569

562570
/* Continue, we will resume later */
563571
out_p = persisted_params.out_plugin;
564572

573+
flb_task_acquire_lock(persisted_params.out_flush->task);
574+
575+
route_status = flb_task_get_route_status(
576+
persisted_params.out_flush->task,
577+
persisted_params.out_flush->o_ins);
578+
579+
if (route_status == FLB_TASK_ROUTE_DROPPED) {
580+
flb_task_release_lock(persisted_params.out_flush->task);
581+
582+
FLB_OUTPUT_RETURN(FLB_ERROR);
583+
}
584+
585+
flb_task_activate_route(persisted_params.out_flush->task,
586+
persisted_params.out_flush->o_ins);
587+
588+
flb_task_release_lock(persisted_params.out_flush->task);
589+
565590
out_p->cb_flush(persisted_params.event_chunk,
566591
persisted_params.out_flush,
567592
persisted_params.i_ins,
@@ -909,6 +934,12 @@ static inline void flb_output_return(int ret, struct flb_coro *co) {
909934
o_ins = out_flush->o_ins;
910935
task = out_flush->task;
911936

937+
flb_task_acquire_lock(task);
938+
939+
flb_task_deactivate_route(task, o_ins);
940+
941+
flb_task_release_lock(task);
942+
912943
if (out_flush->processed_event_chunk) {
913944
if (task->event_chunk->data != out_flush->processed_event_chunk->data) {
914945
flb_free(out_flush->processed_event_chunk->data);

0 commit comments

Comments
 (0)