@@ -1319,8 +1319,6 @@ int flb_input_instance_init(struct flb_input_instance *ins,
13191319 return -1 ;
13201320 }
13211321
1322- //ins->notification_channel = ins->thi->notification_channels[1];
1323-
13241322 /* register the ring buffer */
13251323 ret = flb_ring_buffer_add_event_loop (ins -> rb , config -> evl , FLB_INPUT_RING_BUFFER_WINDOW );
13261324 if (ret ) {
@@ -1338,23 +1336,22 @@ int flb_input_instance_init(struct flb_input_instance *ins,
13381336 }
13391337
13401338 ins -> notification_channel = config -> notification_channels [1 ];
1339+ ins -> processor -> notification_channel = ins -> notification_channel ;
13411340
13421341 ret = p -> cb_init (ins , config , ins -> data );
13431342 if (ret != 0 ) {
13441343 flb_error ("failed initialize input %s" ,
13451344 ins -> name );
13461345 return -1 ;
13471346 }
1348- }
1349- }
13501347
1351- ins -> processor -> notification_channel = ins -> notification_channel ;
1352-
1353- /* initialize processors */
1354- ret = flb_processor_init ( ins -> processor );
1355- if ( ret == -1 ) {
1356- flb_error ( "[input %s] error initializing processor, aborting startup" , ins -> name );
1357- return -1 ;
1348+ ret = flb_processor_init ( ins -> processor ) ;
1349+ if ( ret == -1 ) {
1350+ flb_error ( "failed initialize processors for input %s" ,
1351+ ins -> name );
1352+ return -1 ;
1353+ }
1354+ }
13581355 }
13591356
13601357 return 0 ;
@@ -1581,7 +1578,11 @@ static struct flb_input_collector *collector_create(int type,
15811578 coll -> evl = thi -> evl ;
15821579 }
15831580 else {
1584- coll -> evl = config -> evl ;
1581+ /* We need to obtain the event loop from the TLS when
1582+ * creating collectors for non threaded plugins running
1583+ * under a threaded plugin.
1584+ */
1585+ coll -> evl = flb_engine_evl_get ();
15851586 }
15861587
15871588 /*
0 commit comments