Skip to content

Commit d0c7f0e

Browse files
Leonardo Alminanaedsiper
authored andcommitted
out_opentelemetry: profiles support added
Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 0200a2f commit d0c7f0e

File tree

3 files changed

+122
-2
lines changed

3 files changed

+122
-2
lines changed

plugins/out_opentelemetry/opentelemetry.c

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@
3737
#include <ctraces/ctraces.h>
3838
#include <ctraces/ctr_decode_msgpack.h>
3939

40+
#include <cprofiles/cprofiles.h>
41+
#include <cprofiles/cprof_decode_msgpack.h>
42+
#include <cprofiles/cprof_encode_opentelemetry.h>
43+
4044
extern cfl_sds_t cmt_encode_opentelemetry_create(struct cmt *cmt);
4145
extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text);
4246

@@ -643,6 +647,88 @@ static int process_traces(struct flb_event_chunk *event_chunk,
643647
return result;
644648
}
645649

650+
static int process_profiles(struct flb_event_chunk *event_chunk,
651+
struct flb_output_flush *out_flush,
652+
struct flb_input_instance *ins, void *out_context,
653+
struct flb_config *config)
654+
{
655+
int ret;
656+
int result;
657+
cfl_sds_t encoded_chunk;
658+
flb_sds_t buf = NULL;
659+
size_t off = 0;
660+
struct cprof *profiles_context;
661+
struct opentelemetry_context *ctx = out_context;
662+
663+
/* Initialize vars */
664+
ctx = out_context;
665+
result = FLB_OK;
666+
667+
buf = flb_sds_create_size(event_chunk->size);
668+
if (!buf) {
669+
flb_plg_error(ctx->ins, "could not allocate outgoing buffer");
670+
return FLB_RETRY;
671+
}
672+
673+
flb_plg_debug(ctx->ins, "cprofiles msgpack size: %lu",
674+
event_chunk->size);
675+
676+
while (cprof_decode_msgpack_create(&profiles_context,
677+
(unsigned char *) event_chunk->data,
678+
event_chunk->size, &off) == 0) {
679+
/* Create a OpenTelemetry payload */
680+
ret = cprof_encode_opentelemetry_create(&encoded_chunk, profiles_context);
681+
if (ret != CPROF_ENCODE_OPENTELEMETRY_SUCCESS) {
682+
flb_plg_error(ctx->ins,
683+
"Error encoding context as opentelemetry");
684+
result = FLB_ERROR;
685+
cprof_decode_msgpack_destroy(profiles_context);
686+
goto exit;
687+
}
688+
689+
/* concat buffer */
690+
ret = flb_sds_cat_safe(&buf, encoded_chunk, flb_sds_len(encoded_chunk));
691+
if (ret != 0) {
692+
flb_plg_error(ctx->ins, "Error appending encoded profiles to buffer");
693+
result = FLB_ERROR;
694+
cprof_encode_opentelemetry_destroy(encoded_chunk);
695+
cprof_decode_msgpack_destroy(profiles_context);
696+
goto exit;
697+
}
698+
699+
/* release */
700+
cprof_encode_opentelemetry_destroy(encoded_chunk);
701+
cprof_decode_msgpack_destroy(profiles_context);
702+
}
703+
704+
flb_plg_debug(ctx->ins, "final payload size: %lu", flb_sds_len(buf));
705+
if (buf && flb_sds_len(buf) > 0) {
706+
/* Send HTTP request */
707+
result = opentelemetry_post(ctx, buf, flb_sds_len(buf),
708+
event_chunk->tag,
709+
flb_sds_len(event_chunk->tag),
710+
ctx->profiles_uri_sanitized,
711+
ctx->grpc_profiles_uri);
712+
713+
/* Debug http_post() result statuses */
714+
if (result == FLB_OK) {
715+
flb_plg_debug(ctx->ins, "http_post result FLB_OK");
716+
}
717+
else if (result == FLB_ERROR) {
718+
flb_plg_debug(ctx->ins, "http_post result FLB_ERROR");
719+
}
720+
else if (result == FLB_RETRY) {
721+
flb_plg_debug(ctx->ins, "http_post result FLB_RETRY");
722+
}
723+
}
724+
725+
exit:
726+
if (buf) {
727+
flb_sds_destroy(buf);
728+
}
729+
return result;
730+
}
731+
646732
static int cb_opentelemetry_exit(void *data, struct flb_config *config)
647733
{
648734
struct opentelemetry_context *ctx;
@@ -690,6 +776,9 @@ static void cb_opentelemetry_flush(struct flb_event_chunk *event_chunk,
690776
else if (event_chunk->type == FLB_INPUT_TRACES){
691777
result = process_traces(event_chunk, out_flush, ins, out_context, config);
692778
}
779+
else if (event_chunk->type == FLB_INPUT_PROFILES){
780+
result = process_profiles(event_chunk, out_flush, ins, out_context, config);
781+
}
693782

694783
FLB_OUTPUT_RETURN(result);
695784
}
@@ -788,11 +877,24 @@ static struct flb_config_map config_map[] = {
788877
"Specify an optional HTTP URI for the target OTel endpoint."
789878
},
790879
{
791-
FLB_CONFIG_MAP_STR, "grpc_traces_uri", "/opentelemetry.proto.collector.trace.v1.TraceService/Export",
880+
FLB_CONFIG_MAP_STR, "grpc_traces_uri",
881+
"/opentelemetry.proto.collector.trace.v1.TraceService/Export",
792882
0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_traces_uri),
793883
"Specify an optional gRPC URI for the target OTel endpoint."
794884
},
795885

886+
{
887+
FLB_CONFIG_MAP_STR, "profiles_uri", "/v1development/profiles",
888+
0, FLB_TRUE, offsetof(struct opentelemetry_context, profiles_uri),
889+
"Specify an optional HTTP URI for the profiles OTel endpoint."
890+
},
891+
{
892+
FLB_CONFIG_MAP_STR, "grpc_profiles_uri",
893+
"/opentelemetry.proto.collector.profiles.v1experimental.ProfilesService/Export",
894+
0, FLB_TRUE, offsetof(struct opentelemetry_context, grpc_profiles_uri),
895+
"Specify an optional gRPC URI for the profiles OTel endpoint."
896+
},
897+
796898
{
797899
FLB_CONFIG_MAP_BOOL, "log_response_payload", "true",
798900
0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload),
@@ -886,7 +988,7 @@ struct flb_output_plugin out_opentelemetry_plugin = {
886988
.cb_flush = cb_opentelemetry_flush,
887989
.cb_exit = cb_opentelemetry_exit,
888990
.config_map = config_map,
889-
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES,
991+
.event_type = FLB_OUTPUT_LOGS | FLB_OUTPUT_METRICS | FLB_OUTPUT_TRACES | FLB_OUTPUT_PROFILES,
890992
.flags = FLB_OUTPUT_NET | FLB_IO_OPT_TLS,
891993

892994
.test_formatter.callback = opentelemetry_format_test,

plugins/out_opentelemetry/opentelemetry.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,14 @@ struct opentelemetry_context {
5858
int proxy_port;
5959

6060
/* HTTP URI */
61+
char *profiles_uri_sanitized;
6162
char *traces_uri_sanitized;
6263
char *metrics_uri_sanitized;
6364
char *logs_uri_sanitized;
6465
char *traces_uri;
6566
char *grpc_traces_uri;
67+
char *profiles_uri;
68+
char *grpc_profiles_uri;
6669
char *metrics_uri;
6770
char *grpc_metrics_uri;
6871
char *logs_uri;

plugins/out_opentelemetry/opentelemetry_conf.c

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -332,6 +332,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
332332
ctx->logs_uri_sanitized = sanitize_uri(ctx->logs_uri);
333333
ctx->traces_uri_sanitized = sanitize_uri(ctx->traces_uri);
334334
ctx->metrics_uri_sanitized = sanitize_uri(ctx->metrics_uri);
335+
ctx->profiles_uri_sanitized = sanitize_uri(ctx->profiles_uri);
335336

336337
if (ctx->logs_uri_sanitized == NULL) {
337338
flb_plg_trace(ctx->ins,
@@ -363,6 +364,16 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output
363364
return NULL;
364365
}
365366

367+
if (ctx->profiles_uri_sanitized == NULL) {
368+
flb_plg_trace(ctx->ins,
369+
"Could not allocate memory for sanitized "
370+
"profiles endpoint uri");
371+
372+
flb_opentelemetry_context_destroy(ctx);
373+
374+
return NULL;
375+
}
376+
366377
/* list of 'logs_body_key' */
367378
ret = log_body_key_list_create(ctx);
368379
if (ret != 0) {
@@ -604,6 +615,10 @@ void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx)
604615
flb_free(ctx->metrics_uri_sanitized);
605616
}
606617

618+
if (ctx->profiles_uri_sanitized != NULL && ctx->profiles_uri_sanitized != ctx->profiles_uri) {
619+
flb_free(ctx->profiles_uri_sanitized);
620+
}
621+
607622
/* release log_body_key_list */
608623
log_body_key_list_destroy(ctx);
609624

0 commit comments

Comments
 (0)