1+ /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+ /* Fluent Bit
4+ * ==========
5+ * Copyright (C) 2019-2022 The Fluent Bit Authors
6+ * Copyright (C) 2015-2018 Treasure Data Inc.
7+ *
8+ * Licensed under the Apache License, Version 2.0 (the "License");
9+ * you may not use this file except in compliance with the License.
10+ * You may obtain a copy of the License at
11+ *
12+ * http://www.apache.org/licenses/LICENSE-2.0
13+ *
14+ * Unless required by applicable law or agreed to in writing, software
15+ * distributed under the License is distributed on an "AS IS" BASIS,
16+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17+ * See the License for the specific language governing permissions and
18+ * limitations under the License.
19+ */
20+ #include <fluent-bit.h>
21+ #include <fluent-bit/flb_sds.h>
22+ #include <fluent-bit/flb_time.h>
23+ #include <float.h>
24+ #include <math.h>
25+ #include <msgpack.h>
26+ #include "flb_tests_runtime.h"
27+
28+ struct test_ctx {
29+ flb_ctx_t * flb ; /* Fluent Bit library context */
30+ int i_ffd ; /* Input fd */
31+ int f_ffd ; /* Filter fd (unused) */
32+ int o_ffd ; /* Output fd */
33+ };
34+
35+ pthread_mutex_t result_mutex = PTHREAD_MUTEX_INITIALIZER ;
36+ int num_output = 0 ;
37+ static int get_output_num ()
38+ {
39+ int ret ;
40+ pthread_mutex_lock (& result_mutex );
41+ ret = num_output ;
42+ pthread_mutex_unlock (& result_mutex );
43+
44+ return ret ;
45+ }
46+
47+ static void set_output_num (int num )
48+ {
49+ pthread_mutex_lock (& result_mutex );
50+ num_output = num ;
51+ pthread_mutex_unlock (& result_mutex );
52+ }
53+
54+ static void clear_output_num ()
55+ {
56+ set_output_num (0 );
57+ }
58+
59+ struct str_list {
60+ size_t size ;
61+ char * * lists ;
62+ };
63+
64+ /* Callback to check expected results */
65+ static void cb_check_str_list (void * ctx , int ffd , int res_ret ,
66+ void * res_data , size_t res_size , void * data )
67+ {
68+ char * p ;
69+ flb_sds_t out_line = res_data ;
70+ int num = get_output_num ();
71+ size_t i ;
72+ struct str_list * l = (struct str_list * )data ;
73+
74+ if (!TEST_CHECK (res_data != NULL )) {
75+ TEST_MSG ("res_data is NULL" );
76+ return ;
77+ }
78+
79+ if (!TEST_CHECK (l != NULL )) {
80+ TEST_MSG ("l is NULL" );
81+ flb_sds_destroy (out_line );
82+ return ;
83+ }
84+
85+ if (!TEST_CHECK (res_ret == 0 )) {
86+ TEST_MSG ("callback ret=%d" , res_ret );
87+ }
88+ if (!TEST_CHECK (res_data != NULL )) {
89+ TEST_MSG ("res_data is NULL" );
90+ flb_sds_destroy (out_line );
91+ return ;
92+ }
93+
94+ for (i = 0 ; i < l -> size ; i ++ ) {
95+ p = strstr (out_line , l -> lists [i ]);
96+ if (!TEST_CHECK (p != NULL )) {
97+ TEST_MSG (" Got :%s\n expect:%s" , out_line , l -> lists [i ]);
98+ }
99+ }
100+ set_output_num (num + 1 );
101+
102+ flb_sds_destroy (out_line );
103+ }
104+
105+ static struct test_ctx * test_ctx_create ()
106+ {
107+ int i_ffd ;
108+ int o_ffd ;
109+ struct test_ctx * ctx = NULL ;
110+
111+ ctx = flb_malloc (sizeof (struct test_ctx ));
112+ if (!TEST_CHECK (ctx != NULL )) {
113+ TEST_MSG ("malloc failed" );
114+ flb_errno ();
115+ return NULL ;
116+ }
117+
118+ /* Service config */
119+ ctx -> flb = flb_create ();
120+ flb_service_set (ctx -> flb ,
121+ "Flush" , "0.200000000" ,
122+ "Grace" , "1" ,
123+ "Log_Level" , "error" ,
124+ NULL );
125+
126+ /* Input */
127+ i_ffd = flb_input (ctx -> flb , (char * ) "lib" , NULL );
128+ TEST_CHECK (i_ffd >= 0 );
129+ ctx -> i_ffd = i_ffd ;
130+
131+ /* Output */
132+ o_ffd = flb_output (ctx -> flb , (char * ) "doris" , NULL );
133+ ctx -> o_ffd = o_ffd ;
134+
135+ return ctx ;
136+ }
137+
138+ static void test_ctx_destroy (struct test_ctx * ctx )
139+ {
140+ TEST_CHECK (ctx != NULL );
141+
142+ sleep (1 );
143+ flb_stop (ctx -> flb );
144+ flb_destroy (ctx -> flb );
145+ flb_free (ctx );
146+ }
147+
148+ void flb_test_json ()
149+ {
150+ struct test_ctx * ctx ;
151+ int ret ;
152+ int num ;
153+
154+ char * buf1 = "[1, {\"msg\":\"hello world\"}]" ;
155+ size_t size1 = strlen (buf1 );
156+ char * buf2 = "[2, {\"msg\":\"hello world\"}]" ;
157+ size_t size2 = strlen (buf2 );
158+
159+ char * expected_strs [] = {"[{\"date\":1.0,\"msg\":\"hello world\"},{\"date\":2.0,\"msg\":\"hello world\"}]" };
160+ struct str_list expected = {
161+ .size = sizeof (expected_strs )/sizeof (char * ),
162+ .lists = & expected_strs [0 ],
163+ };
164+
165+ clear_output_num ();
166+
167+ ctx = test_ctx_create ();
168+ if (!TEST_CHECK (ctx != NULL )) {
169+ TEST_MSG ("test_ctx_create failed" );
170+ exit (EXIT_FAILURE );
171+ }
172+
173+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
174+ "match" , "*" ,
175+ "user" , "admin" ,
176+ "database" , "d_fb" ,
177+ "table" , "t_fb" ,
178+ NULL );
179+ TEST_CHECK (ret == 0 );
180+
181+ ret = flb_output_set_test (ctx -> flb , ctx -> o_ffd ,
182+ "formatter" , cb_check_str_list ,
183+ & expected , NULL );
184+ TEST_CHECK (ret == 0 );
185+
186+ /* Start the engine */
187+ ret = flb_start (ctx -> flb );
188+ TEST_CHECK (ret == 0 );
189+
190+ /* Ingest data sample */
191+ ret = flb_lib_push (ctx -> flb , ctx -> i_ffd , (char * ) buf1 , size1 );
192+ TEST_CHECK (ret >= 0 );
193+ ret = flb_lib_push (ctx -> flb , ctx -> i_ffd , (char * ) buf2 , size2 );
194+ TEST_CHECK (ret >= 0 );
195+
196+ /* waiting to flush */
197+ flb_time_msleep (500 );
198+
199+ num = get_output_num ();
200+ if (!TEST_CHECK (num > 0 )) {
201+ TEST_MSG ("no outputs" );
202+ }
203+
204+ test_ctx_destroy (ctx );
205+ }
206+
207+ void flb_test_time_key ()
208+ {
209+ struct test_ctx * ctx ;
210+ int ret ;
211+ int num ;
212+
213+ char * buf1 = "[1, {\"msg\":\"hello world\"}]" ;
214+ size_t size1 = strlen (buf1 );
215+
216+ char * expected_strs [] = {"{\"timestamp\":1.0,\"msg\":\"hello world\"}" };
217+ struct str_list expected = {
218+ .size = sizeof (expected_strs )/sizeof (char * ),
219+ .lists = & expected_strs [0 ],
220+ };
221+
222+ clear_output_num ();
223+
224+ ctx = test_ctx_create ();
225+ if (!TEST_CHECK (ctx != NULL )) {
226+ TEST_MSG ("test_ctx_create failed" );
227+ exit (EXIT_FAILURE );
228+ }
229+
230+ ret = flb_output_set (ctx -> flb , ctx -> o_ffd ,
231+ "match" , "*" ,
232+ "user" , "admin" ,
233+ "database" , "d_fb" ,
234+ "table" , "t_fb" ,
235+ "time_key" , "timestamp" ,
236+ NULL );
237+ TEST_CHECK (ret == 0 );
238+
239+ ret = flb_output_set_test (ctx -> flb , ctx -> o_ffd ,
240+ "formatter" , cb_check_str_list ,
241+ & expected , NULL );
242+ TEST_CHECK (ret == 0 );
243+
244+ /* Start the engine */
245+ ret = flb_start (ctx -> flb );
246+ TEST_CHECK (ret == 0 );
247+
248+ /* Ingest data sample */
249+ ret = flb_lib_push (ctx -> flb , ctx -> i_ffd , (char * ) buf1 , size1 );
250+ TEST_CHECK (ret >= 0 );
251+
252+ /* waiting to flush */
253+ flb_time_msleep (500 );
254+
255+ num = get_output_num ();
256+ if (!TEST_CHECK (num > 0 )) {
257+ TEST_MSG ("no outputs" );
258+ }
259+
260+ test_ctx_destroy (ctx );
261+ }
262+
263+ /* Test list */
264+ TEST_LIST = {
265+ {"json" , flb_test_json },
266+ {"time_key" , flb_test_time_key },
267+ {NULL , NULL }
268+ };
0 commit comments