2525#include <fluent-bit/flb_storage.h>
2626#include <fluent-bit/multiline/flb_ml.h>
2727#include <fluent-bit/multiline/flb_ml_parser.h>
28+ #include <fluent-bit/flb_scheduler.h>
2829
2930#include "ml.h"
31+ #include "ml_concat.h"
3032
3133static struct ml_stream * get_by_id (struct ml_ctx * ctx , uint64_t stream_id );
3234
@@ -196,6 +198,7 @@ static int cb_ml_init(struct flb_filter_instance *ins,
196198 ctx -> ins = ins ;
197199 ctx -> debug_flush = FLB_FALSE ;
198200 ctx -> config = config ;
201+ ctx -> timer_created = FLB_FALSE ;
199202
200203 /*
201204 * Config map is not yet set at this point in the code
@@ -206,6 +209,26 @@ static int cb_ml_init(struct flb_filter_instance *ins,
206209 if (tmp ) {
207210 ctx -> use_buffer = flb_utils_bool (tmp );
208211 }
212+ ctx -> partial_mode = FLB_FALSE ;
213+ tmp = (char * ) flb_filter_get_property ("mode" , ins );
214+ if (tmp != NULL ) {
215+ if (strcasecmp (tmp , FLB_MULTILINE_MODE_PARTIAL_MESSAGE ) == 0 ) {
216+ ctx -> partial_mode = FLB_TRUE ;
217+ } else if (strcasecmp (tmp , FLB_MULTILINE_MODE_PARSER ) == 0 ) {
218+ ctx -> partial_mode = FLB_FALSE ;
219+ } else {
220+ flb_plg_error (ins , "'Mode' must be '%s' or '%s'" ,
221+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE ,
222+ FLB_MULTILINE_MODE_PARSER );
223+ return -1 ;
224+ }
225+ }
226+
227+ if (ctx -> partial_mode == FLB_TRUE && ctx -> use_buffer == FLB_FALSE ) {
228+ flb_plg_error (ins , "'%s' 'Mode' requires 'Buffer' to be 'On'" ,
229+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE );
230+ }
231+
209232 if (ctx -> use_buffer == FLB_FALSE ) {
210233 /* Init buffers */
211234 msgpack_sbuffer_init (& ctx -> mp_sbuf );
@@ -251,10 +274,25 @@ static int cb_ml_init(struct flb_filter_instance *ins,
251274 flb_free (ctx );
252275 return -1 ;
253276 }
254-
277+
255278 /* Set plugin context */
256279 flb_filter_set_context (ins , ctx );
257280
281+ if (ctx -> key_content == NULL && ctx -> partial_mode == FLB_TRUE ) {
282+ flb_plg_error (ins , "'Mode' '%s' requires 'multiline.key_content'" ,
283+ FLB_MULTILINE_MODE_PARTIAL_MESSAGE );
284+ flb_free (ctx );
285+ return -1 ;
286+ }
287+
288+ if (ctx -> partial_mode == FLB_FALSE && mk_list_size (ctx -> multiline_parsers ) == 0 ) {
289+ flb_plg_error (ins , "The default 'Mode' '%s' requires at least one 'multiline.parser'" ,
290+ FLB_MULTILINE_MODE_PARSER );
291+ flb_free (ctx );
292+ return -1 ;
293+ }
294+
295+
258296 if (ctx -> use_buffer == FLB_TRUE ) {
259297 /*
260298 * Emitter Storage Type: the emitter input plugin to be created by default
@@ -292,43 +330,46 @@ static int cb_ml_init(struct flb_filter_instance *ins,
292330#endif
293331 }
294332
295- /* Create multiline context */
296- ctx -> m = flb_ml_create (config , ctx -> ins -> name );
297- if (!ctx -> m ) {
298- /*
299- * we don't free the context since upon init failure, the exit
300- * callback will be triggered with our context set above.
301- */
302- return -1 ;
303- }
304-
305- /* Load the parsers/config */
306- ret = multiline_load_parsers (ctx );
307- if (ret == -1 ) {
308- return -1 ;
309- }
310-
311333 mk_list_init (& ctx -> ml_streams );
334+ mk_list_init (& ctx -> split_message_packers );
312335
313- if (ctx -> use_buffer == FLB_TRUE ) {
336+ if (ctx -> partial_mode == FLB_FALSE ) {
337+ /* Create multiline context */
338+ ctx -> m = flb_ml_create (config , ctx -> ins -> name );
339+ if (!ctx -> m ) {
340+ /*
341+ * we don't free the context since upon init failure, the exit
342+ * callback will be triggered with our context set above.
343+ */
344+ return -1 ;
345+ }
314346
315- ctx -> m -> flush_ms = ctx -> flush_ms ;
316- ret = flb_ml_auto_flush_init (ctx -> m );
347+ /* Load the parsers/config */
348+ ret = multiline_load_parsers (ctx );
317349 if (ret == -1 ) {
318350 return -1 ;
319351 }
320- } else {
321- /* Create a stream for this file */
322- len = strlen (ins -> name );
323- ret = flb_ml_stream_create (ctx -> m ,
324- ins -> name , len ,
325- flush_callback , ctx ,
326- & stream_id );
327- if (ret != 0 ) {
328- flb_plg_error (ctx -> ins , "could not create multiline stream" );
329- return -1 ;
352+
353+ if (ctx -> use_buffer == FLB_TRUE ) {
354+
355+ ctx -> m -> flush_ms = ctx -> flush_ms ;
356+ ret = flb_ml_auto_flush_init (ctx -> m );
357+ if (ret == -1 ) {
358+ return -1 ;
359+ }
360+ } else {
361+ /* Create a stream for this file */
362+ len = strlen (ins -> name );
363+ ret = flb_ml_stream_create (ctx -> m ,
364+ ins -> name , len ,
365+ flush_callback , ctx ,
366+ & stream_id );
367+ if (ret != 0 ) {
368+ flb_plg_error (ctx -> ins , "could not create multiline stream" );
369+ return -1 ;
370+ }
371+ ctx -> stream_id = stream_id ;
330372 }
331- ctx -> stream_id = stream_id ;
332373 }
333374
334375 return 0 ;
@@ -450,6 +491,176 @@ static struct ml_stream *get_or_create_stream(struct ml_ctx *ctx,
450491 return stream ;
451492}
452493
494+ static void partial_timer_cb (struct flb_config * config , void * data )
495+ {
496+ struct ml_ctx * ctx = data ;
497+ (void ) config ;
498+ struct mk_list * tmp ;
499+ struct mk_list * head ;
500+ struct split_message_packer * packer ;
501+ unsigned long long now ;
502+ unsigned long long diff ;
503+ int ret ;
504+
505+ now = ml_current_timestamp ();
506+
507+ mk_list_foreach_safe (head , tmp , & ctx -> split_message_packers ) {
508+ packer = mk_list_entry (head , struct split_message_packer , _head );
509+
510+ diff = now - packer -> last_write_time ;
511+ if (diff <= ctx -> flush_ms ) {
512+ continue ;
513+ }
514+
515+ mk_list_del (& packer -> _head );
516+ ml_split_message_packer_complete (packer );
517+ /* re-emit record with original tag */
518+ flb_plg_trace (ctx -> ins , "emitting from %s to %s" , packer -> input_name , packer -> tag );
519+ ret = in_emitter_add_record (packer -> tag , flb_sds_len (packer -> tag ),
520+ packer -> mp_sbuf .data , packer -> mp_sbuf .size ,
521+ ctx -> ins_emitter );
522+ if (ret < 0 ) {
523+ /* this shouldn't happen in normal execution */
524+ flb_plg_warn (ctx -> ins , "Couldn't send concatenated record of size %zu bytes to in_emitter %s" ,
525+ packer -> mp_sbuf .size , ctx -> ins_emitter -> name );
526+ }
527+ ml_split_message_packer_destroy (packer );
528+ }
529+
530+ }
531+
532+ static int ml_filter_partial (const void * data , size_t bytes ,
533+ const char * tag , int tag_len ,
534+ void * * out_buf , size_t * out_bytes ,
535+ struct flb_filter_instance * f_ins ,
536+ struct flb_input_instance * i_ins ,
537+ void * filter_context ,
538+ struct flb_config * config )
539+ {
540+ int ret ;
541+ int ok = MSGPACK_UNPACK_SUCCESS ;
542+ size_t off = 0 ;
543+ (void ) f_ins ;
544+ (void ) config ;
545+ msgpack_unpacked result ;
546+ msgpack_object * obj ;
547+ struct ml_ctx * ctx = filter_context ;
548+ struct flb_time tm ;
549+ msgpack_sbuffer tmp_sbuf ;
550+ msgpack_packer tmp_pck ;
551+ int partial_records = 0 ;
552+ int total_records = 0 ;
553+ int return_records = 0 ;
554+ int partial = FLB_FALSE ;
555+ int is_last_partial = FLB_FALSE ;
556+ struct split_message_packer * packer ;
557+ char * partial_id_str = NULL ;
558+ size_t partial_id_size = 0 ;
559+ struct flb_sched * sched ;
560+
561+ /*
562+ * create a timer that will run periodically and check if pending buffers
563+ * have expired
564+ * this is created once on the first flush
565+ */
566+ if (ctx -> timer_created == FLB_FALSE ) {
567+ flb_plg_debug (ctx -> ins ,
568+ "Creating flush timer with frequency %dms" ,
569+ ctx -> flush_ms );
570+
571+ sched = flb_sched_ctx_get ();
572+
573+ ret = flb_sched_timer_cb_create (sched , FLB_SCHED_TIMER_CB_PERM ,
574+ ctx -> flush_ms / 2 , partial_timer_cb ,
575+ ctx , NULL );
576+ if (ret < 0 ) {
577+ flb_plg_error (ctx -> ins , "Failed to create flush timer" );
578+ } else {
579+ ctx -> timer_created = FLB_TRUE ;
580+ }
581+ }
582+
583+ /*
584+ * Create temporary msgpack buffer
585+ * for non-partial messages which are passed on as-is
586+ */
587+ msgpack_sbuffer_init (& tmp_sbuf );
588+ msgpack_packer_init (& tmp_pck , & tmp_sbuf , msgpack_sbuffer_write );
589+
590+ msgpack_unpacked_init (& result );
591+ while (msgpack_unpack_next (& result , data , bytes , & off ) == ok ) {
592+ total_records ++ ;
593+ flb_time_pop_from_msgpack (& tm , & result , & obj );
594+
595+ partial = ml_is_partial (obj );
596+ if (partial == FLB_TRUE ) {
597+ partial_records ++ ;
598+ ret = ml_get_partial_id (obj , & partial_id_str , & partial_id_size );
599+ if (ret == -1 ) {
600+ flb_plg_warn (ctx -> ins , "Could not find partial_id but partial_message key is FLB_TRUE for record with tag %s" , tag );
601+ /* handle this record as non-partial */
602+ partial_records -- ;
603+ goto pack_non_partial ;
604+ }
605+ packer = ml_get_packer (& ctx -> split_message_packers , tag ,
606+ i_ins -> name , partial_id_str , partial_id_size );
607+ if (packer == NULL ) {
608+ flb_plg_trace (ctx -> ins , "Found new partial record with tag %s" , tag );
609+ packer = ml_create_packer (tag , i_ins -> name , partial_id_str , partial_id_size ,
610+ obj , ctx -> key_content , & tm );
611+ if (packer == NULL ) {
612+ flb_plg_warn (ctx -> ins , "Could not create packer for partial record with tag %s" , tag );
613+ /* handle this record as non-partial */
614+ partial_records -- ;
615+ goto pack_non_partial ;
616+ }
617+ mk_list_add (& packer -> _head , & ctx -> split_message_packers );
618+ }
619+ ret = ml_split_message_packer_write (packer , obj , ctx -> key_content );
620+ if (ret < 0 ) {
621+ flb_plg_warn (ctx -> ins , "Could not append content for partial record with tag %s" , tag );
622+ /* handle this record as non-partial */
623+ partial_records -- ;
624+ goto pack_non_partial ;
625+ }
626+ is_last_partial = ml_is_partial_last (obj );
627+ if (is_last_partial == FLB_TRUE ) {
628+ /* emit the record in this filter invocation */
629+ return_records ++ ;
630+ ml_split_message_packer_complete (packer );
631+ ml_append_complete_record (packer -> mp_sbuf .data , packer -> mp_sbuf .size , & tmp_pck );
632+ mk_list_del (& packer -> _head );
633+ ml_split_message_packer_destroy (packer );
634+ }
635+ } else {
636+
637+ pack_non_partial :
638+ return_records ++ ;
639+ /* record passed from filter as-is */
640+ msgpack_pack_array (& tmp_pck , 2 );
641+ flb_time_append_to_msgpack (& tm , & tmp_pck , 0 );
642+ msgpack_pack_object (& tmp_pck , * obj );
643+ }
644+
645+ }
646+
647+ msgpack_unpacked_destroy (& result );
648+
649+ if (partial_records == 0 ) {
650+ /* if no records were partial, we didn't modify the chunk */
651+ msgpack_sbuffer_destroy (& tmp_sbuf );
652+ return FLB_FILTER_NOTOUCH ;
653+ } else if (return_records > 0 ) {
654+ /* some new records can be returned now, return a new buffer */
655+ * out_buf = tmp_sbuf .data ;
656+ * out_bytes = tmp_sbuf .size ;
657+ } else {
658+ /* no records to return right now, free buffer */
659+ msgpack_sbuffer_destroy (& tmp_sbuf );
660+ }
661+ return FLB_FILTER_MODIFIED ;
662+ }
663+
453664static int cb_ml_filter (const void * data , size_t bytes ,
454665 const char * tag , int tag_len ,
455666 void * * out_buf , size_t * out_bytes ,
@@ -461,8 +672,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
461672 int ret ;
462673 int ok = MSGPACK_UNPACK_SUCCESS ;
463674 size_t off = 0 ;
464- (void ) out_buf ;
465- (void ) out_bytes ;
466675 (void ) f_ins ;
467676 (void ) config ;
468677 msgpack_unpacked result ;
@@ -473,6 +682,20 @@ static int cb_ml_filter(const void *data, size_t bytes,
473682 struct flb_time tm ;
474683 struct ml_stream * stream ;
475684
685+ if (i_ins == ctx -> ins_emitter ) {
686+ flb_plg_trace (ctx -> ins , "not processing records from the emitter" );
687+ return FLB_FILTER_NOTOUCH ;
688+ }
689+
690+ /* 'partial_message' mode */
691+ if (ctx -> partial_mode == FLB_TRUE ) {
692+ return ml_filter_partial (data , bytes , tag , tag_len ,
693+ out_buf , out_bytes ,
694+ f_ins , i_ins ,
695+ filter_context , config );
696+ }
697+
698+ /* 'parser' mode */
476699 if (ctx -> use_buffer == FLB_FALSE ) {
477700 /* reset msgpack size content */
478701 ctx -> mp_sbuf .size = 0 ;
@@ -518,10 +741,6 @@ static int cb_ml_filter(const void *data, size_t bytes,
518741 return FLB_FILTER_NOTOUCH ;
519742
520743 } else { /* buffered mode */
521- if (i_ins == ctx -> ins_emitter ) {
522- flb_plg_trace (ctx -> ins , "not processing record from the emitter" );
523- return FLB_FILTER_NOTOUCH ;
524- }
525744
526745 stream = get_or_create_stream (ctx , i_ins , tag , tag_len );
527746
@@ -595,6 +814,13 @@ static struct flb_config_map config_map[] = {
595814 "With buffer off, this filter will not work with most inputs, except tail."
596815 },
597816
817+ {
818+ FLB_CONFIG_MAP_STR , "mode" , "parser" ,
819+ 0 , FLB_TRUE , offsetof(struct ml_ctx , mode ),
820+ "Mode can be 'parser' for regex concat, or 'partial_message' to "
821+ "concat split docker logs."
822+ },
823+
598824 {
599825 FLB_CONFIG_MAP_INT , "flush_ms" , "2000" ,
600826 0 , FLB_TRUE , offsetof(struct ml_ctx , flush_ms ),
0 commit comments