Skip to content

Commit 78b14aa

Browse files
committed
mp: extend cobj API to expose group metadata and attributes
Add cobj_group_metadata and cobj_group_attributes to flb_mp_chunk_record to give processors access to shared group context alongside individual record data. This enables processors to read and modify OpenTelemetry resource attributes and scope metadata using the CFL object interface. Signed-off-by: Eduardo Silva <[email protected]>
1 parent 1c7c4e1 commit 78b14aa

File tree

2 files changed

+139
-1
lines changed

2 files changed

+139
-1
lines changed

include/fluent-bit/flb_mp_chunk.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ struct flb_mp_chunk_record {
3636
struct flb_log_event event;
3737
struct cfl_object *cobj_metadata;
3838
struct cfl_object *cobj_record;
39+
struct cfl_object *cobj_group_metadata;
40+
struct cfl_object *cobj_group_attributes;
41+
int owns_group_metadata;
42+
int owns_group_attributes;
3943
struct cfl_list _head;
4044
};
4145

@@ -47,6 +51,9 @@ struct flb_mp_chunk_cobj {
4751
struct flb_mp_chunk_record *record_pos;
4852
struct cfl_list records;
4953

54+
struct cfl_object *active_group_metadata;
55+
struct cfl_object *active_group_attributes;
56+
5057
/* Condition for filtering records during processing */
5158
struct flb_condition *condition;
5259
};

src/flb_mp.c

Lines changed: 132 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,10 @@ struct flb_mp_chunk_record *flb_mp_chunk_record_create(struct flb_mp_chunk_cobj
10541054
return NULL;
10551055
}
10561056
record->modified = FLB_FALSE;
1057+
record->cobj_group_metadata = NULL;
1058+
record->cobj_group_attributes = NULL;
1059+
record->owns_group_metadata = FLB_FALSE;
1060+
record->owns_group_attributes = FLB_FALSE;
10571061

10581062
return record;
10591063
}
@@ -1076,6 +1080,8 @@ struct flb_mp_chunk_cobj *flb_mp_chunk_cobj_create(struct flb_log_event_encoder
10761080
chunk_cobj->log_encoder = log_encoder;
10771081
chunk_cobj->log_decoder = log_decoder;
10781082
chunk_cobj->condition = NULL;
1083+
chunk_cobj->active_group_metadata = NULL;
1084+
chunk_cobj->active_group_attributes = NULL;
10791085

10801086
return chunk_cobj;
10811087
}
@@ -1100,6 +1106,7 @@ static int generate_empty_msgpack_map(char **out_buf, size_t *out_size)
11001106
int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_buf, size_t *out_size)
11011107
{
11021108
int ret;
1109+
int record_type;
11031110
char *mp_buf;
11041111
size_t mp_size;
11051112
struct cfl_list *head;
@@ -1123,6 +1130,21 @@ int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_bu
11231130
return -1;
11241131
}
11251132

1133+
/* Determine record type from timestamp */
1134+
if (record->event.timestamp.tm.tv_sec >= 0) {
1135+
record_type = FLB_LOG_EVENT_NORMAL;
1136+
}
1137+
else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_START) {
1138+
record_type = FLB_LOG_EVENT_GROUP_START;
1139+
}
1140+
else if (record->event.timestamp.tm.tv_sec == FLB_LOG_EVENT_GROUP_END) {
1141+
record_type = FLB_LOG_EVENT_GROUP_END;
1142+
}
1143+
else {
1144+
record_type = FLB_LOG_EVENT_NORMAL;
1145+
}
1146+
1147+
11261148
if (record->cobj_metadata) {
11271149
ret = flb_mp_cfl_to_msgpack(record->cobj_metadata, &mp_buf, &mp_size);
11281150
if (ret == -1) {
@@ -1143,7 +1165,14 @@ int flb_mp_chunk_cobj_encode(struct flb_mp_chunk_cobj *chunk_cobj, char **out_bu
11431165
}
11441166
flb_free(mp_buf);
11451167

1146-
if (record->cobj_record) {
1168+
/* For group start records, use group attributes as body if available */
1169+
if (record_type == FLB_LOG_EVENT_GROUP_START && record->cobj_group_attributes) {
1170+
ret = flb_mp_cfl_to_msgpack(record->cobj_group_attributes, &mp_buf, &mp_size);
1171+
if (ret == -1) {
1172+
return -1;
1173+
}
1174+
}
1175+
else if (record->cobj_record) {
11471176
ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size);
11481177
if (ret == -1) {
11491178
return -1;
@@ -1195,6 +1224,14 @@ int flb_mp_chunk_cobj_destroy(struct flb_mp_chunk_cobj *chunk_cobj)
11951224
if (record->cobj_record) {
11961225
cfl_object_destroy(record->cobj_record);
11971226
}
1227+
if (record->owns_group_metadata && record->cobj_group_metadata &&
1228+
record->cobj_group_metadata != record->cobj_metadata) {
1229+
cfl_object_destroy(record->cobj_group_metadata);
1230+
}
1231+
if (record->owns_group_attributes && record->cobj_group_attributes &&
1232+
record->cobj_group_attributes != record->cobj_record) {
1233+
cfl_object_destroy(record->cobj_group_attributes);
1234+
}
11981235
cfl_list_del(&record->_head);
11991236
flb_free(record);
12001237
}
@@ -1208,6 +1245,7 @@ int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
12081245
{
12091246
int ret = FLB_MP_CHUNK_RECORD_EOF;
12101247
size_t bytes;
1248+
int record_type = FLB_LOG_EVENT_NORMAL;
12111249
struct flb_mp_chunk_record *record = NULL;
12121250
struct flb_condition *condition = NULL;
12131251

@@ -1246,6 +1284,82 @@ int flb_mp_chunk_cobj_record_next(struct flb_mp_chunk_cobj *chunk_cobj,
12461284
return -1;
12471285
}
12481286

1287+
ret = flb_log_event_decoder_get_record_type(&record->event, &record_type);
1288+
if (ret != FLB_EVENT_DECODER_SUCCESS) {
1289+
cfl_object_destroy(record->cobj_record);
1290+
cfl_object_destroy(record->cobj_metadata);
1291+
flb_free(record);
1292+
return FLB_MP_CHUNK_RECORD_ERROR;
1293+
}
1294+
1295+
record->owns_group_metadata = FLB_FALSE;
1296+
record->owns_group_attributes = FLB_FALSE;
1297+
1298+
if (record_type == FLB_LOG_EVENT_GROUP_START) {
1299+
if (record->cobj_metadata) {
1300+
record->cobj_group_metadata = record->cobj_metadata;
1301+
record->owns_group_metadata = FLB_TRUE;
1302+
}
1303+
if (record->cobj_record) {
1304+
record->cobj_group_attributes = record->cobj_record;
1305+
record->owns_group_attributes = FLB_TRUE;
1306+
}
1307+
1308+
chunk_cobj->active_group_metadata = record->cobj_group_metadata;
1309+
chunk_cobj->active_group_attributes = record->cobj_group_attributes;
1310+
}
1311+
else if (record_type == FLB_LOG_EVENT_GROUP_END) {
1312+
record->cobj_group_metadata = chunk_cobj->active_group_metadata;
1313+
record->cobj_group_attributes = chunk_cobj->active_group_attributes;
1314+
1315+
chunk_cobj->active_group_metadata = NULL;
1316+
chunk_cobj->active_group_attributes = NULL;
1317+
}
1318+
else {
1319+
record->cobj_group_metadata = chunk_cobj->active_group_metadata;
1320+
record->cobj_group_attributes = chunk_cobj->active_group_attributes;
1321+
}
1322+
1323+
if (!record->cobj_group_metadata &&
1324+
record->event.group_metadata &&
1325+
(record->event.group_metadata->type == MSGPACK_OBJECT_MAP ||
1326+
record->event.group_metadata->type == MSGPACK_OBJECT_ARRAY)) {
1327+
record->cobj_group_metadata = flb_mp_object_to_cfl(record->event.group_metadata);
1328+
if (!record->cobj_group_metadata) {
1329+
if (record->owns_group_attributes && record->cobj_group_attributes) {
1330+
cfl_object_destroy(record->cobj_group_attributes);
1331+
}
1332+
cfl_object_destroy(record->cobj_record);
1333+
cfl_object_destroy(record->cobj_metadata);
1334+
flb_free(record);
1335+
return FLB_MP_CHUNK_RECORD_ERROR;
1336+
}
1337+
record->owns_group_metadata = FLB_TRUE;
1338+
if (!chunk_cobj->active_group_metadata) {
1339+
chunk_cobj->active_group_metadata = record->cobj_group_metadata;
1340+
}
1341+
}
1342+
1343+
if (!record->cobj_group_attributes &&
1344+
record->event.group_attributes &&
1345+
(record->event.group_attributes->type == MSGPACK_OBJECT_MAP ||
1346+
record->event.group_attributes->type == MSGPACK_OBJECT_ARRAY)) {
1347+
record->cobj_group_attributes = flb_mp_object_to_cfl(record->event.group_attributes);
1348+
if (!record->cobj_group_attributes) {
1349+
if (record->owns_group_metadata && record->cobj_group_metadata) {
1350+
cfl_object_destroy(record->cobj_group_metadata);
1351+
}
1352+
cfl_object_destroy(record->cobj_record);
1353+
cfl_object_destroy(record->cobj_metadata);
1354+
flb_free(record);
1355+
return FLB_MP_CHUNK_RECORD_ERROR;
1356+
}
1357+
record->owns_group_attributes = FLB_TRUE;
1358+
if (!chunk_cobj->active_group_attributes) {
1359+
chunk_cobj->active_group_attributes = record->cobj_group_attributes;
1360+
}
1361+
}
1362+
12491363
cfl_list_add(&record->_head, &chunk_cobj->records);
12501364

12511365
/* If there's a condition, check if the record matches */
@@ -1339,12 +1453,29 @@ int flb_mp_chunk_cobj_record_destroy(struct flb_mp_chunk_cobj *chunk_cobj,
13391453
}
13401454
}
13411455

1456+
if (chunk_cobj && record->owns_group_metadata &&
1457+
chunk_cobj->active_group_metadata == record->cobj_group_metadata) {
1458+
chunk_cobj->active_group_metadata = NULL;
1459+
}
1460+
if (chunk_cobj && record->owns_group_attributes &&
1461+
chunk_cobj->active_group_attributes == record->cobj_group_attributes) {
1462+
chunk_cobj->active_group_attributes = NULL;
1463+
}
1464+
13421465
if (record->cobj_metadata) {
13431466
cfl_object_destroy(record->cobj_metadata);
13441467
}
13451468
if (record->cobj_record) {
13461469
cfl_object_destroy(record->cobj_record);
13471470
}
1471+
if (record->owns_group_metadata && record->cobj_group_metadata &&
1472+
record->cobj_group_metadata != record->cobj_metadata) {
1473+
cfl_object_destroy(record->cobj_group_metadata);
1474+
}
1475+
if (record->owns_group_attributes && record->cobj_group_attributes &&
1476+
record->cobj_group_attributes != record->cobj_record) {
1477+
cfl_object_destroy(record->cobj_group_attributes);
1478+
}
13481479

13491480
cfl_list_del(&record->_head);
13501481
flb_free(record);

0 commit comments

Comments
 (0)