Skip to content

Commit f73949c

Browse files
committed
in_opentelemetry: logs: add support for metadata in JSON payload
This change adds support to parse and register the resources and scope metadata coming from Logs which are ingested from a JSON payload. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 8916ac7 commit f73949c

File tree

1 file changed

+203
-29
lines changed

1 file changed

+203
-29
lines changed

plugins/in_opentelemetry/opentelemetry_prot.c

Lines changed: 203 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -611,7 +611,7 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx,
611611
msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count);
612612
}
613613

614-
flb_mp_map_header_end(&mh_tmp);
614+
615615

616616
if (resource_log->schema_url) {
617617
flb_mp_map_header_append(&mh);
@@ -781,13 +781,23 @@ static int find_map_entry_by_key(msgpack_object_map *map,
781781
size_t match_index,
782782
int case_insensitive)
783783
{
784-
size_t match_count;
785784
int result;
786785
int index;
786+
int key_len;
787+
size_t match_count;
788+
789+
if (!key) {
790+
return -1;
791+
}
787792

793+
key_len = strlen(key);
788794
match_count = 0;
789795

790796
for (index = 0 ; index < (int) map->size ; index++) {
797+
if (key_len != map->ptr[index].key.via.str.size) {
798+
continue;
799+
}
800+
791801
if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) {
792802
if (case_insensitive) {
793803
result = strncasecmp(map->ptr[index].key.via.str.ptr,
@@ -1229,6 +1239,8 @@ static int process_json_payload_log_records_entry(
12291239
int body_type;
12301240
struct flb_time timestamp;
12311241
int result;
1242+
msgpack_object *severity_number = NULL;
1243+
msgpack_object *severity_text = NULL;
12321244

12331245
if (log_records_object->type != MSGPACK_OBJECT_MAP) {
12341246
flb_plg_error(ctx->ins, "unexpected logRecords entry type");
@@ -1288,12 +1300,28 @@ static int process_json_payload_log_records_entry(
12881300
flb_time_from_uint64(&timestamp, timestamp_uint64);
12891301
}
12901302

1303+
/* severityNumber */
1304+
result = find_map_entry_by_key(log_records_entry, "severityNumber", 0, FLB_TRUE);
1305+
if (result == -1) {
1306+
result = find_map_entry_by_key(log_records_entry, "severity_number", 0, FLB_TRUE);
1307+
}
1308+
if (result >= 0) {
1309+
severity_number = &log_records_entry->ptr[result].val;
1310+
}
1311+
1312+
/* severityText */
1313+
result = find_map_entry_by_key(log_records_entry, "severityText", 0, FLB_TRUE);
1314+
if (result == -1) {
1315+
result = find_map_entry_by_key(log_records_entry, "severity_text", 0, FLB_TRUE);
1316+
}
1317+
if (result >= 0) {
1318+
severity_text = &log_records_entry->ptr[result].val;
1319+
}
1320+
12911321

12921322
result = find_map_entry_by_key(log_records_entry, "attributes", 0, FLB_TRUE);
1293-
12941323
if (result == -1) {
12951324
flb_plg_debug(ctx->ins, "attributes missing");
1296-
12971325
metadata_object = NULL;
12981326
}
12991327
else {
@@ -1329,15 +1357,33 @@ static int process_json_payload_log_records_entry(
13291357
result = flb_log_event_encoder_set_timestamp(encoder, &timestamp);
13301358
}
13311359

1332-
if (result == FLB_EVENT_ENCODER_SUCCESS &&
1333-
metadata_object != NULL) {
1334-
flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
1360+
flb_log_event_encoder_dynamic_field_reset(&encoder->metadata);
1361+
result = flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA);
1362+
if (result == FLB_EVENT_ENCODER_SUCCESS) {
1363+
flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, ctx->logs_metadata_key, flb_sds_len(ctx->logs_metadata_key));
1364+
flb_log_event_encoder_begin_map(encoder, FLB_LOG_EVENT_METADATA);
1365+
1366+
if (severity_number != NULL) {
1367+
flb_log_event_encoder_append_metadata_values(encoder,
1368+
FLB_LOG_EVENT_STRING_VALUE("severity_number", 15),
1369+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(severity_number));
1370+
}
1371+
1372+
if (severity_text != NULL && severity_text->type == MSGPACK_OBJECT_STR) {
1373+
flb_log_event_encoder_append_metadata_values(encoder,
1374+
FLB_LOG_EVENT_STRING_VALUE("severity_text", 13),
1375+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(severity_text));
1376+
}
1377+
1378+
if (metadata_object != NULL) {
1379+
flb_log_event_encoder_append_string(encoder, FLB_LOG_EVENT_METADATA, "attributes", 10);
1380+
result = json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_METADATA, metadata_object);
1381+
}
1382+
1383+
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA);
13351384

1336-
result = json_payload_append_converted_kvlist(
1337-
encoder,
1338-
FLB_LOG_EVENT_METADATA,
1339-
metadata_object);
13401385
}
1386+
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_METADATA);
13411387

13421388
if (result == FLB_EVENT_ENCODER_SUCCESS &&
13431389
body_object != NULL) {
@@ -1400,7 +1446,6 @@ static int process_json_payload_scope_logs_entry(
14001446

14011447
if (result == -1) {
14021448
flb_plg_error(ctx->ins, "scopeLogs missing");
1403-
14041449
return -3;
14051450
}
14061451
}
@@ -1425,31 +1470,48 @@ static int process_json_payload_scope_logs_entry(
14251470
return result;
14261471
}
14271472

1428-
14291473
static int process_json_payload_resource_logs_entry(
1430-
struct flb_opentelemetry *ctx,
1431-
struct flb_log_event_encoder *encoder,
1432-
msgpack_object *resource_logs_object)
1474+
struct flb_opentelemetry *ctx,
1475+
struct flb_log_event_encoder *encoder,
1476+
size_t resource_logs_index,
1477+
msgpack_object *resource_logs_object)
14331478
{
1434-
msgpack_object_map *resource_logs_entry;
1479+
int ret;
1480+
int result;
1481+
size_t index;
1482+
msgpack_object obj;
1483+
msgpack_object_map *resource = NULL;
1484+
msgpack_object *resource_attr = NULL;
1485+
msgpack_object_map *resource_logs_entry = NULL;
1486+
msgpack_object *scope = NULL;
14351487
msgpack_object_array *scope_logs;
1436-
int result;
1437-
size_t index;
1438-
14391488

14401489
if (resource_logs_object->type != MSGPACK_OBJECT_MAP) {
14411490
flb_plg_error(ctx->ins, "unexpected resourceLogs entry type");
1442-
14431491
return -2;
14441492
}
14451493

1446-
resource_logs_entry = &resource_logs_object->via.map;
1494+
/* get 'resource' and resource['attributes'] */
1495+
result = find_map_entry_by_key(&resource_logs_object->via.map, "resource", 0, FLB_TRUE);
1496+
if (result >= 0) {
1497+
obj = resource_logs_object->via.map.ptr[result].val;
1498+
if (obj.type == MSGPACK_OBJECT_MAP) {
1499+
resource = &obj.via.map;
1500+
result = find_map_entry_by_key(resource, "attributes", 0, FLB_TRUE);
1501+
if (result >= 0) {
1502+
obj = resource->ptr[result].val;
1503+
if (obj.type == MSGPACK_OBJECT_ARRAY) {
1504+
resource_attr = &obj;
1505+
}
1506+
}
1507+
}
1508+
}
14471509

1510+
resource_logs_entry = &resource_logs_object->via.map;
14481511
result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE);
14491512

14501513
if (result == -1) {
14511514
result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE);
1452-
14531515
if (result == -1) {
14541516
flb_plg_error(ctx->ins, "scopeLogs missing");
14551517

@@ -1459,19 +1521,130 @@ static int process_json_payload_resource_logs_entry(
14591521

14601522
if (resource_logs_entry->ptr[result].val.type != MSGPACK_OBJECT_ARRAY) {
14611523
flb_plg_error(ctx->ins, "unexpected scopeLogs type");
1462-
14631524
return -2;
14641525
}
14651526

14661527
scope_logs = &resource_logs_entry->ptr[result].val.via.array;
14671528

1468-
result = 0;
1469-
14701529
for (index = 0 ; index < scope_logs->size ; index++) {
1530+
/*
1531+
* Add the information about OTLP metadata, we do this by registering
1532+
* a group-type record.
1533+
*/
1534+
flb_log_event_encoder_group_init(encoder);
1535+
1536+
/* pack internal schema */
1537+
ret = flb_log_event_encoder_append_metadata_values(encoder,
1538+
FLB_LOG_EVENT_STRING_VALUE("schema", 6),
1539+
FLB_LOG_EVENT_STRING_VALUE("otlp", 4),
1540+
FLB_LOG_EVENT_STRING_VALUE("resource_id", 11),
1541+
FLB_LOG_EVENT_INT64_VALUE(resource_logs_index),
1542+
FLB_LOG_EVENT_STRING_VALUE("scope_id", 8),
1543+
FLB_LOG_EVENT_INT64_VALUE(index));
1544+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
1545+
flb_plg_error(ctx->ins, "could not set group content metadata");
1546+
return -2;
1547+
}
1548+
1549+
/* Resource key */
1550+
flb_log_event_encoder_append_body_string(encoder, "resource", 8);
1551+
1552+
/* start resource value (map) */
1553+
flb_log_event_encoder_body_begin_map(encoder);
1554+
1555+
/* Check if we have OTel resource attributes */
1556+
if (resource_attr) {
1557+
flb_log_event_encoder_append_body_string(encoder, "attributes", 10);
1558+
result = json_payload_append_converted_kvlist(encoder,
1559+
FLB_LOG_EVENT_BODY,
1560+
resource_attr);
1561+
}
1562+
1563+
/* resource dropped_attributers_count */
1564+
result = find_map_entry_by_key(resource, "droppedAttributesCount", 0, FLB_TRUE);
1565+
if (result >= 0) {
1566+
obj = resource->ptr[result].val;
1567+
flb_log_event_encoder_append_body_values(encoder,
1568+
FLB_LOG_EVENT_CSTRING_VALUE("dropped_attributes_count"),
1569+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
1570+
}
1571+
1572+
/* close resource map */
1573+
flb_log_event_encoder_body_commit_map(encoder);
1574+
1575+
/* scope metadata */
1576+
scope = NULL;
1577+
obj = scope_logs->ptr[index];
1578+
if (obj.type == MSGPACK_OBJECT_MAP) {
1579+
result = find_map_entry_by_key(&obj.via.map, "scope", 0, FLB_TRUE);
1580+
if (result >= 0) {
1581+
if (obj.via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) {
1582+
scope = &obj.via.map.ptr[result].val;
1583+
}
1584+
}
1585+
}
1586+
1587+
if (scope) {
1588+
/*
1589+
* if the scope is found, process every expected key one by one to avoid
1590+
* wrongly ingested items.
1591+
*/
1592+
1593+
/* append scope key */
1594+
flb_log_event_encoder_append_body_string(encoder, "scope", 5);
1595+
1596+
/* scope map value */
1597+
flb_log_event_encoder_body_begin_map(encoder);
1598+
1599+
/* scope name */
1600+
result = find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE);
1601+
if (result >= 0) {
1602+
obj = scope->via.map.ptr[result].val;
1603+
if (obj.type == MSGPACK_OBJECT_STR) {
1604+
flb_log_event_encoder_append_body_values(encoder,
1605+
FLB_LOG_EVENT_CSTRING_VALUE("name"),
1606+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
1607+
}
1608+
}
1609+
1610+
/* scope version */
1611+
result = find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE);
1612+
if (result >= 0) {
1613+
obj = scope->via.map.ptr[result].val;
1614+
if (obj.type == MSGPACK_OBJECT_STR) {
1615+
flb_log_event_encoder_append_body_values(encoder,
1616+
FLB_LOG_EVENT_CSTRING_VALUE("version"),
1617+
FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj));
1618+
}
1619+
}
1620+
1621+
/* scope attributes */
1622+
result = find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE);
1623+
if (result >= 0) {
1624+
obj = scope->via.map.ptr[result].val;
1625+
if (obj.type == MSGPACK_OBJECT_ARRAY) {
1626+
flb_log_event_encoder_append_body_string(encoder, "attributes", 10);
1627+
result = json_payload_append_converted_kvlist(encoder,
1628+
FLB_LOG_EVENT_BODY,
1629+
&obj);
1630+
if (result != 0) {
1631+
return -2;
1632+
}
1633+
}
1634+
}
1635+
1636+
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY);
1637+
}
1638+
1639+
flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY);
1640+
1641+
flb_log_event_encoder_group_header_end(encoder);
1642+
14711643
result = process_json_payload_scope_logs_entry(
1472-
ctx,
1473-
encoder,
1474-
&scope_logs->ptr[index]);
1644+
ctx,
1645+
encoder,
1646+
&scope_logs->ptr[index]);
1647+
flb_log_event_encoder_group_end(encoder);
14751648
}
14761649

14771650
return result;
@@ -1520,6 +1693,7 @@ static int process_json_payload_root(struct flb_opentelemetry *ctx,
15201693
result = process_json_payload_resource_logs_entry(
15211694
ctx,
15221695
encoder,
1696+
index,
15231697
&resource_logs->ptr[index]);
15241698
}
15251699

0 commit comments

Comments
 (0)