Skip to content

Commit fb1d52d

Browse files
committed
input_log: write direct routes when creating chunks
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 1fa3d3a commit fb1d52d

File tree

1 file changed

+89
-0
lines changed

1 file changed

+89
-0
lines changed

src/flb_input_log.c

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,9 +102,15 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins,
102102
size_t out_size = 0;
103103
size_t chunk_size_sz = 0;
104104
ssize_t chunk_size;
105+
int direct_count;
106+
int direct_index;
107+
int write_ret;
105108
struct cfl_list *head;
106109
struct flb_input_chunk *chunk = NULL;
107110
struct flb_router_path *route_path;
111+
struct flb_chunk_direct_route *direct_routes;
112+
size_t label_length;
113+
const char *label_source;
108114

109115
if (!ins || !payload || !payload->tag || !payload->route) {
110116
return -1;
@@ -198,6 +204,89 @@ static int route_payload_apply_outputs(struct flb_input_instance *ins,
198204
}
199205
}
200206

207+
direct_routes = NULL;
208+
direct_count = 0;
209+
direct_index = 0;
210+
write_ret = 0;
211+
label_length = 0;
212+
label_source = NULL;
213+
214+
cfl_list_foreach(head, &ins->routes_direct) {
215+
route_path = cfl_list_entry(head, struct flb_router_path, _head);
216+
if (!route_path->ins) {
217+
continue;
218+
}
219+
220+
if (flb_routes_mask_get_bit(chunk->routes_mask,
221+
route_path->ins->id,
222+
ins->config) == 0) {
223+
continue;
224+
}
225+
226+
direct_count++;
227+
}
228+
229+
if (direct_count > 0) {
230+
direct_routes = flb_calloc((size_t) direct_count,
231+
sizeof(struct flb_chunk_direct_route));
232+
if (!direct_routes) {
233+
flb_errno();
234+
direct_count = 0;
235+
}
236+
}
237+
238+
if (direct_routes && direct_count > 0) {
239+
direct_index = 0;
240+
cfl_list_foreach(head, &ins->routes_direct) {
241+
route_path = cfl_list_entry(head, struct flb_router_path, _head);
242+
if (!route_path->ins) {
243+
continue;
244+
}
245+
246+
if (flb_routes_mask_get_bit(chunk->routes_mask,
247+
route_path->ins->id,
248+
ins->config) == 0) {
249+
continue;
250+
}
251+
252+
if (direct_index < direct_count) {
253+
label_source = route_path->ins->alias;
254+
label_length = 0;
255+
if (!label_source || label_source[0] == '\0') {
256+
label_source = route_path->ins->name;
257+
}
258+
if (label_source) {
259+
label_length = strlen(label_source);
260+
if (label_length > UINT16_MAX) {
261+
label_length = UINT16_MAX;
262+
}
263+
}
264+
direct_routes[direct_index].id = (uint32_t) route_path->ins->id;
265+
direct_routes[direct_index].label = label_source;
266+
direct_routes[direct_index].label_length = (uint16_t) label_length;
267+
direct_index++;
268+
}
269+
}
270+
271+
if (direct_index == direct_count) {
272+
write_ret = flb_input_chunk_write_header_v2(chunk->chunk,
273+
chunk->event_type,
274+
payload->tag,
275+
flb_sds_len(payload->tag),
276+
direct_routes,
277+
direct_count);
278+
if (write_ret != 0) {
279+
flb_plg_warn(ins,
280+
"failed to persist direct routes for chunk %s",
281+
flb_input_chunk_get_name(chunk));
282+
}
283+
}
284+
}
285+
286+
if (direct_routes) {
287+
flb_free(direct_routes);
288+
}
289+
201290
return 0;
202291
}
203292

0 commit comments

Comments
 (0)