Skip to content

Commit d47f9ce

Browse files
committed
feat: add doris out plugin
Signed-off-by: composer <[email protected]>
1 parent 30eb89e commit d47f9ce

File tree

6 files changed

+507
-0
lines changed

6 files changed

+507
-0
lines changed

plugins/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,7 @@ REGISTER_OUT_PLUGIN("out_prometheus_remote_write")
344344
REGISTER_OUT_PLUGIN("out_s3")
345345
REGISTER_OUT_PLUGIN("out_vivo_exporter")
346346
REGISTER_OUT_PLUGIN("out_chronicle")
347+
REGISTER_OUT_PLUGIN("out_doris")
347348

348349
# FILTERS
349350
# =======

plugins/out_doris/CMakeLists.txt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
set(src
2+
doris.c
3+
doris_conf.c
4+
)
5+
6+
FLB_PLUGIN(out_doris "${src}" "")

plugins/out_doris/doris.c

Lines changed: 306 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,306 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2024 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#include <fluent-bit/flb_output_plugin.h>
21+
#include <fluent-bit/flb_output.h>
22+
#include <fluent-bit/flb_http_client.h>
23+
#include <fluent-bit/flb_pack.h>
24+
#include <fluent-bit/flb_str.h>
25+
#include <fluent-bit/flb_time.h>
26+
#include <fluent-bit/flb_utils.h>
27+
#include <fluent-bit/flb_pack.h>
28+
#include <fluent-bit/flb_sds.h>
29+
#include <fluent-bit/flb_gzip.h>
30+
#include <fluent-bit/flb_record_accessor.h>
31+
#include <fluent-bit/flb_log_event_decoder.h>
32+
#include <msgpack.h>
33+
34+
#include <stdio.h>
35+
#include <stdlib.h>
36+
#include <string.h>
37+
#include <assert.h>
38+
#include <errno.h>
39+
40+
#include "doris.h"
41+
#include "doris_conf.h"
42+
43+
#include <fluent-bit/flb_callback.h>
44+
45+
static int cb_doris_init(struct flb_output_instance *ins,
46+
struct flb_config *config, void *data)
47+
{
48+
struct flb_out_doris *ctx = NULL;
49+
(void) data;
50+
51+
ctx = flb_doris_conf_create(ins, config);
52+
if (!ctx) {
53+
return -1;
54+
}
55+
56+
/* Set the plugin context */
57+
flb_output_set_context(ins, ctx);
58+
59+
/*
60+
* This plugin instance uses the HTTP client interface, let's register
61+
* it debugging callbacks.
62+
*/
63+
flb_output_set_http_debug_callbacks(ins);
64+
65+
return 0;
66+
}
67+
68+
static int http_put(struct flb_out_doris *ctx,
69+
const void *body, size_t body_len,
70+
const char *tag, int tag_len)
71+
{
72+
int ret;
73+
int out_ret = FLB_OK;
74+
size_t b_sent;
75+
void *payload_buf = NULL;
76+
size_t payload_size = 0;
77+
struct flb_upstream *u;
78+
struct flb_connection *u_conn;
79+
struct flb_http_client *c;
80+
81+
/* Get upstream context and connection */
82+
u = ctx->u;
83+
u_conn = flb_upstream_conn_get(u);
84+
if (!u_conn) {
85+
flb_plg_error(ctx->ins, "no upstream connections available to %s:%i",
86+
u->tcp_host, u->tcp_port);
87+
return FLB_RETRY;
88+
}
89+
90+
/* Map payload */
91+
payload_buf = (void *) body;
92+
payload_size = body_len;
93+
94+
/* Create HTTP client context */
95+
c = flb_http_client(u_conn, FLB_HTTP_PUT, ctx->uri,
96+
payload_buf, payload_size,
97+
ctx->host, ctx->port,
98+
NULL, 0);
99+
100+
/*
101+
* Direct assignment of the callback context to the HTTP client context.
102+
* This needs to be improved through a more clean API.
103+
*/
104+
c->cb_ctx = ctx->ins->callback;
105+
106+
/* Append headers */
107+
flb_http_add_header(c, "format", 6, "json", 4);
108+
flb_http_add_header(c, "Expect", 6, "100-continue", 12);
109+
flb_http_add_header(c, "strip_outer_array", 17, "true", 4);
110+
flb_http_add_header(c, "columns", 7, ctx->columns, strlen(ctx->columns));
111+
flb_http_add_header(c, "User-Agent", 10, "Fluent-Bit", 10);
112+
if (ctx->timeout_second > 0) {
113+
char timeout[256];
114+
snprintf(timeout, sizeof(timeout) - 1, "%d", ctx->timeout_second);
115+
flb_http_add_header(c, "timeout", 7, timeout, strlen(timeout));
116+
}
117+
118+
/* Basic Auth headers */
119+
flb_http_basic_auth(c, ctx->user, ctx->password);
120+
121+
ret = flb_http_do(c, &b_sent);
122+
if (ret == 0) {
123+
flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%s\n",
124+
ctx->host, ctx->port,
125+
c->resp.status, c->resp.payload);
126+
if (c->resp.payload_size > 0 &&
127+
(strstr(c->resp.payload, "\"Status\": \"Success\"") != NULL ||
128+
strstr(c->resp.payload, "\"Status\": \"Publish Timeout\"") != NULL)) {
129+
// continue
130+
}
131+
else {
132+
out_ret = FLB_RETRY;
133+
}
134+
}
135+
else {
136+
flb_plg_error(ctx->ins, "could not flush records to %s:%i (http_do=%i)",
137+
ctx->host, ctx->port, ret);
138+
out_ret = FLB_RETRY;
139+
}
140+
141+
/* cleanup */
142+
143+
/*
144+
* If the payload buffer is different than incoming records in body, means
145+
* we generated a different payload and must be freed.
146+
*/
147+
if (payload_buf != body) {
148+
flb_free(payload_buf);
149+
}
150+
151+
/* Destroy HTTP client context */
152+
flb_http_client_destroy(c);
153+
154+
/* Release the TCP connection */
155+
flb_upstream_conn_release(u_conn);
156+
157+
return out_ret;
158+
}
159+
160+
static int compose_payload(struct flb_out_doris *ctx,
161+
const void *in_body, size_t in_size,
162+
void **out_body, size_t *out_size)
163+
{
164+
flb_sds_t encoded;
165+
166+
*out_body = NULL;
167+
*out_size = 0;
168+
169+
encoded = flb_pack_msgpack_to_json_format(in_body,
170+
in_size,
171+
FLB_PACK_JSON_FORMAT_JSON,
172+
FLB_PACK_JSON_DATE_DOUBLE,
173+
ctx->time_key);
174+
if (encoded == NULL) {
175+
flb_plg_error(ctx->ins, "failed to convert json");
176+
return FLB_ERROR;
177+
}
178+
*out_body = (void*)encoded;
179+
*out_size = flb_sds_len(encoded);
180+
181+
flb_plg_info(ctx->ins, "%s", (char*) *out_body);
182+
183+
return FLB_OK;
184+
}
185+
186+
static void cb_doris_flush(struct flb_event_chunk *event_chunk,
187+
struct flb_output_flush *out_flush,
188+
struct flb_input_instance *i_ins,
189+
void *out_context,
190+
struct flb_config *config)
191+
{
192+
int ret = FLB_ERROR;
193+
struct flb_out_doris *ctx = out_context;
194+
void *out_body;
195+
size_t out_size;
196+
(void) i_ins;
197+
198+
ret = compose_payload(ctx, event_chunk->data, event_chunk->size,
199+
&out_body, &out_size);
200+
201+
if (ret != FLB_OK) {
202+
FLB_OUTPUT_RETURN(ret);
203+
}
204+
205+
ret = http_put(ctx, out_body, out_size,
206+
event_chunk->tag, flb_sds_len(event_chunk->tag));
207+
flb_sds_destroy(out_body);
208+
209+
FLB_OUTPUT_RETURN(ret);
210+
}
211+
212+
static int cb_doris_exit(void *data, struct flb_config *config)
213+
{
214+
struct flb_out_doris *ctx = data;
215+
216+
flb_doris_conf_destroy(ctx);
217+
return 0;
218+
}
219+
220+
/* Configuration properties map */
221+
static struct flb_config_map config_map[] = {
222+
// host
223+
// port
224+
// user
225+
{
226+
FLB_CONFIG_MAP_STR, "user", NULL,
227+
0, FLB_TRUE, offsetof(struct flb_out_doris, user),
228+
"Set HTTP auth user"
229+
},
230+
// password
231+
{
232+
FLB_CONFIG_MAP_STR, "password", "",
233+
0, FLB_TRUE, offsetof(struct flb_out_doris, password),
234+
"Set HTTP auth password"
235+
},
236+
// database
237+
{
238+
FLB_CONFIG_MAP_STR, "database", NULL,
239+
0, FLB_TRUE, offsetof(struct flb_out_doris, database),
240+
"Set database"
241+
},
242+
// table
243+
{
244+
FLB_CONFIG_MAP_STR, "table", NULL,
245+
0, FLB_TRUE, offsetof(struct flb_out_doris, table),
246+
"Set table"
247+
},
248+
// time_key
249+
{
250+
FLB_CONFIG_MAP_STR, "time_key", "date",
251+
0, FLB_TRUE, offsetof(struct flb_out_doris, time_key),
252+
"Specify the name of the date field in output"
253+
},
254+
// columns
255+
{
256+
FLB_CONFIG_MAP_STR, "columns", "date,log",
257+
0, FLB_TRUE, offsetof(struct flb_out_doris, columns),
258+
"Set columns"
259+
},
260+
// timeout
261+
{
262+
FLB_CONFIG_MAP_INT, "timeout_second", "60",
263+
0, FLB_TRUE, offsetof(struct flb_out_doris, timeout_second),
264+
"Set timeout in second"
265+
},
266+
267+
/* EOF */
268+
{0}
269+
};
270+
271+
static int cb_doris_format_test(struct flb_config *config,
272+
struct flb_input_instance *ins,
273+
void *plugin_context,
274+
void *flush_ctx,
275+
int event_type,
276+
const char *tag, int tag_len,
277+
const void *data, size_t bytes,
278+
void **out_data, size_t *out_size)
279+
{
280+
struct flb_out_doris *ctx = plugin_context;
281+
int ret;
282+
283+
ret = compose_payload(ctx, data, bytes, out_data, out_size);
284+
if (ret != FLB_OK) {
285+
flb_error("ret=%d", ret);
286+
return -1;
287+
}
288+
return 0;
289+
}
290+
291+
/* Plugin reference */
292+
struct flb_output_plugin out_doris_plugin = {
293+
.name = "doris",
294+
.description = "Doris Output",
295+
.cb_init = cb_doris_init,
296+
.cb_pre_run = NULL,
297+
.cb_flush = cb_doris_flush,
298+
.cb_exit = cb_doris_exit,
299+
.config_map = config_map,
300+
301+
/* for testing */
302+
.test_formatter.callback = cb_doris_format_test,
303+
304+
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
305+
.workers = 2
306+
};

plugins/out_doris/doris.h

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2+
3+
/* Fluent Bit
4+
* ==========
5+
* Copyright (C) 2015-2024 The Fluent Bit Authors
6+
*
7+
* Licensed under the Apache License, Version 2.0 (the "License");
8+
* you may not use this file except in compliance with the License.
9+
* You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
#ifndef FLB_OUT_DORIS_H
21+
#define FLB_OUT_DORIS_H
22+
23+
struct flb_out_doris {
24+
char *host;
25+
int port;
26+
char uri[256];
27+
28+
char *user;
29+
char *password;
30+
31+
flb_sds_t database;
32+
flb_sds_t table;
33+
34+
flb_sds_t time_key;
35+
flb_sds_t columns;
36+
37+
int timeout_second;
38+
39+
/* Upstream connection to the backend server */
40+
struct flb_upstream *u;
41+
42+
/* Plugin instance */
43+
struct flb_output_instance *ins;
44+
};
45+
46+
#endif

0 commit comments

Comments
 (0)