Skip to content

Commit 4877f77

Browse files
committed
lib: new apis to support creating processor unit for input and output
This patch introduces new APIs to create a single processor unit for input and output instances: 1. flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, …) Create a single input processor unit for a specific event type and unit name 2. flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type, const char *processor_unit_name, int ffd, …) Create a single output processor unit for a specific event type and unit name Signed-off-by: Vanessa Zhang <Vanessa.Zhang@progress.com>
1 parent 3f4a024 commit 4877f77

File tree

2 files changed

+122
-0
lines changed

2 files changed

+122
-0
lines changed

include/fluent-bit/flb_lib.h

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,14 @@ FLB_EXPORT void flb_init_env();
5252
FLB_EXPORT flb_ctx_t *flb_create();
5353
FLB_EXPORT void flb_destroy(flb_ctx_t *ctx);
5454
FLB_EXPORT int flb_input(flb_ctx_t *ctx, const char *input, void *data);
55+
FLB_EXPORT int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type,
56+
const char *processor_unit_name, int ffd,
57+
...);
5558
FLB_EXPORT int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
5659
FLB_EXPORT int flb_output(flb_ctx_t *ctx, const char *output, struct flb_lib_out_cb *cb);
60+
FLB_EXPORT int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type,
61+
const char *processor_unit_name, int ffd,
62+
...);
5763
FLB_EXPORT int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc);
5864
FLB_EXPORT int flb_filter(flb_ctx_t *ctx, const char *filter, void *data);
5965
FLB_EXPORT int flb_input_set(flb_ctx_t *ctx, int ffd, ...);

src/flb_lib.c

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
#include <fluent-bit/flb_upstream.h>
3535
#include <fluent-bit/flb_downstream.h>
3636
#include <fluent-bit/tls/flb_tls.h>
37+
#include <fluent-bit/flb_processor.h>
3738

3839
#include <signal.h>
3940
#include <stdarg.h>
@@ -329,6 +330,72 @@ int flb_input_set(flb_ctx_t *ctx, int ffd, ...)
329330
return 0;
330331
}
331332

333+
static int flb_processor_event_type_get(const char *event_type)
334+
{
335+
if (strcasecmp(event_type, "logs") == 0) {
336+
return FLB_PROCESSOR_LOGS;
337+
}
338+
else if (strcasecmp(event_type, "metrics") == 0) {
339+
return FLB_PROCESSOR_METRICS;
340+
}
341+
else if (strcasecmp(event_type, "traces") == 0) {
342+
return FLB_PROCESSOR_TRACES;
343+
}
344+
else if (strcasecmp(event_type, "profiles") == 0) {
345+
return FLB_PROCESSOR_PROFILES;
346+
}
347+
else {
348+
return -1;
349+
}
350+
}
351+
352+
/* Create a single processor unit for the input processor */
353+
int flb_input_processor_unit(flb_ctx_t *ctx, const char *event_type,
354+
const char *processor_unit_name, int ffd, ...)
355+
{
356+
int ret;
357+
int type;
358+
struct flb_input_instance *i_ins;
359+
struct flb_processor *proc;
360+
struct flb_processor_unit *pu;
361+
char *key;
362+
char *value;
363+
va_list va;
364+
365+
i_ins = in_instance_get(ctx, ffd);
366+
if (!i_ins) {
367+
return -1;
368+
}
369+
proc = i_ins->processor;
370+
if (!proc) {
371+
return -1;
372+
}
373+
type = flb_processor_event_type_get(event_type);
374+
if (type == -1) {
375+
return -1;
376+
}
377+
pu = flb_processor_unit_create(proc, type, processor_unit_name);
378+
va_start(va, ffd);
379+
while ((key = va_arg(va, char *))) {
380+
value = va_arg(va, char *);
381+
if (!value) {
382+
/* Wrong parameter */
383+
va_end(va);
384+
return -1;
385+
}
386+
struct cfl_variant cfl_value = {
387+
.type = CFL_VARIANT_STRING,
388+
.data.as_string = value,
389+
};
390+
ret = flb_processor_unit_set_property(pu, key, &cfl_value);
391+
if (ret != 0) {
392+
va_end(va);
393+
return -1;
394+
}
395+
}
396+
return 0;
397+
}
398+
332399
int flb_input_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
333400
{
334401
struct flb_input_instance *i_ins;
@@ -545,6 +612,55 @@ int flb_output_set(flb_ctx_t *ctx, int ffd, ...)
545612
return 0;
546613
}
547614

615+
/* Create a single processor unit for the input processor */
616+
int flb_output_processor_unit(flb_ctx_t *ctx, const char *event_type,
617+
const char *processor_unit_name, int ffd, ...)
618+
{
619+
int ret;
620+
int type;
621+
struct flb_output_instance *o_ins;
622+
struct flb_processor *proc;
623+
struct flb_processor_unit *pu;
624+
char *key;
625+
char *value;
626+
va_list va;
627+
628+
o_ins = out_instance_get(ctx, ffd);
629+
if (!o_ins) {
630+
return -1;
631+
}
632+
proc = o_ins->processor;
633+
if (!proc) {
634+
return -1;
635+
}
636+
type = flb_processor_event_type_get(event_type);
637+
if (type == -1) {
638+
return -1;
639+
}
640+
pu = flb_processor_unit_create(proc, type, processor_unit_name);
641+
va_start(va, ffd);
642+
while ((key = va_arg(va, char *))) {
643+
value = va_arg(va, char *);
644+
if (!value) {
645+
/* Wrong parameter */
646+
va_end(va);
647+
return -1;
648+
}
649+
650+
struct cfl_variant cfl_value = {
651+
.type = CFL_VARIANT_STRING,
652+
.data.as_string = value,
653+
};
654+
655+
ret = flb_processor_unit_set_property(pu, key, &cfl_value);
656+
if (ret != 0) {
657+
va_end(va);
658+
return -1;
659+
}
660+
}
661+
return 0;
662+
}
663+
548664
int flb_output_set_processor(flb_ctx_t *ctx, int ffd, struct flb_processor *proc)
549665
{
550666
struct flb_output_instance *o_ins;

0 commit comments

Comments
 (0)