Skip to content

Commit 8ae604e

Browse files
danlenaredsiper
authored andcommitted
input: send resume signal to the input thread event loop if plugin is threaded
Signed-off-by: Daniel Lenar <[email protected]>
1 parent 6fd4a75 commit 8ae604e

File tree

4 files changed

+40
-3
lines changed

4 files changed

+40
-3
lines changed

include/fluent-bit/flb_input_thread.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,7 @@ int flb_input_thread_instance_init(struct flb_config *config,
9393
int flb_input_thread_instance_pre_run(struct flb_config *config, struct flb_input_instance *ins);
9494

9595
int flb_input_thread_instance_pause(struct flb_input_instance *ins);
96+
int flb_input_thread_instance_resume(struct flb_input_instance *ins);
9697
int flb_input_thread_instance_exit(struct flb_input_instance *ins);
9798

9899
int flb_input_thread_collectors_signal_start(struct flb_input_instance *ins);

src/flb_input.c

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1695,7 +1695,13 @@ int flb_input_pause(struct flb_input_instance *ins)
16951695
int flb_input_resume(struct flb_input_instance *ins)
16961696
{
16971697
if (ins->p->cb_resume) {
1698-
ins->p->cb_resume(ins->context, ins->config);
1698+
if (flb_input_is_threaded(ins)) {
1699+
/* signal the thread event loop about the 'resume' operation */
1700+
flb_input_thread_instance_resume(ins);
1701+
}
1702+
else {
1703+
ins->p->cb_resume(ins->context, ins->config);
1704+
}
16991705
}
17001706

17011707
return 0;

src/flb_input_chunk.c

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1185,7 +1185,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
11851185
in->mem_buf_status == FLB_INPUT_PAUSED) {
11861186
in->mem_buf_status = FLB_INPUT_RUNNING;
11871187
if (in->p->cb_resume) {
1188-
in->p->cb_resume(in->context, in->config);
1188+
flb_input_resume(in);
11891189
flb_info("[input] %s resume (mem buf overlimit)",
11901190
in->name);
11911191
}
@@ -1196,7 +1196,7 @@ size_t flb_input_chunk_set_limits(struct flb_input_instance *in)
11961196
in->storage_buf_status == FLB_INPUT_PAUSED) {
11971197
in->storage_buf_status = FLB_INPUT_RUNNING;
11981198
if (in->p->cb_resume) {
1199-
in->p->cb_resume(in->context, in->config);
1199+
flb_input_resume(in);
12001200
flb_info("[input] %s resume (storage buf overlimit %zu/%zu)",
12011201
in->name,
12021202
((struct flb_storage_input *)in->storage)->cio->total_chunks_up,

src/flb_input_thread.c

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,11 @@ static inline int handle_input_event(flb_pipefd_t fd, struct flb_input_instance
7878
ins->p->cb_pause(ins->context, ins->config);
7979
}
8080
}
81+
else if (operation == FLB_INPUT_THREAD_RESUME) {
82+
if (ins->p->cb_resume) {
83+
ins->p->cb_resume(ins->context, ins->config);
84+
}
85+
}
8186
else if (operation == FLB_INPUT_THREAD_EXIT) {
8287
return FLB_INPUT_THREAD_EXIT;
8388
}
@@ -482,6 +487,31 @@ int flb_input_thread_instance_pause(struct flb_input_instance *ins)
482487
return 0;
483488
}
484489

490+
/*
491+
* Signal the thread event loop to resume the running plugin instance. This function
492+
* must be called only from the main thread/pipeline.
493+
*/
494+
int flb_input_thread_instance_resume(struct flb_input_instance *ins)
495+
{
496+
int ret;
497+
uint64_t val;
498+
struct flb_input_thread_instance *thi = ins->thi;
499+
500+
flb_plg_debug(ins, "thread resume instance");
501+
502+
/* compose message to resume the thread */
503+
val = FLB_BITS_U64_SET(FLB_INPUT_THREAD_TO_THREAD,
504+
FLB_INPUT_THREAD_RESUME);
505+
506+
ret = flb_pipe_w(thi->ch_parent_events[1], &val, sizeof(val));
507+
if (ret <= 0) {
508+
flb_errno();
509+
return -1;
510+
}
511+
512+
return 0;
513+
}
514+
485515
int flb_input_thread_instance_exit(struct flb_input_instance *ins)
486516
{
487517
int ret;

0 commit comments

Comments
 (0)