@@ -97,6 +97,18 @@ int callback_test(void* data, size_t size, void* cb_data)
9797 return 0 ;
9898}
9999
100+ int callback_cat (void * data , size_t size , void * cb_data )
101+ {
102+ flb_sds_t * outbuf = cb_data ;
103+ if (size > 0 ) {
104+ flb_debug ("[test_filter_lua] received message: %s" , data );
105+ pthread_mutex_lock (& result_mutex );
106+ flb_sds_cat_safe (outbuf , data , size );
107+ pthread_mutex_unlock (& result_mutex );
108+ }
109+ return 0 ;
110+ }
111+
100112void delete_script ()
101113{
102114 unlink (TMP_LUA_PATH );
@@ -621,6 +633,80 @@ void flb_test_drop_all_records(void)
621633 flb_destroy (ctx );
622634}
623635
636+ /* https://github.com/fluent/fluent-bit/issues/5496 */
637+ void flb_test_split_record (void )
638+ {
639+ int ret ;
640+ flb_ctx_t * ctx ;
641+ int in_ffd ;
642+ int out_ffd ;
643+ int filter_ffd ;
644+ struct flb_lib_out_cb cb_data ;
645+ char * output = NULL ;
646+ flb_sds_t outbuf = flb_sds_create ("" );
647+ char * input = "[0, {\"x\": [ "
648+ "{\"a1\":\"aa\"}, "
649+ "{\"b1\":\"bb\"}, "
650+ "{\"c1\":\"cc\"} ]}]" ;
651+ const char * expected =
652+ "[5.000000,{\"a1\":\"aa\"}]"
653+ "[5.000000,{\"b1\":\"bb\"}]"
654+ "[5.000000,{\"c1\":\"cc\"}]" ;
655+ char * script_body = ""
656+ "function lua_main(tag, timestamp, record)\n"
657+ " return 1, 5, record.x\n"
658+ "end\n" ;
659+
660+ /* Create context, flush every second (some checks omitted here) */
661+ ctx = flb_create ();
662+ flb_service_set (ctx , "flush" , "1" , "grace" , "1" , NULL );
663+
664+ /* Prepare output callback context*/
665+ cb_data .cb = callback_cat ;
666+ cb_data .data = & outbuf ;
667+
668+ ret = create_script (script_body , strlen (script_body ));
669+ TEST_CHECK (ret == 0 );
670+ /* Filter */
671+ filter_ffd = flb_filter (ctx , (char * ) "lua" , NULL );
672+ TEST_CHECK (filter_ffd >= 0 );
673+ ret = flb_filter_set (ctx , filter_ffd ,
674+ "Match" , "*" ,
675+ "call" , "lua_main" ,
676+ "script" , TMP_LUA_PATH ,
677+ NULL );
678+
679+ /* Input */
680+ in_ffd = flb_input (ctx , (char * ) "lib" , NULL );
681+ flb_input_set (ctx , in_ffd , "tag" , "test" , NULL );
682+ TEST_CHECK (in_ffd >= 0 );
683+
684+ /* Lib output */
685+ out_ffd = flb_output (ctx , (char * ) "lib" , (void * )& cb_data );
686+ TEST_CHECK (out_ffd >= 0 );
687+ flb_output_set (ctx , out_ffd ,
688+ "match" , "test" ,
689+ "format" , "json" ,
690+ NULL );
691+
692+ ret = flb_start (ctx );
693+ TEST_CHECK (ret == 0 );
694+
695+ flb_lib_push (ctx , in_ffd , input , strlen (input ));
696+ sleep (1 );
697+ if (!TEST_CHECK (!strcmp (outbuf , expected ))) {
698+ TEST_MSG ("expected:\n%s\ngot:\n%s\n" , expected , outbuf );
699+ }
700+
701+ /* clean up */
702+ flb_lib_free (output );
703+ delete_script ();
704+
705+ flb_stop (ctx );
706+ flb_destroy (ctx );
707+ flb_sds_destroy (outbuf );
708+ }
709+
624710TEST_LIST = {
625711 {"hello_world" , flb_test_helloworld },
626712 {"append_tag" , flb_test_append_tag },
@@ -629,5 +715,6 @@ TEST_LIST = {
629715 {"type_array_key" , flb_test_type_array_key },
630716 {"array_contains_null" , flb_test_array_contains_null },
631717 {"drop_all_records" , flb_test_drop_all_records },
718+ {"split_record" , flb_test_split_record },
632719 {NULL , NULL }
633720};
0 commit comments