@@ -60,6 +60,28 @@ static void clear_output_num()
6060 set_output_num (0 );
6161}
6262
63+ static int cb_count_msgpack (void * record , size_t size , void * data )
64+ {
65+ msgpack_unpacked result ;
66+ size_t off = 0 ;
67+
68+ if (!TEST_CHECK (data != NULL )) {
69+ flb_error ("data is NULL" );
70+ }
71+
72+ /* Iterate each item array and apply rules */
73+ msgpack_unpacked_init (& result );
74+ while (msgpack_unpack_next (& result , record , size , & off ) == MSGPACK_UNPACK_SUCCESS ) {
75+ pthread_mutex_lock (& result_mutex );
76+ num_output ++ ;
77+ pthread_mutex_unlock (& result_mutex );
78+ }
79+ msgpack_unpacked_destroy (& result );
80+
81+ flb_free (record );
82+ return 0 ;
83+ }
84+
6385/* Callback to check expected results */
6486static int cb_check_result_json (void * record , size_t size , void * data )
6587{
@@ -354,10 +376,77 @@ void flb_test_format_none_separator()
354376 test_ctx_destroy (ctx );
355377}
356378
379+ /*
380+ * Ingest 64k records.
381+ * https://github.com/fluent/fluent-bit/issues/5336
382+ */
383+ void flb_test_issue_5336 ()
384+ {
385+ struct flb_lib_out_cb cb_data ;
386+ struct test_ctx * ctx ;
387+ flb_sockfd_t fd ;
388+ int ret ;
389+ int num ;
390+ ssize_t w_size ;
391+ int not_used ;
392+ int i ;
393+ int count = 65535 ;
394+
395+ char * buf = "{\"test\":\"msg\"}" ;
396+ size_t size = strlen (buf );
397+
398+ clear_output_num ();
399+
400+ cb_data .cb = cb_count_msgpack ;
401+ cb_data .data = & not_used ;
402+
403+ ctx = test_ctx_create (& cb_data );
404+ if (!TEST_CHECK (ctx != NULL )) {
405+ TEST_MSG ("test_ctx_create failed" );
406+ exit (EXIT_FAILURE );
407+ }
408+
409+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
410+ "match" , "*" ,
411+ NULL );
412+ TEST_CHECK (ret == 0 );
413+
414+ /* Start the engine */
415+ ret = flb_start (ctx -> flb );
416+ TEST_CHECK (ret == 0 );
417+
418+ /* use default host/port */
419+ fd = connect_tcp (NULL , -1 );
420+ if (!TEST_CHECK (fd >= 0 )) {
421+ exit (EXIT_FAILURE );
422+ }
423+
424+ for (i = 0 ; i < count ; i ++ ) {
425+ w_size = send (fd , buf , size , 0 );
426+ if (!TEST_CHECK (w_size == size )) {
427+ TEST_MSG ("failed to send, count=%d errno=%d" ,i , errno );
428+ flb_socket_close (fd );
429+ exit (EXIT_FAILURE );
430+ }
431+ }
432+
433+ /* waiting to flush */
434+ flb_time_msleep (2500 );
435+
436+ num = get_output_num ();
437+ if (!TEST_CHECK (num == count )) {
438+ TEST_MSG ("got %d, expected: %d" , num , count );
439+ }
440+
441+ flb_socket_close (fd );
442+ test_ctx_destroy (ctx );
443+ }
444+
357445TEST_LIST = {
358446 {"tcp" , flb_test_tcp },
359447 {"format_none" , flb_test_format_none },
360448 {"format_none_separator" , flb_test_format_none_separator },
449+ {"65535_records_issue_5336" , flb_test_issue_5336 },
361450 {NULL , NULL }
362451};
363452
0 commit comments