Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
196 changes: 103 additions & 93 deletions plugins/in_opentelemetry/opentelemetry_logs.c
Original file line number Diff line number Diff line change
Expand Up @@ -310,13 +310,8 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
struct flb_mp_map_header mh_tmp;
struct flb_time tm;

/* record buffer and packer */
msgpack_sbuffer mp_sbuf;
msgpack_packer mp_pck;

/* metadata buffer and packer */
msgpack_sbuffer mp_sbuf_meta;
msgpack_packer mp_pck_meta;
msgpack_packer *mp_pck;
msgpack_packer *mp_pck_meta;

/* OTel proto suff */
Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest *input_logs;
Expand All @@ -329,12 +324,8 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
Opentelemetry__Proto__Logs__V1__LogRecord **log_records;
Opentelemetry__Proto__Resource__V1__Resource *resource;

/* initialize msgpack buffers */
msgpack_sbuffer_init(&mp_sbuf);
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);

msgpack_sbuffer_init(&mp_sbuf_meta);
msgpack_packer_init(&mp_pck_meta, &mp_sbuf_meta, msgpack_sbuffer_write);
mp_pck = &encoder->body.packer;
mp_pck_meta = &encoder->metadata.packer;

/* unpack logs from protobuf payload */
input_logs = opentelemetry__proto__collector__logs__v1__export_logs_service_request__unpack(NULL, in_size, in_buf);
Expand Down Expand Up @@ -385,130 +376,133 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
FLB_LOG_EVENT_INT64_VALUE(scope_log_index));


flb_mp_map_header_init(&mh, &mp_pck);
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to reset log event body: %s",
flb_log_event_encoder_get_error_description(ret));
goto binary_payload_to_msgpack_end;
}

flb_mp_map_header_init(&mh, mp_pck);

/* Resource */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&mp_pck, 8);
msgpack_pack_str_body(&mp_pck, "resource", 8);
msgpack_pack_str(mp_pck, 8);
msgpack_pack_str_body(mp_pck, "resource", 8);

flb_mp_map_header_init(&mh_tmp, &mp_pck);
flb_mp_map_header_init(&mh_tmp, mp_pck);
if (resource) {
/* look for OTel resource attributes */
if (resource->n_attributes > 0 && resource->attributes) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 10);
msgpack_pack_str_body(&mp_pck, "attributes", 10);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "attributes", 10);

ret = otel_pack_kvarray(&mp_pck,
ret = otel_pack_kvarray(mp_pck,
resource->attributes,
resource->n_attributes);
if (ret != 0) {
return ret;
goto binary_payload_to_msgpack_end;
}
}

if (resource->dropped_attributes_count > 0) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 24);
msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24);
msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count);
msgpack_pack_str(mp_pck, 24);
msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
msgpack_pack_uint64(mp_pck, resource->dropped_attributes_count);
}

if (resource_log->schema_url) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 10);
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "schema_url", 10);

len = strlen(resource_log->schema_url);
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len);
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, resource_log->schema_url, len);
}
}
flb_mp_map_header_end(&mh_tmp);

/* scope */
flb_mp_map_header_append(&mh);
msgpack_pack_str(&mp_pck, 5);
msgpack_pack_str_body(&mp_pck, "scope", 5);
msgpack_pack_str(mp_pck, 5);
msgpack_pack_str_body(mp_pck, "scope", 5);

/* Scope */
scope = scope_log->scope;

if (scope && (scope->name || scope->version || scope->n_attributes > 0)) {
flb_mp_map_header_init(&mh_tmp, &mp_pck);
flb_mp_map_header_init(&mh_tmp, mp_pck);

if (scope_log->schema_url && strlen(scope_log->schema_url) > 0) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 10);
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "schema_url", 10);

len = strlen(scope_log->schema_url);
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, scope_log->schema_url, len);
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, scope_log->schema_url, len);
}

if (scope->name && strlen(scope->name) > 0) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 4);
msgpack_pack_str_body(&mp_pck, "name", 4);
msgpack_pack_str(mp_pck, 4);
msgpack_pack_str_body(mp_pck, "name", 4);

len = strlen(scope->name);
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, scope->name, len);
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, scope->name, len);
}
if (scope->version && strlen(scope->version) > 0) {
flb_mp_map_header_append(&mh_tmp);

msgpack_pack_str(&mp_pck, 7);
msgpack_pack_str_body(&mp_pck, "version", 7);
msgpack_pack_str(mp_pck, 7);
msgpack_pack_str_body(mp_pck, "version", 7);

len = strlen(scope->version);
msgpack_pack_str(&mp_pck, len);
msgpack_pack_str_body(&mp_pck, scope->version, len);
msgpack_pack_str(mp_pck, len);
msgpack_pack_str_body(mp_pck, scope->version, len);
}

if (scope->n_attributes > 0 && scope->attributes) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 10);
msgpack_pack_str_body(&mp_pck, "attributes", 10);
ret = otel_pack_kvarray(&mp_pck,
msgpack_pack_str(mp_pck, 10);
msgpack_pack_str_body(mp_pck, "attributes", 10);
ret = otel_pack_kvarray(mp_pck,
scope->attributes,
scope->n_attributes);
if (ret != 0) {
return ret;
goto binary_payload_to_msgpack_end;
}
}

if (scope->dropped_attributes_count > 0) {
flb_mp_map_header_append(&mh_tmp);
msgpack_pack_str(&mp_pck, 24);
msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24);
msgpack_pack_uint64(&mp_pck, scope->dropped_attributes_count);
msgpack_pack_str(mp_pck, 24);
msgpack_pack_str_body(mp_pck, "dropped_attributes_count", 24);
msgpack_pack_uint64(mp_pck, scope->dropped_attributes_count);
}

flb_mp_map_header_end(&mh_tmp);
}
else {
/* set an empty scope */
msgpack_pack_map(&mp_pck, 0);
msgpack_pack_map(mp_pck, 0);
}

flb_mp_map_header_end(&mh);

ret = flb_log_event_encoder_set_body_from_raw_msgpack(
encoder,
mp_sbuf.data,
mp_sbuf.size);
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "could not set group content metadata");
flb_plg_error(ctx->ins, "could not set group content metadata: %s",
flb_log_event_encoder_get_error_description(ret));
goto binary_payload_to_msgpack_end;
}

flb_log_event_encoder_group_header_end(encoder);

msgpack_sbuffer_clear(&mp_sbuf);

for (log_record_index=0; log_record_index < scope_log->n_log_records; log_record_index++) {
ret = flb_log_event_encoder_begin_record(encoder);

Expand All @@ -528,54 +522,72 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
msgpack_sbuffer_clear(&mp_sbuf_meta);
ret = otel_pack_v1_metadata(ctx, &mp_pck_meta, log_records[log_record_index], resource, scope_log->scope);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to convert log record");
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to reset log event metadata: %s",
flb_log_event_encoder_get_error_description(ret));
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(
encoder,
mp_sbuf_meta.data,
mp_sbuf_meta.size);
ret = otel_pack_v1_metadata(ctx,
mp_pck_meta,
log_records[log_record_index],
resource,
scope_log->scope);
}

msgpack_sbuffer_clear(&mp_sbuf_meta);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to convert log record");
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->metadata);
}
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
ret = otlp_pack_any_value(
&mp_pck,
log_records[log_record_index]->body);

if (ret != 0) {
flb_plg_error(ctx->ins, "failed to convert log record body");
ret = flb_log_event_encoder_dynamic_field_reset(&encoder->body);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_plg_error(ctx->ins, "failed to reset log event body: %s",
flb_log_event_encoder_get_error_description(ret));
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else if (ctx->logs_body_key == NULL &&
log_records[log_record_index]->body != NULL &&
log_records[log_record_index]->body->value_case ==
OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
ret = otlp_pack_any_value(
mp_pck,
log_records[log_record_index]->body);
}
else {
if (ctx->logs_body_key == NULL &&
log_records[log_record_index]->body != NULL &&
log_records[log_record_index]->body->value_case ==
OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_KVLIST_VALUE) {
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
encoder,
mp_sbuf.data,
mp_sbuf.size);
logs_body_key = ctx->logs_body_key;
if (logs_body_key == NULL) {
logs_body_key = "log";
}
ret = msgpack_pack_map(mp_pck, 1);
if (ret == 0) {
ret = msgpack_pack_str(mp_pck, strlen(logs_body_key));
}
if (ret == 0) {
ret = msgpack_pack_str_body(mp_pck,
logs_body_key,
strlen(logs_body_key));
}
else {
logs_body_key = ctx->logs_body_key;
if (logs_body_key == NULL) {
logs_body_key = "log";
}
ret = flb_log_event_encoder_append_body_values(
encoder,
FLB_LOG_EVENT_CSTRING_VALUE(logs_body_key),
FLB_LOG_EVENT_MSGPACK_RAW_VALUE(mp_sbuf.data, mp_sbuf.size));
if (ret == 0) {
ret = otlp_pack_any_value(
mp_pck,
log_records[log_record_index]->body);
}
}

msgpack_sbuffer_clear(&mp_sbuf);
if (ret != 0) {
flb_plg_error(ctx->ins, "failed to convert log record body");
ret = FLB_EVENT_ENCODER_ERROR_SERIALIZATION_FAILURE;
}
else {
ret = flb_log_event_encoder_dynamic_field_flush(&encoder->body);
}
}

if (ret == FLB_EVENT_ENCODER_SUCCESS) {
Expand All @@ -593,8 +605,6 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
}

binary_payload_to_msgpack_end:
msgpack_sbuffer_destroy(&mp_sbuf);
msgpack_sbuffer_destroy(&mp_sbuf_meta);
if (input_logs) {
opentelemetry__proto__collector__logs__v1__export_logs_service_request__free_unpacked(
input_logs, NULL);
Expand Down
Loading