Skip to content

Commit 886553c

Browse files
committed
task: restores direct routes from chunk metadata
Signed-off-by: Eduardo Silva <[email protected]>
1 parent fb1d52d commit 886553c

File tree

1 file changed

+178
-0
lines changed

1 file changed

+178
-0
lines changed

src/flb_task.c

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
#include <stdio.h>
2121
#include <stdlib.h>
22+
#include <stdint.h>
2223

2324
#include <fluent-bit/flb_info.h>
2425
#include <fluent-bit/flb_config.h>
@@ -30,6 +31,7 @@
3031
#include <fluent-bit/flb_mem.h>
3132
#include <fluent-bit/flb_str.h>
3233
#include <fluent-bit/flb_scheduler.h>
34+
#include <string.h>
3335

3436
/*
3537
* Every task created must have an unique ID, this function lookup the
@@ -71,6 +73,68 @@ static inline void map_free_task_id(int id, struct flb_config *config)
7173
config->task_map[id].task = NULL;
7274
}
7375

76+
static struct flb_output_instance *task_find_output_reference(struct flb_config *config,
77+
const struct flb_chunk_direct_route *route)
78+
{
79+
int alias_length;
80+
int label_length;
81+
int name_length;
82+
const char *label;
83+
uint32_t stored_id;
84+
struct mk_list *head;
85+
struct flb_output_instance *o_ins;
86+
87+
alias_length = 0;
88+
label_length = 0;
89+
name_length = 0;
90+
label = NULL;
91+
stored_id = 0;
92+
93+
if (!config || !route) {
94+
return NULL;
95+
}
96+
97+
label = route->label;
98+
stored_id = route->id;
99+
if (label != NULL) {
100+
label_length = route->label_length;
101+
if (label_length == 0) {
102+
label_length = (int) strlen(label);
103+
}
104+
}
105+
106+
if (label != NULL && label_length > 0) {
107+
mk_list_foreach(head, &config->outputs) {
108+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
109+
if (o_ins->alias != NULL) {
110+
alias_length = (int) strlen(o_ins->alias);
111+
if (alias_length == label_length &&
112+
strncmp(o_ins->alias, label, (size_t) label_length) == 0) {
113+
return o_ins;
114+
}
115+
}
116+
}
117+
118+
mk_list_foreach(head, &config->outputs) {
119+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
120+
name_length = (int) strlen(o_ins->name);
121+
if (name_length == label_length &&
122+
strncmp(o_ins->name, label, (size_t) label_length) == 0) {
123+
return o_ins;
124+
}
125+
}
126+
}
127+
128+
mk_list_foreach(head, &config->outputs) {
129+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
130+
if ((uint32_t) o_ins->id == stored_id) {
131+
return o_ins;
132+
}
133+
}
134+
135+
return NULL;
136+
}
137+
74138
void flb_task_retry_destroy(struct flb_task_retry *retry)
75139
{
76140
int ret;
@@ -361,16 +425,27 @@ struct flb_task *flb_task_create(uint64_t ref_id,
361425
int count = 0;
362426
int total_events = 0;
363427
int direct_count = 0;
428+
int stored_routes_result = 0;
429+
int stored_routes_used = FLB_FALSE;
430+
int stored_routes_valid = FLB_TRUE;
431+
int stored_routes_alloc_failed = FLB_FALSE;
432+
int direct_output_count = 0;
433+
int direct_output_index = 0;
434+
uint32_t missing_output_id = 0;
435+
uint16_t missing_output_label_length = 0;
436+
const char *missing_output_label;
364437
struct flb_task *task;
365438
struct flb_event_chunk *evc;
366439
struct flb_task_route *route;
367440
struct flb_router_path *route_path;
368441
struct flb_output_instance *o_ins;
442+
struct flb_output_instance *stored_output;
369443
struct flb_input_chunk *task_ic;
370444
struct cfl_list *i_head;
371445
struct mk_list *o_head;
372446
struct flb_router_chunk_context router_context;
373447
int router_context_initialized = FLB_FALSE;
448+
struct flb_chunk_direct_route *direct_routes;
374449

375450
/* No error status */
376451
*err = FLB_FALSE;
@@ -426,6 +501,109 @@ struct flb_task *flb_task_create(uint64_t ref_id,
426501
#endif
427502

428503
/* Direct connects betweek input <> outputs (API based) */
504+
direct_routes = NULL;
505+
missing_output_label = NULL;
506+
missing_output_label_length = 0;
507+
if (flb_input_chunk_has_direct_routes(task_ic) == FLB_TRUE) {
508+
stored_routes_result = flb_input_chunk_get_direct_routes(task_ic,
509+
&direct_routes,
510+
&direct_output_count);
511+
if (stored_routes_result == 0 && direct_output_count > 0) {
512+
stored_routes_valid = FLB_TRUE;
513+
missing_output_id = 0;
514+
for (direct_output_index = 0;
515+
direct_output_index < direct_output_count;
516+
direct_output_index++) {
517+
stored_output = task_find_output_reference(config,
518+
&direct_routes[direct_output_index]);
519+
if (!stored_output) {
520+
stored_routes_valid = FLB_FALSE;
521+
missing_output_id = direct_routes[direct_output_index].id;
522+
missing_output_label = direct_routes[direct_output_index].label;
523+
missing_output_label_length = direct_routes[direct_output_index].label_length;
524+
break;
525+
}
526+
}
527+
528+
if (stored_routes_valid == FLB_TRUE) {
529+
direct_count = 0;
530+
stored_routes_alloc_failed = FLB_FALSE;
531+
for (direct_output_index = 0;
532+
direct_output_index < direct_output_count;
533+
direct_output_index++) {
534+
stored_output = task_find_output_reference(config,
535+
&direct_routes[direct_output_index]);
536+
if (!stored_output) {
537+
continue;
538+
}
539+
540+
route = flb_calloc(1, sizeof(struct flb_task_route));
541+
if (!route) {
542+
flb_errno();
543+
stored_routes_alloc_failed = FLB_TRUE;
544+
break;
545+
}
546+
547+
route->status = FLB_TASK_ROUTE_INACTIVE;
548+
route->out = stored_output;
549+
mk_list_add(&route->_head, &task->routes);
550+
direct_count++;
551+
}
552+
553+
if (stored_routes_alloc_failed == FLB_TRUE) {
554+
if (router_context_initialized) {
555+
flb_router_chunk_context_destroy(&router_context);
556+
router_context_initialized = FLB_FALSE;
557+
}
558+
if (direct_routes) {
559+
flb_input_chunk_destroy_direct_routes(direct_routes,
560+
direct_output_count);
561+
}
562+
task->event_chunk->data = NULL;
563+
flb_task_destroy(task, FLB_TRUE);
564+
return NULL;
565+
}
566+
567+
if (direct_count > 0) {
568+
stored_routes_used = FLB_TRUE;
569+
}
570+
}
571+
else {
572+
flb_warn("[task] input=%s/%s stored direct route id=%u label=%.*s not found for chunk %s, falling back to configured routes",
573+
i_ins->p->name,
574+
flb_input_name(i_ins),
575+
(unsigned int) missing_output_id,
576+
(int) missing_output_label_length,
577+
missing_output_label ? missing_output_label : "",
578+
flb_input_chunk_get_name(task_ic));
579+
}
580+
}
581+
else if (stored_routes_result == -2) {
582+
flb_warn("[task] input=%s/%s invalid stored direct routing metadata for chunk %s, falling back to configured routes",
583+
i_ins->p->name,
584+
flb_input_name(i_ins),
585+
flb_input_chunk_get_name(task_ic));
586+
}
587+
}
588+
589+
if (stored_routes_used == FLB_TRUE) {
590+
if (direct_routes) {
591+
flb_input_chunk_destroy_direct_routes(direct_routes, direct_output_count);
592+
}
593+
flb_debug("[task] restored direct task=%p id=%i with %i route(s)",
594+
task, task->id, direct_count);
595+
if (router_context_initialized) {
596+
flb_router_chunk_context_destroy(&router_context);
597+
router_context_initialized = FLB_FALSE;
598+
}
599+
return task;
600+
}
601+
602+
if (direct_routes) {
603+
flb_input_chunk_destroy_direct_routes(direct_routes, direct_output_count);
604+
direct_routes = NULL;
605+
}
606+
429607
if (cfl_list_size(&i_ins->routes_direct) > 0) {
430608
direct_count = 0;
431609

0 commit comments

Comments
 (0)