Skip to content

Commit a74afc9

Browse files
cosmo0920edsiper
authored andcommitted
go: proxy: plugin: Implement multi-threaded Golang input plugin mechanism
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent c6d22df commit a74afc9

File tree

11 files changed

+439
-32
lines changed

11 files changed

+439
-32
lines changed

include/fluent-bit/flb_api.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,14 @@
2525

2626
struct flb_api {
2727
const char *(*output_get_property) (const char *, struct flb_output_instance *);
28+
const char *(*input_get_property) (const char *, struct flb_input_instance *);
29+
30+
void *(*output_get_cmt_instance) (struct flb_output_instance *);
31+
void *(*input_get_cmt_instance) (struct flb_input_instance *);
32+
33+
void (*log_print) (int, const char*, int, const char*, ...);
34+
int (*input_log_check) (struct flb_input_instance *, int);
35+
int (*output_log_check) (struct flb_output_instance *, int);
2836
};
2937

3038
#ifdef FLB_CORE

include/fluent-bit/flb_input.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151

5252
/* Input plugin flag masks */
5353
#define FLB_INPUT_NET 4 /* input address may set host and port */
54+
#define FLB_INPUT_PLUGIN_CORE 0
55+
#define FLB_INPUT_PLUGIN_PROXY 1
5456
#define FLB_INPUT_CORO 128 /* plugin requires a thread on callbacks */
5557
#define FLB_INPUT_PRIVATE 256 /* plugin is not published/exposed */
5658
#define FLB_INPUT_NOTAG 512 /* plugin might don't have tags */
@@ -67,6 +69,13 @@
6769
struct flb_input_instance;
6870

6971
struct flb_input_plugin {
72+
/*
73+
* The type defines if this is a core-based plugin or it's handled by
74+
* some specific proxy.
75+
*/
76+
int type;
77+
void *proxy;
78+
7079
int flags; /* plugin flags */
7180
int event_type; /* event type to be generated: logs ?, metrics ? */
7281

@@ -522,6 +531,9 @@ int flb_input_set_property(struct flb_input_instance *ins,
522531
const char *k, const char *v);
523532
const char *flb_input_get_property(const char *key,
524533
struct flb_input_instance *ins);
534+
#ifdef FLB_HAVE_METRICS
535+
void *flb_input_get_cmt_instance(struct flb_input_instance *ins);
536+
#endif
525537

526538
int flb_input_check(struct flb_config *config);
527539
void flb_input_set_context(struct flb_input_instance *ins, void *context);
@@ -581,6 +593,7 @@ void flb_input_net_default_listener(const char *listen, int port,
581593

582594
int flb_input_event_type_is_metric(struct flb_input_instance *ins);
583595
int flb_input_event_type_is_log(struct flb_input_instance *ins);
596+
int flb_input_log_check(struct flb_input_instance *ins, int l);
584597

585598
struct mk_event_loop *flb_input_event_loop_get(struct flb_input_instance *ins);
586599
int flb_input_upstream_set(struct flb_upstream *u, struct flb_input_instance *ins);

include/fluent-bit/flb_output.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,9 @@ const char *flb_output_name(struct flb_output_instance *in);
728728
int flb_output_set_property(struct flb_output_instance *out,
729729
const char *k, const char *v);
730730
const char *flb_output_get_property(const char *key, struct flb_output_instance *ins);
731+
#ifdef FLB_HAVE_METRICS
732+
void *flb_output_get_cmt_instance(struct flb_output_instance *ins);
733+
#endif
731734
void flb_output_net_default(const char *host, int port,
732735
struct flb_output_instance *ins);
733736
const char *flb_output_name(struct flb_output_instance *ins);
@@ -737,6 +740,7 @@ void flb_output_set_context(struct flb_output_instance *ins, void *context);
737740
int flb_output_instance_destroy(struct flb_output_instance *ins);
738741
int flb_output_init_all(struct flb_config *config);
739742
int flb_output_check(struct flb_config *config);
743+
int flb_output_log_check(struct flb_output_instance *ins, int l);
740744

741745
int flb_output_upstream_set(struct flb_upstream *u, struct flb_output_instance *ins);
742746
int flb_output_upstream_ha_set(void *ha, struct flb_output_instance *ins);

include/fluent-bit/flb_plugin_proxy.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include <monkey/mk_core.h>
2424
#include <fluent-bit/flb_info.h>
2525
#include <fluent-bit/flb_output.h>
26+
#include <fluent-bit/flb_input_thread.h>
2627

2728
/* Plugin Types */
2829
#define FLB_PROXY_INPUT_PLUGIN 1
@@ -61,12 +62,18 @@ struct flb_plugin_proxy_context {
6162
struct flb_plugin_proxy *proxy;
6263
};
6364

65+
struct flb_plugin_input_proxy_context {
66+
int coll_fd;
67+
/* A proxy ptr is needed to store the proxy type/lang (OUTPUT/GOLANG) */
68+
struct flb_plugin_proxy *proxy;
69+
};
70+
6471
void *flb_plugin_proxy_symbol(struct flb_plugin_proxy *proxy,
6572
const char *symbol);
6673

67-
int flb_plugin_proxy_init(struct flb_plugin_proxy *proxy,
68-
struct flb_output_instance *o_ins,
69-
struct flb_config *config);
74+
int flb_plugin_proxy_output_init(struct flb_plugin_proxy *proxy,
75+
struct flb_output_instance *o_ins,
76+
struct flb_config *config);
7077

7178
int flb_plugin_proxy_register(struct flb_plugin_proxy *proxy,
7279
struct flb_config *config);

src/flb_api.c

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@
2020
#include <fluent-bit/flb_info.h>
2121
#include <fluent-bit/flb_api.h>
2222
#include <fluent-bit/flb_mem.h>
23+
#include <fluent-bit/flb_log.h>
2324

25+
#include <fluent-bit/flb_input.h>
2426
#include <fluent-bit/flb_output.h>
2527

2628
struct flb_api *flb_api_create()
@@ -34,6 +36,17 @@ struct flb_api *flb_api_create()
3436
}
3537

3638
api->output_get_property = flb_output_get_property;
39+
api->input_get_property = flb_input_get_property;
40+
41+
#ifdef FLB_HAVE_METRICS
42+
api->output_get_cmt_instance = flb_output_get_cmt_instance;
43+
api->input_get_cmt_instance = flb_input_get_cmt_instance;
44+
#endif
45+
46+
api->log_print = flb_log_print;
47+
api->input_log_check = flb_input_log_check;
48+
api->output_log_check = flb_output_log_check;
49+
3750
return api;
3851
}
3952

src/flb_input.c

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include <fluent-bit/flb_input_thread.h>
3131
#include <fluent-bit/flb_error.h>
3232
#include <fluent-bit/flb_utils.h>
33+
#include <fluent-bit/flb_plugin_proxy.h>
3334
#include <fluent-bit/flb_engine.h>
3435
#include <fluent-bit/flb_metrics.h>
3536
#include <fluent-bit/flb_storage.h>
@@ -135,6 +136,18 @@ int flb_input_event_type_is_log(struct flb_input_instance *ins)
135136
return FLB_FALSE;
136137
}
137138

139+
/* Check input plugin's log level.
140+
* Not for core plugins but for Golang plugins.
141+
* Golang plugins do not have thread-local flb_worker_ctx information. */
142+
int flb_input_log_check(struct flb_input_instance *ins, int l)
143+
{
144+
if (ins->log_level < l) {
145+
return FLB_FALSE;
146+
}
147+
148+
return FLB_TRUE;
149+
}
150+
138151
/* Create an input plugin instance */
139152
struct flb_input_instance *flb_input_new(struct flb_config *config,
140153
const char *input, void *data,
@@ -211,6 +224,24 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
211224
return NULL;
212225
}
213226

227+
if (plugin->type == FLB_INPUT_PLUGIN_CORE) {
228+
instance->context = NULL;
229+
}
230+
else {
231+
struct flb_plugin_proxy_context *ctx;
232+
233+
ctx = flb_calloc(1, sizeof(struct flb_plugin_proxy_context));
234+
if (!ctx) {
235+
flb_errno();
236+
flb_free(instance);
237+
return NULL;
238+
}
239+
240+
ctx->proxy = plugin->proxy;
241+
242+
instance->context = ctx;
243+
}
244+
214245
/* initialize remaining vars */
215246
instance->alias = NULL;
216247
instance->id = id;
@@ -219,7 +250,6 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
219250
instance->tag = NULL;
220251
instance->tag_len = 0;
221252
instance->routable = FLB_TRUE;
222-
instance->context = NULL;
223253
instance->data = data;
224254
instance->storage = NULL;
225255
instance->storage_type = -1;
@@ -513,6 +543,13 @@ const char *flb_input_get_property(const char *key,
513543
return flb_config_prop_get(key, &ins->properties);
514544
}
515545

546+
#ifdef FLB_HAVE_METRICS
547+
void *flb_input_get_cmt_instance(struct flb_input_instance *ins)
548+
{
549+
return (void *)ins->cmt;
550+
}
551+
#endif
552+
516553
/* Return an instance name or alias */
517554
const char *flb_input_name(struct flb_input_instance *ins)
518555
{
@@ -884,6 +921,7 @@ void flb_input_instance_exit(struct flb_input_instance *ins,
884921

885922
p = ins->p;
886923
if (p->cb_exit && ins->context) {
924+
/* Multi-threaded input plugins use the same function signature for exit callbacks. */
887925
p->cb_exit(ins->context, config);
888926
}
889927
}

src/flb_input_thread.c

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,6 +775,10 @@ int flb_input_thread_destroy(struct flb_input_thread *it, struct flb_input_insta
775775
{
776776
int ret;
777777
flb_input_thread_exit(it, ins);
778+
/* On Darwin, we must call pthread_cancel here to ensure worker
779+
* thread termination. Otherwise, worker thread termination will
780+
* be blocked. */
781+
pthread_cancel(it->thread);
778782
ret = pthread_join(it->thread, NULL);
779783
mpack_writer_destroy(&it->writer);
780784
pthread_mutex_destroy(&it->mutex);

src/flb_output.c

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,6 +807,13 @@ const char *flb_output_get_property(const char *key, struct flb_output_instance
807807
return flb_config_prop_get(key, &ins->properties);
808808
}
809809

810+
#ifdef FLB_HAVE_METRICS
811+
void *flb_output_get_cmt_instance(struct flb_output_instance *ins)
812+
{
813+
return (void *)ins->cmt;
814+
}
815+
#endif
816+
810817
/* Trigger the output plugins setup callbacks to prepare them. */
811818
int flb_output_init_all(struct flb_config *config)
812819
{
@@ -948,7 +955,7 @@ int flb_output_init_all(struct flb_config *config)
948955
#ifdef FLB_HAVE_PROXY_GO
949956
/* Proxy plugins have their own initialization */
950957
if (p->type == FLB_OUTPUT_PLUGIN_PROXY) {
951-
ret = flb_plugin_proxy_init(p->proxy, ins, config);
958+
ret = flb_plugin_proxy_output_init(p->proxy, ins, config);
952959
if (ret == -1) {
953960
flb_output_instance_destroy(ins);
954961
return -1;
@@ -1098,6 +1105,18 @@ int flb_output_check(struct flb_config *config)
10981105
return 0;
10991106
}
11001107

1108+
/* Check output plugin's log level.
1109+
* Not for core plugins but for Golang plugins.
1110+
* Golang plugins do not have thread-local flb_worker_ctx information. */
1111+
int flb_output_log_check(struct flb_output_instance *ins, int l)
1112+
{
1113+
if (ins->log_level < l) {
1114+
return FLB_FALSE;
1115+
}
1116+
1117+
return FLB_TRUE;
1118+
}
1119+
11011120
/*
11021121
* Output plugins might have enabled certain features that have not been passed
11031122
* directly to the upstream context. In order to avoid let plugins validate specific

0 commit comments

Comments
 (0)