@@ -360,6 +360,7 @@ struct flb_task *flb_task_create(uint64_t ref_id,
360360{
361361 int count = 0 ;
362362 int total_events = 0 ;
363+ int direct_count = 0 ;
363364 struct flb_task * task ;
364365 struct flb_event_chunk * evc ;
365366 struct flb_task_route * route ;
@@ -415,22 +416,41 @@ struct flb_task *flb_task_create(uint64_t ref_id,
415416
416417 /* Direct connects betweek input <> outputs (API based) */
417418 if (mk_list_size (& i_ins -> routes_direct ) > 0 ) {
419+ direct_count = 0 ;
420+
418421 mk_list_foreach (i_head , & i_ins -> routes_direct ) {
419422 route_path = mk_list_entry (i_head , struct flb_router_path , _head );
423+
424+ if (flb_router_path_should_route (task -> event_chunk , route_path ) == FLB_FALSE ) {
425+ continue ;
426+ }
427+
420428 o_ins = route_path -> ins ;
421429
422- route = flb_malloc ( sizeof (struct flb_task_route ));
430+ route = flb_calloc ( 1 , sizeof (struct flb_task_route ));
423431 if (!route ) {
424432 flb_errno ();
425433 task -> event_chunk -> data = NULL ;
426434 flb_task_destroy (task , FLB_TRUE );
427435 return NULL ;
428436 }
429437
438+ route -> status = FLB_TASK_ROUTE_INACTIVE ;
430439 route -> out = o_ins ;
431440 mk_list_add (& route -> _head , & task -> routes );
441+ direct_count ++ ;
442+ }
443+
444+ if (direct_count == 0 ) {
445+ flb_debug ("[task] dropping direct task=%p id=%i without matching routes" ,
446+ task , task -> id );
447+ task -> event_chunk -> data = NULL ;
448+ flb_task_destroy (task , FLB_TRUE );
449+ return NULL ;
432450 }
433- flb_debug ("[task] created direct task=%p id=%i OK" , task , task -> id );
451+
452+ flb_debug ("[task] created direct task=%p id=%i with %i route(s)" ,
453+ task , task -> id , direct_count );
434454 return task ;
435455 }
436456
@@ -444,7 +464,7 @@ struct flb_task *flb_task_create(uint64_t ref_id,
444464 continue ;
445465 }
446466
447- if (flb_routes_mask_get_bit (task_ic -> routes_mask ,
467+ if (flb_routes_mask_get_bit (task_ic -> routes_mask ,
448468 o_ins -> id ,
449469 o_ins -> config ) != 0 ) {
450470 route = flb_calloc (1 , sizeof (struct flb_task_route ));
0 commit comments