@@ -248,19 +248,24 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
248248 continue ;
249249 }
250250
251- /* Empty line (just \n) */
252- if (len == 0 ) {
251+ /*
252+ * Empty line (just \n): we skip empty lines only if we are NOT using
253+ * multiline core mode.
254+ */
255+ if (len == 0 && !ctx -> ml_ctx ) {
253256 data ++ ;
254257 processed_bytes ++ ;
255258 continue ;
256259 }
257260
258261 /* Process '\r\n' */
259- crlf = (data [len - 1 ] == '\r' );
260- if (len == 1 && crlf ) {
261- data += 2 ;
262- processed_bytes += 2 ;
263- continue ;
262+ if (len >= 2 ) {
263+ crlf = (data [len - 1 ] == '\r' );
264+ if (len == 1 && crlf ) {
265+ data += 2 ;
266+ processed_bytes += 2 ;
267+ continue ;
268+ }
264269 }
265270
266271 /* Reset time for each line */
@@ -269,7 +274,14 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
269274 line = data ;
270275 line_len = len - crlf ;
271276 repl_line = NULL ;
272- if (ctx -> docker_mode ) {
277+
278+ if (ctx -> ml_ctx ) {
279+ ret = flb_ml_append (ctx -> ml_ctx , file -> ml_stream_id ,
280+ FLB_ML_TYPE_TEXT ,
281+ & out_time , line , line_len );
282+ goto go_next ;
283+ }
284+ else if (ctx -> docker_mode ) {
273285 ret = flb_tail_dmode_process_content (now , line , line_len ,
274286 & repl_line , & repl_line_len ,
275287 file , ctx , out_sbuf , out_pck );
@@ -627,11 +639,26 @@ static int set_file_position(struct flb_tail_config *ctx,
627639 return 0 ;
628640}
629641
642+ static int ml_flush_callback (struct flb_ml_parser * parser ,
643+ struct flb_ml_stream * mst ,
644+ void * data , char * buf_data , size_t buf_size )
645+ {
646+ struct flb_tail_file * file = data ;
647+
648+ flb_input_chunk_append_raw (file -> config -> ins ,
649+ file -> tag_buf ,
650+ file -> tag_len ,
651+ buf_data ,
652+ buf_size );
653+ return 0 ;
654+ }
655+
630656int flb_tail_file_append (char * path , struct stat * st , int mode ,
631657 struct flb_tail_config * ctx )
632658{
633659 int fd ;
634660 int ret ;
661+ uint64_t stream_id ;
635662 size_t len ;
636663 char * tag ;
637664 size_t tag_len ;
@@ -716,6 +743,22 @@ int flb_tail_file_append(char *path, struct stat *st, int mode,
716743 file -> skip_next = FLB_FALSE ;
717744 file -> skip_warn = FLB_FALSE ;
718745
746+ /* Multiline core mode */
747+ if (ctx -> ml_ctx ) {
748+ /* Create a stream for this file */
749+ ret = flb_ml_stream_create (ctx -> ml_ctx ,
750+ file -> name , file -> name_len ,
751+ ml_flush_callback , file ,
752+ & stream_id );
753+ if (ret != 0 ) {
754+ flb_plg_error (ctx -> ins ,
755+ "could not create multiline stream for file: %s" ,
756+ file -> name );
757+ goto error ;
758+ }
759+ file -> ml_stream_id = stream_id ;
760+ }
761+
719762 /* Local buffer */
720763 file -> buf_size = ctx -> buf_chunk_size ;
721764 file -> buf_data = flb_malloc (file -> buf_size );
@@ -816,6 +859,11 @@ void flb_tail_file_remove(struct flb_tail_file *file)
816859 flb_plg_debug (ctx -> ins , "inode=%" PRIu64 " removing file name %s" ,
817860 file -> inode , file -> name );
818861
862+ /* remove the multiline.core stream */
863+ if (ctx -> ml_ctx && file -> ml_stream_id > 0 ) {
864+ flb_ml_stream_id_destroy_all (ctx -> ml_ctx , file -> ml_stream_id );
865+ }
866+
819867 if (file -> rotated > 0 ) {
820868#ifdef FLB_HAVE_SQLDB
821869 /*
@@ -833,6 +881,7 @@ void flb_tail_file_remove(struct flb_tail_file *file)
833881 flb_sds_destroy (file -> dmode_lastline );
834882 mk_list_del (& file -> _head );
835883 flb_tail_fs_remove (ctx , file );
884+
836885 /* avoid deleting file with -1 fd */
837886 if (file -> fd != -1 ) {
838887 close (file -> fd );
0 commit comments