Skip to content

Commit ebd7e5a

Browse files
Aditya Prajapatimgeriesa
authored andcommitted
out_opentelemetry: export traces
Signed-off-by: Aditya Prajapati <[email protected]> Signed-off-by: Manal Geries <[email protected]>
1 parent 7fd8a07 commit ebd7e5a

File tree

2 files changed

+95
-3
lines changed

2 files changed

+95
-3
lines changed

plugins/out_opentelemetry/opentelemetry.c

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
#include <cmetrics/cmetrics.h>
3131
#include <cmetrics/cmt_encode_opentelemetry.h>
3232

33+
#include <ctraces/ctraces.h>
34+
#include <ctraces/ctr_decode_msgpack.h>
35+
3336
extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
3437
extern void cmt_encode_opentelemetry_destroy(struct cmt *cmt);
3538

@@ -428,6 +431,86 @@ static int process_metrics(struct flb_event_chunk *event_chunk,
428431
return result;
429432
}
430433

434+
static int process_traces(struct flb_event_chunk *event_chunk,
435+
struct flb_output_flush *out_flush,
436+
struct flb_input_instance *ins, void *out_context,
437+
struct flb_config *config)
438+
{
439+
int ok;
440+
int ret;
441+
int result;
442+
cfl_sds_t encoded_chunk;
443+
flb_sds_t buf = NULL;
444+
size_t off = 0;
445+
struct ctrace *ctr;
446+
struct opentelemetry_context *ctx = out_context;
447+
448+
/* Initialize vars */
449+
ctx = out_context;
450+
ok = 0;
451+
result = FLB_OK;
452+
453+
buf = flb_sds_create_size(event_chunk->size);
454+
if (!buf) {
455+
flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
456+
return FLB_RETRY;
457+
}
458+
459+
flb_plg_debug(ctx->ins, "ctraces msgpack size: %lu",
460+
event_chunk->size);
461+
462+
ret = ctr_decode_msgpack_create(&ctr,
463+
(char *) event_chunk->data,
464+
event_chunk->size, &off);
465+
if (ret != ok) {
466+
flb_plg_error(ctx->ins, "Error decoding msgpack encoded context");
467+
}
468+
469+
/* Create a OpenTelemetry payload */
470+
encoded_chunk = ctr_encode_opentelemetry_create(ctr);
471+
if (encoded_chunk == NULL) {
472+
flb_plg_error(ctx->ins,
473+
"Error encoding context as opentelemetry");
474+
result = FLB_ERROR;
475+
goto exit;
476+
}
477+
478+
/* concat buffer */
479+
flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));
480+
481+
/* release */
482+
ctr_encode_opentelemetry_destroy(encoded_chunk);
483+
ctr_destroy(ctr);
484+
485+
flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
486+
if (buf && flb_sds_len(buf) > 0) {
487+
/* Send HTTP request */
488+
result = http_post(ctx, buf, flb_sds_len(buf),
489+
event_chunk->tag,
490+
flb_sds_len(event_chunk->tag),
491+
ctx->traces_uri);
492+
493+
/* Debug http_post() result statuses */
494+
if (result == FLB_OK) {
495+
flb_plg_debug(ctx->ins, "http_post result FLB_OK");
496+
}
497+
else if (result == FLB_ERROR) {
498+
flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
499+
}
500+
else if (result == FLB_RETRY) {
501+
flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
502+
}
503+
}
504+
flb_sds_destroy(buf);
505+
buf = NULL;
506+
507+
exit:
508+
if (buf) {
509+
flb_sds_destroy(buf);
510+
}
511+
return result;
512+
}
513+
431514
static int cb_opentelemetry_exit(void *data, struct flb_config *config)
432515
{
433516
struct opentelemetry_context *ctx;
@@ -462,12 +545,15 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
462545
{
463546
int result = FLB_RETRY;
464547

465-
if (ins->event_type == FLB_OUTPUT_METRICS || ins->event_type == FLB_INPUT_METRICS){
548+
if (ins->event_type == FLB_INPUT_METRICS){
466549
result = process_metrics(event_chunk, out_flush, ins, out_context, config);
467550
}
468-
else if (ins->event_type == FLB_INPUT_LOGS || ins->event_type == FLB_OUTPUT_LOGS){
551+
else if (ins->event_type == FLB_INPUT_LOGS){
469552
result = process_logs(event_chunk, out_flush, ins, out_context, config);
470553
}
554+
else if (ins->event_type == FLB_INPUT_TRACES){
555+
result = process_traces(event_chunk, out_flush, ins, out_context, config);
556+
}
471557
FLB_OUTPUT_RETURN(result);
472558
}
473559

@@ -510,6 +596,11 @@ static struct flb_config_map config_map[] = {
510596
0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri),
511597
"Specify an optional HTTP URI for the target OTel endpoint."
512598
},
599+
{
600+
FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces",
601+
0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri),
602+
"Specify an optional HTTP URI for the target OTel endpoint."
603+
},
513604
{
514605
FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
515606
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
@@ -527,6 +618,6 @@ struct flb_output_plugin out_opentelemetry_plugin = {
527618
.cb_flush = cb_opentelemetry_flush,
528619
.cb_exit = cb_opentelemetry_exit,
529620
.config_map = config_map,
530-
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS,
621+
.event_type = FLB_OUTPUT_LOGS,
531622
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
532623
};

plugins/out_opentelemetry/opentelemetry.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ struct opentelemetry_context {
3838
int proxy_port;
3939

4040
/* HTTP URI */
41+
char *traces_uri;
4142
char *metrics_uri;
4243
char *logs_uri;
4344
char *host;

0 commit comments

Comments
 (0)