@@ -66,8 +66,9 @@ static int cb_doris_init(struct flb_output_instance *ins,
6666}
6767
6868static int http_put (struct flb_out_doris * ctx ,
69- const void * body , size_t body_len ,
70- const char * tag , int tag_len )
69+ const char * host , int port ,
70+ const void * body , size_t body_len ,
71+ const char * tag , int tag_len )
7172{
7273 int ret ;
7374 int out_ret = FLB_OK ;
@@ -79,7 +80,16 @@ static int http_put(struct flb_out_doris *ctx,
7980 struct flb_http_client * c ;
8081
8182 /* Get upstream context and connection */
82- u = ctx -> u ;
83+ if (strcmp (host , ctx -> host ) == 0 && port == ctx -> port ) {
84+ u = ctx -> u ;
85+ }
86+ else {
87+ u = flb_upstream_create (ctx -> u -> base .config ,
88+ host ,
89+ port ,
90+ ctx -> u -> base .flags ,
91+ ctx -> u -> base .tls_context );
92+ }
8393 u_conn = flb_upstream_conn_get (u );
8494 if (!u_conn ) {
8595 flb_plg_error (ctx -> ins , "no upstream connections available to %s:%i" ,
@@ -94,7 +104,7 @@ static int http_put(struct flb_out_doris *ctx,
94104 /* Create HTTP client context */
95105 c = flb_http_client (u_conn , FLB_HTTP_PUT , ctx -> uri ,
96106 payload_buf , payload_size ,
97- ctx -> host , ctx -> port ,
107+ host , port ,
98108 NULL , 0 );
99109
100110 /*
@@ -120,13 +130,32 @@ static int http_put(struct flb_out_doris *ctx,
120130
121131 ret = flb_http_do (c , & b_sent );
122132 if (ret == 0 ) {
123- flb_plg_info (ctx -> ins , "%s:%i, HTTP status=%i\n%s\n" ,
124- ctx -> host , ctx -> port ,
125- c -> resp .status , c -> resp .payload );
126- if (c -> resp .payload_size > 0 &&
127- (strstr (c -> resp .payload , "\"Status\": \"Success\"" ) != NULL ||
128- strstr (c -> resp .payload , "\"Status\": \"Publish Timeout\"" ) != NULL )) {
133+ flb_plg_debug (ctx -> ins , "%s:%i, HTTP status=%i\n%s\n" ,
134+ host , port ,
135+ c -> resp .status , c -> resp .payload );
136+ if (c -> resp .status == 307 ) { // redict
137+ // example: Location: http://admin:[email protected] :8040/api/d_fb/t_fb/_stream_load? 138+ char * location = strstr (c -> resp .data , "Location:" );
139+ char * start = strstr (location , "@" ) + 1 ;
140+ char * mid = strstr (start , ":" );
141+ char * end = strstr (mid , "/api" );
142+ char redict_host [50 ] = {0 };
143+ memcpy (redict_host , start , mid - start );
144+ char redict_port [10 ] = {0 };
145+ memcpy (redict_port , mid + 1 , end - (mid + 1 ));
146+
147+ out_ret = http_put (ctx , redict_host , atoi (redict_port ),
148+ body , body_len , tag , tag_len );
149+ }
150+ else if (c -> resp .status == 200 ) {
151+ if (c -> resp .payload_size > 0 &&
152+ (strstr (c -> resp .payload , "\"Status\": \"Success\"" ) != NULL ||
153+ strstr (c -> resp .payload , "\"Status\": \"Publish Timeout\"" ) != NULL )) {
129154 // continue
155+ }
156+ else {
157+ out_ret = FLB_RETRY ;
158+ }
130159 }
131160 else {
132161 out_ret = FLB_RETRY ;
@@ -154,6 +183,11 @@ static int http_put(struct flb_out_doris *ctx,
154183 /* Release the TCP connection */
155184 flb_upstream_conn_release (u_conn );
156185
186+ /* Release flb_upstream */
187+ if (u != ctx -> u ) {
188+ flb_upstream_destroy (u );
189+ }
190+
157191 return out_ret ;
158192}
159193
@@ -169,7 +203,7 @@ static int compose_payload(struct flb_out_doris *ctx,
169203 encoded = flb_pack_msgpack_to_json_format (in_body ,
170204 in_size ,
171205 FLB_PACK_JSON_FORMAT_JSON ,
172- FLB_PACK_JSON_DATE_DOUBLE ,
206+ FLB_PACK_JSON_DATE_EPOCH ,
173207 ctx -> time_key );
174208 if (encoded == NULL ) {
175209 flb_plg_error (ctx -> ins , "failed to convert json" );
@@ -178,7 +212,7 @@ static int compose_payload(struct flb_out_doris *ctx,
178212 * out_body = (void * )encoded ;
179213 * out_size = flb_sds_len (encoded );
180214
181- flb_plg_info (ctx -> ins , "%s" , (char * ) * out_body );
215+ flb_plg_debug (ctx -> ins , "http body: %s" , (char * ) * out_body );
182216
183217 return FLB_OK ;
184218}
@@ -202,8 +236,8 @@ static void cb_doris_flush(struct flb_event_chunk *event_chunk,
202236 FLB_OUTPUT_RETURN (ret );
203237 }
204238
205- ret = http_put (ctx , out_body , out_size ,
206- event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
239+ ret = http_put (ctx , ctx -> host , ctx -> port , out_body , out_size ,
240+ event_chunk -> tag , flb_sds_len (event_chunk -> tag ));
207241 flb_sds_destroy (out_body );
208242
209243 FLB_OUTPUT_RETURN (ret );
@@ -219,8 +253,6 @@ static int cb_doris_exit(void *data, struct flb_config *config)
219253
220254/* Configuration properties map */
221255static struct flb_config_map config_map [] = {
222- // host
223- // port
224256 // user
225257 {
226258 FLB_CONFIG_MAP_STR , "user" , NULL ,
0 commit comments