Skip to content

Commit 98a5660

Browse files
committed
task: restore direct routes using label-based matching
Update task creation to use label-based route collection for restoring direct routes from chunk metadata. Falls back to configured routes if stored routes cannot be fully resolved. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 2869b11 commit 98a5660

File tree

1 file changed

+147
-35
lines changed

1 file changed

+147
-35
lines changed

src/flb_task.c

Lines changed: 147 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -73,28 +73,31 @@ static inline void map_free_task_id(int id, struct flb_config *config)
7373
config->task_map[id].task = NULL;
7474
}
7575

76-
static struct flb_output_instance *task_find_output_reference(struct flb_config *config,
77-
const struct flb_chunk_direct_route *route)
76+
static int task_collect_output_references(struct flb_config *config,
77+
const struct flb_chunk_direct_route *route,
78+
struct flb_output_instance ***out_matches,
79+
size_t *out_count)
7880
{
81+
size_t index;
82+
size_t count;
7983
int alias_length;
8084
int label_length;
8185
int name_length;
8286
const char *label;
8387
uint32_t stored_id;
8488
struct mk_list *head;
8589
struct flb_output_instance *o_ins;
90+
struct flb_output_instance **matches;
8691

87-
alias_length = 0;
88-
label_length = 0;
89-
name_length = 0;
90-
label = NULL;
91-
stored_id = 0;
92-
93-
if (!config || !route) {
94-
return NULL;
92+
if (!config || !route || !out_matches || !out_count) {
93+
return -1;
9594
}
9695

96+
*out_matches = NULL;
97+
*out_count = 0;
98+
9799
label = route->label;
100+
label_length = 0;
98101
stored_id = route->id;
99102
if (label != NULL) {
100103
label_length = route->label_length;
@@ -103,14 +106,16 @@ static struct flb_output_instance *task_find_output_reference(struct flb_config
103106
}
104107
}
105108

109+
count = 0;
106110
if (label != NULL && label_length > 0) {
107111
mk_list_foreach(head, &config->outputs) {
108112
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
109113
if (o_ins->alias != NULL) {
110114
alias_length = (int) strlen(o_ins->alias);
111115
if (alias_length == label_length &&
112-
strncmp(o_ins->alias, label, (size_t) label_length) == 0) {
113-
return o_ins;
116+
strncmp(o_ins->alias, label, (size_t) label_length) == 0 &&
117+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
118+
count++;
114119
}
115120
}
116121
}
@@ -119,20 +124,88 @@ static struct flb_output_instance *task_find_output_reference(struct flb_config
119124
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
120125
name_length = (int) strlen(o_ins->name);
121126
if (name_length == label_length &&
122-
strncmp(o_ins->name, label, (size_t) label_length) == 0) {
123-
return o_ins;
127+
strncmp(o_ins->name, label, (size_t) label_length) == 0 &&
128+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
129+
if (o_ins->alias != NULL) {
130+
alias_length = (int) strlen(o_ins->alias);
131+
if (alias_length == label_length &&
132+
strncmp(o_ins->alias, label, (size_t) label_length) == 0) {
133+
continue;
134+
}
135+
}
136+
count++;
124137
}
125138
}
139+
140+
if (count == 0) {
141+
return 0;
142+
}
126143
}
144+
else {
145+
mk_list_foreach(head, &config->outputs) {
146+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
147+
if ((uint32_t) o_ins->id == stored_id &&
148+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
149+
count++;
150+
}
151+
}
127152

128-
mk_list_foreach(head, &config->outputs) {
129-
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
130-
if ((uint32_t) o_ins->id == stored_id) {
131-
return o_ins;
153+
if (count == 0) {
154+
return 0;
132155
}
133156
}
134157

135-
return NULL;
158+
matches = flb_calloc(count, sizeof(struct flb_output_instance *));
159+
if (!matches) {
160+
flb_errno();
161+
return -1;
162+
}
163+
164+
index = 0;
165+
if (label != NULL && label_length > 0) {
166+
mk_list_foreach(head, &config->outputs) {
167+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
168+
if (o_ins->alias != NULL) {
169+
alias_length = (int) strlen(o_ins->alias);
170+
if (alias_length == label_length &&
171+
strncmp(o_ins->alias, label, (size_t) label_length) == 0 &&
172+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
173+
matches[index++] = o_ins;
174+
}
175+
}
176+
}
177+
178+
mk_list_foreach(head, &config->outputs) {
179+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
180+
name_length = (int) strlen(o_ins->name);
181+
if (name_length == label_length &&
182+
strncmp(o_ins->name, label, (size_t) label_length) == 0 &&
183+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
184+
if (o_ins->alias != NULL) {
185+
alias_length = (int) strlen(o_ins->alias);
186+
if (alias_length == label_length &&
187+
strncmp(o_ins->alias, label, (size_t) label_length) == 0) {
188+
continue;
189+
}
190+
}
191+
matches[index++] = o_ins;
192+
}
193+
}
194+
}
195+
else {
196+
mk_list_foreach(head, &config->outputs) {
197+
o_ins = mk_list_entry(head, struct flb_output_instance, _head);
198+
if ((uint32_t) o_ins->id == stored_id &&
199+
flb_chunk_route_plugin_matches(o_ins, route) == FLB_TRUE) {
200+
matches[index++] = o_ins;
201+
}
202+
}
203+
}
204+
205+
*out_matches = matches;
206+
*out_count = index;
207+
208+
return 0;
136209
}
137210

138211
void flb_task_retry_destroy(struct flb_task_retry *retry)
@@ -426,6 +499,7 @@ struct flb_task *flb_task_create(uint64_t ref_id,
426499
int total_events = 0;
427500
int direct_count = 0;
428501
int stored_routes_result = 0;
502+
int ret = 0;
429503
int stored_routes_used = FLB_FALSE;
430504
int stored_routes_valid = FLB_TRUE;
431505
int stored_routes_alloc_failed = FLB_FALSE;
@@ -434,12 +508,14 @@ struct flb_task *flb_task_create(uint64_t ref_id,
434508
uint32_t missing_output_id = 0;
435509
uint16_t missing_output_label_length = 0;
436510
const char *missing_output_label;
511+
struct flb_output_instance **stored_matches;
512+
size_t stored_match_count;
513+
size_t stored_match_index;
437514
struct flb_task *task;
438515
struct flb_event_chunk *evc;
439516
struct flb_task_route *route;
440517
struct flb_router_path *route_path;
441518
struct flb_output_instance *o_ins;
442-
struct flb_output_instance *stored_output;
443519
struct flb_input_chunk *task_ic;
444520
struct cfl_list *i_head;
445521
struct mk_list *o_head;
@@ -514,15 +590,34 @@ struct flb_task *flb_task_create(uint64_t ref_id,
514590
for (direct_output_index = 0;
515591
direct_output_index < direct_output_count;
516592
direct_output_index++) {
517-
stored_output = task_find_output_reference(config,
518-
&direct_routes[direct_output_index]);
519-
if (!stored_output) {
593+
stored_matches = NULL;
594+
stored_match_count = 0;
595+
ret = task_collect_output_references(config,
596+
&direct_routes[direct_output_index],
597+
&stored_matches,
598+
&stored_match_count);
599+
if (ret == -1) {
600+
flb_error("[task] failed collecting restored routes for chunk %s",
601+
flb_input_chunk_get_name(task_ic));
602+
}
603+
604+
if (ret != 0 || stored_match_count == 0) {
520605
stored_routes_valid = FLB_FALSE;
521606
missing_output_id = direct_routes[direct_output_index].id;
522607
missing_output_label = direct_routes[direct_output_index].label;
523608
missing_output_label_length = direct_routes[direct_output_index].label_length;
609+
if (missing_output_label_length == 0 && missing_output_label != NULL) {
610+
missing_output_label_length = (uint16_t) strlen(missing_output_label);
611+
}
612+
if (stored_matches != NULL) {
613+
flb_free(stored_matches);
614+
}
524615
break;
525616
}
617+
618+
if (stored_matches != NULL) {
619+
flb_free(stored_matches);
620+
}
526621
}
527622

528623
if (stored_routes_valid == FLB_TRUE) {
@@ -531,23 +626,40 @@ struct flb_task *flb_task_create(uint64_t ref_id,
531626
for (direct_output_index = 0;
532627
direct_output_index < direct_output_count;
533628
direct_output_index++) {
534-
stored_output = task_find_output_reference(config,
535-
&direct_routes[direct_output_index]);
536-
if (!stored_output) {
629+
stored_matches = NULL;
630+
stored_match_count = 0;
631+
ret = task_collect_output_references(config,
632+
&direct_routes[direct_output_index],
633+
&stored_matches,
634+
&stored_match_count);
635+
if (ret != 0 || stored_match_count == 0 || stored_matches == NULL) {
636+
if (stored_matches != NULL) {
637+
flb_free(stored_matches);
638+
}
537639
continue;
538640
}
539641

540-
route = flb_calloc(1, sizeof(struct flb_task_route));
541-
if (!route) {
542-
flb_errno();
543-
stored_routes_alloc_failed = FLB_TRUE;
544-
break;
642+
for (stored_match_index = 0;
643+
stored_match_index < stored_match_count;
644+
stored_match_index++) {
645+
route = flb_calloc(1, sizeof(struct flb_task_route));
646+
if (!route) {
647+
flb_errno();
648+
stored_routes_alloc_failed = FLB_TRUE;
649+
break;
650+
}
651+
652+
route->status = FLB_TASK_ROUTE_INACTIVE;
653+
route->out = stored_matches[stored_match_index];
654+
mk_list_add(&route->_head, &task->routes);
655+
direct_count++;
545656
}
546657

547-
route->status = FLB_TASK_ROUTE_INACTIVE;
548-
route->out = stored_output;
549-
mk_list_add(&route->_head, &task->routes);
550-
direct_count++;
658+
flb_free(stored_matches);
659+
660+
if (stored_routes_alloc_failed == FLB_TRUE) {
661+
break;
662+
}
551663
}
552664

553665
if (stored_routes_alloc_failed == FLB_TRUE) {

0 commit comments

Comments
 (0)