Skip to content

Commit 4dc5677

Browse files
committed
in_opentelemetry: fix packing of resource and scope schemaUrl
schemaURL is now properly set in the group body, for resource and scope, this is an example of the packed content inside the group body: (pretty printed in JSON for readability) { "resource": { "attributes": { "service.name": "my.service" }, "schema_url": "https://example.com/schema2" }, "scope": { "schema_url": "https://example.com/schema1", "name": "my.library", "version": "1.0.0", "attributes": { "my.scope.attribute": "some scope attribute" } } } Signed-off-by: Eduardo Silva <[email protected]>
1 parent cc8bccc commit 4dc5677

File tree

1 file changed

+77
-34
lines changed

1 file changed

+77
-34
lines changed

plugins/in_opentelemetry/opentelemetry_logs.c

Lines changed: 77 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -682,12 +682,14 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct
682682
int ret;
683683
int result;
684684
size_t index;
685-
msgpack_object obj;
685+
msgpack_object *obj;
686686
msgpack_object_map *resource = NULL;
687687
msgpack_object *resource_attr = NULL;
688688
msgpack_object_map *resource_logs_entry = NULL;
689+
msgpack_object *resource_schema_url = NULL;
689690
msgpack_object *scope = NULL;
690691
msgpack_object_array *scope_logs;
692+
msgpack_object *scope_schema_url = NULL;
691693

692694
if (resource_logs_object->type != MSGPACK_OBJECT_MAP) {
693695
flb_plg_error(ctx->ins, "unexpected resourceLogs entry type");
@@ -697,22 +699,37 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct
697699
/* get 'resource' and resource['attributes'] */
698700
result = find_map_entry_by_key(&resource_logs_object->via.map, "resource", 0, FLB_TRUE);
699701
if (result >= 0) {
700-
obj = resource_logs_object->via.map.ptr[result].val;
701-
if (obj.type == MSGPACK_OBJECT_MAP) {
702-
resource = &obj.via.map;
702+
obj = &resource_logs_object->via.map.ptr[result].val;
703+
if (obj->type == MSGPACK_OBJECT_MAP) {
704+
resource = &obj->via.map;
705+
706+
/* attributes */
703707
result = find_map_entry_by_key(resource, "attributes", 0, FLB_TRUE);
704708
if (result >= 0) {
705-
obj = resource->ptr[result].val;
706-
if (obj.type == MSGPACK_OBJECT_ARRAY) {
707-
resource_attr = &obj;
709+
obj = &resource->ptr[result].val;
710+
if (obj->type == MSGPACK_OBJECT_ARRAY) {
711+
resource_attr = &resource->ptr[result].val;
708712
}
709713
}
710714
}
711715
}
712716

713717
resource_logs_entry = &resource_logs_object->via.map;
714-
result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE);
715718

719+
/* schemaUrl */
720+
result = find_map_entry_by_key(resource_logs_entry, "schemaUrl", 0, FLB_TRUE);
721+
if (result == -1) {
722+
result = find_map_entry_by_key(resource_logs_entry, "schema_url", 0, FLB_TRUE);
723+
}
724+
if (result >= 0) {
725+
obj = &resource_logs_entry->ptr[result].val;
726+
if (obj->type == MSGPACK_OBJECT_STR) {
727+
resource_schema_url = &resource_logs_entry->ptr[result].val;
728+
}
729+
}
730+
731+
/* scopeLogs */
732+
result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE);
716733
if (result == -1) {
717734
result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE);
718735
if (result == -1) {
@@ -767,24 +784,42 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct
767784
if (resource) {
768785
result = find_map_entry_by_key(resource, "droppedAttributesCount", 0, FLB_TRUE);
769786
if (result >= 0) {
770-
obj = resource->ptr[result].val;
787+
obj = &resource->ptr[result].val;
771788
flb_log_event_encoder_append_body_values(encoder,
772789
FLB_LOG_EVENT_CSTRING_VALUE("dropped_attributes_count"),
773-
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
790+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj));
774791
}
775792
}
776793

794+
if (resource_schema_url) {
795+
flb_log_event_encoder_append_body_values(encoder,
796+
FLB_LOG_EVENT_CSTRING_VALUE("schema_url"),
797+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(resource_schema_url));
798+
}
799+
777800
/* close resource map */
778801
flb_log_event_encoder_body_commit_map(encoder);
779802

803+
/* scope schemaUrl */
804+
result = find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schemaUrl", 0, FLB_TRUE);
805+
if (result == -1) {
806+
result = find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schema_url", 0, FLB_TRUE);
807+
}
808+
if (result >= 0) {
809+
obj = &scope_logs->ptr[index].via.map.ptr[result].val;
810+
if (obj->type == MSGPACK_OBJECT_STR) {
811+
scope_schema_url = &scope_logs->ptr[index].via.map.ptr[result].val;
812+
}
813+
}
814+
780815
/* scope metadata */
781816
scope = NULL;
782-
obj = scope_logs->ptr[index];
783-
if (obj.type == MSGPACK_OBJECT_MAP) {
784-
result = find_map_entry_by_key(&obj.via.map, "scope", 0, FLB_TRUE);
817+
obj = &scope_logs->ptr[index];
818+
if (obj->type == MSGPACK_OBJECT_MAP) {
819+
result = find_map_entry_by_key(&obj->via.map, "scope", 0, FLB_TRUE);
785820
if (result >= 0) {
786-
if (obj.via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) {
787-
scope = &obj.via.map.ptr[result].val;
821+
if (obj->via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) {
822+
scope = &scope_logs->ptr[index].via.map.ptr[result].val;
788823
}
789824
}
790825
}
@@ -804,40 +839,47 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct
804839
/* scope name */
805840
result = find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE);
806841
if (result >= 0) {
807-
obj = scope->via.map.ptr[result].val;
808-
if (obj.type == MSGPACK_OBJECT_STR) {
842+
obj = &scope->via.map.ptr[result].val;
843+
if (obj->type == MSGPACK_OBJECT_STR) {
809844
flb_log_event_encoder_append_body_values(encoder,
810845
FLB_LOG_EVENT_CSTRING_VALUE("name"),
811-
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
846+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj));
812847
}
813848
}
814849

815850
/* scope version */
816851
result = find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE);
817852
if (result >= 0) {
818-
obj = scope->via.map.ptr[result].val;
819-
if (obj.type == MSGPACK_OBJECT_STR) {
853+
obj = &scope->via.map.ptr[result].val;
854+
if (obj->type == MSGPACK_OBJECT_STR) {
820855
flb_log_event_encoder_append_body_values(encoder,
821856
FLB_LOG_EVENT_CSTRING_VALUE("version"),
822-
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
857+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj));
823858
}
824859
}
825860

826861
/* scope attributes */
827862
result = find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE);
828863
if (result >= 0) {
829-
obj = scope->via.map.ptr[result].val;
830-
if (obj.type == MSGPACK_OBJECT_ARRAY) {
864+
obj = &scope->via.map.ptr[result].val;
865+
if (obj->type == MSGPACK_OBJECT_ARRAY) {
831866
flb_log_event_encoder_append_body_string(encoder, "attributes", 10);
832867
result = json_payload_append_converted_kvlist(encoder,
833868
FLB_LOG_EVENT_BODY,
834-
&obj);
869+
obj);
835870
if (result != 0) {
836871
return -2;
837872
}
838873
}
839874
}
840875

876+
/* scope schemaUrl */
877+
if (scope_schema_url) {
878+
flb_log_event_encoder_append_body_values(encoder,
879+
FLB_LOG_EVENT_CSTRING_VALUE("schema_url"),
880+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(scope_schema_url));
881+
}
882+
841883
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY);
842884
}
843885

@@ -891,8 +933,8 @@ static int process_json_payload_root(struct flb_opentelemetry *ctx,
891933

892934
resource_logs = &root->ptr[result].val.via.array;
893935

894-
result = 0;
895936

937+
result = 0;
896938
for (index = 0 ; index < resource_logs->size ; index++) {
897939
result = process_json_payload_resource_logs_entry(
898940
ctx,
@@ -1262,6 +1304,7 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
12621304

12631305
for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) {
12641306
resource_log = resource_logs[resource_logs_index];
1307+
12651308
resource = resource_log->resource;
12661309
scope_logs = resource_log->scope_logs;
12671310

@@ -1322,18 +1365,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
13221365
msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24);
13231366
msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count);
13241367
}
1325-
}
1326-
flb_mp_map_header_end(&mh_tmp);
13271368

1328-
if (resource_log->schema_url) {
1329-
flb_mp_map_header_append(&mh);
1330-
msgpack_pack_str(&mp_pck, 10);
1331-
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
1369+
if (resource_log->schema_url) {
1370+
flb_mp_map_header_append(&mh_tmp);
1371+
msgpack_pack_str(&mp_pck, 10);
1372+
msgpack_pack_str_body(&mp_pck, "schema_url", 10);
13321373

1333-
len = strlen(resource_log->schema_url);
1334-
msgpack_pack_str(&mp_pck, len);
1335-
msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len);
1374+
len = strlen(resource_log->schema_url);
1375+
msgpack_pack_str(&mp_pck, len);
1376+
msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len);
1377+
}
13361378
}
1379+
flb_mp_map_header_end(&mh_tmp);
13371380

13381381
/* scope */
13391382
flb_mp_map_header_append(&mh);

0 commit comments

Comments
 (0)