Skip to content

Commit 2af527c

Browse files
Leonardo Alminanaedsiper
authored andcommitted
out_splunk: added otlp metadata support
Signed-off-by: Leonardo Alminana <[email protected]>
1 parent e7c516c commit 2af527c

File tree

2 files changed

+260
-4
lines changed

2 files changed

+260
-4
lines changed

plugins/out_splunk/splunk.c

Lines changed: 249 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,215 @@ static int cb_splunk_init(struct flb_output_instance *ins,
5353
return 0;
5454
}
5555

56+
static msgpack_object *local_msgpack_map_lookup(
57+
msgpack_object *map_object,
58+
char *key)
59+
{
60+
size_t key_length;
61+
size_t index;
62+
msgpack_object_map *map;
63+
64+
if (key == NULL) {
65+
return NULL;
66+
}
67+
68+
if (map_object == NULL) {
69+
return NULL;
70+
}
71+
72+
if (map_object->type != MSGPACK_OBJECT_MAP) {
73+
return NULL;
74+
}
75+
76+
map = &map_object->via.map;
77+
78+
key_length = strlen(key);
79+
80+
for (index = 0; index < map->size ; index++) {
81+
if (map->ptr[index].key.type == MSGPACK_OBJECT_STR) {
82+
if (map->ptr[index].key.via.str.size == key_length) {
83+
if (strncmp(map->ptr[index].key.via.str.ptr,
84+
key,
85+
key_length) == 0) {
86+
return &map->ptr[index].val;
87+
}
88+
}
89+
}
90+
}
91+
92+
return NULL;
93+
}
94+
95+
static int local_msgpack_map_string_lookup(
96+
msgpack_object *map_object,
97+
char *key,
98+
char **value,
99+
size_t *value_size)
100+
{
101+
msgpack_object *value_object;
102+
103+
value_object = local_msgpack_map_lookup(map_object, key);
104+
105+
if (value_object == NULL) {
106+
return -1;
107+
}
108+
109+
if (value_object->type != MSGPACK_OBJECT_STR) {
110+
return -2;
111+
}
112+
113+
*value = (char *) value_object->via.str.ptr;
114+
*value_size = value_object->via.str.size;
115+
116+
return 0;
117+
}
118+
119+
static int local_msgpack_map_string_extract(
120+
msgpack_object *map_object,
121+
char *key,
122+
char *output_buffer,
123+
size_t output_buffer_size)
124+
{
125+
size_t value_size;
126+
int result;
127+
char *value;
128+
129+
result = local_msgpack_map_string_lookup(map_object,
130+
key,
131+
&value,
132+
&value_size);
133+
134+
if (result != 0) {
135+
return -1;
136+
}
137+
138+
if (value_size >= output_buffer_size) {
139+
return -2;
140+
}
141+
142+
strncpy(output_buffer,
143+
value,
144+
value_size);
145+
146+
output_buffer[value_size] = '\0';
147+
148+
return 0;
149+
}
150+
151+
static inline void local_msgpack_pack_cstr(msgpack_packer *packer, char *value)
152+
{
153+
msgpack_pack_str(packer, strlen(value));
154+
msgpack_pack_str_body(packer, value, strlen(value));
155+
}
156+
157+
static int pack_otel_data(struct flb_splunk *ctx,
158+
msgpack_packer *mp_pck,
159+
struct flb_mp_map_header *mh_pck,
160+
msgpack_object *group_metadata,
161+
msgpack_object *group_attributes,
162+
msgpack_object *record_attributes)
163+
{
164+
msgpack_object *source_map;
165+
char schema[8];
166+
int result;
167+
struct flb_mp_map_header mh_tmp;
168+
msgpack_object *value;
169+
size_t index;
170+
171+
result = local_msgpack_map_string_extract(group_metadata,
172+
"schema",
173+
schema,
174+
sizeof(schema));
175+
176+
if (result != 0) {
177+
return -1;
178+
}
179+
180+
if (strcmp(schema, "otlp") != 0) {
181+
return 0;
182+
}
183+
184+
source_map = local_msgpack_map_lookup(group_attributes, "resource");
185+
186+
if (source_map != NULL) {
187+
source_map = local_msgpack_map_lookup(source_map, "attributes");
188+
189+
if (source_map != NULL) {
190+
value = local_msgpack_map_lookup(source_map,
191+
"host.name");
192+
193+
if (value != NULL) {
194+
flb_mp_map_header_append(mh_pck);
195+
196+
local_msgpack_pack_cstr(mp_pck, "host");
197+
198+
msgpack_pack_object(mp_pck, *value);
199+
}
200+
else {
201+
return -2;
202+
}
203+
}
204+
}
205+
206+
flb_mp_map_header_append(mh_pck);
207+
208+
local_msgpack_pack_cstr(mp_pck, "fields");
209+
210+
flb_mp_map_header_init(&mh_tmp, mp_pck);
211+
212+
source_map = local_msgpack_map_lookup(record_attributes, "otlp");
213+
214+
if (source_map != NULL) {
215+
value = local_msgpack_map_lookup(source_map,
216+
"severity_number");
217+
218+
if (value != NULL &&
219+
(value->type == MSGPACK_OBJECT_POSITIVE_INTEGER ||
220+
value->type == MSGPACK_OBJECT_NEGATIVE_INTEGER)) {
221+
flb_mp_map_header_append(&mh_tmp);
222+
223+
local_msgpack_pack_cstr(mp_pck, "otel.log.severity.number");
224+
225+
msgpack_pack_object(mp_pck, *value);
226+
}
227+
else {
228+
return -2;
229+
}
230+
231+
value = local_msgpack_map_lookup(source_map,
232+
"severity_text");
233+
234+
if (value != NULL &&
235+
value->type == MSGPACK_OBJECT_STR) {
236+
flb_mp_map_header_append(&mh_tmp);
237+
238+
local_msgpack_pack_cstr(mp_pck, "otel.log.severity.text");
239+
240+
msgpack_pack_object(mp_pck, *value);
241+
}
242+
else {
243+
return -3;
244+
}
245+
246+
source_map = local_msgpack_map_lookup(source_map, "attributes");
247+
248+
if (source_map != NULL &&
249+
source_map->type == MSGPACK_OBJECT_MAP) {
250+
251+
for (index = 0; index < source_map->via.map.size ; index++) {
252+
flb_mp_map_header_append(&mh_tmp);
253+
254+
msgpack_pack_object(mp_pck, source_map->via.map.ptr[index].key);
255+
msgpack_pack_object(mp_pck, source_map->via.map.ptr[index].val);
256+
}
257+
}
258+
}
259+
260+
flb_mp_map_header_end(&mh_tmp);
261+
262+
return 0;
263+
}
264+
56265
static int pack_map_meta(struct flb_splunk *ctx,
57266
struct flb_mp_map_header *mh,
58267
msgpack_packer *mp_pck,
@@ -202,9 +411,15 @@ static int pack_map_meta(struct flb_splunk *ctx,
202411
}
203412

204413
static int pack_map(struct flb_splunk *ctx, msgpack_packer *mp_pck,
205-
struct flb_time *tm, msgpack_object map,
206-
char *tag, int tag_len)
414+
struct flb_time *tm,
415+
msgpack_object *group_metadata,
416+
msgpack_object *group_attributes,
417+
msgpack_object *record_attributes,
418+
msgpack_object map,
419+
char *tag,
420+
int tag_len)
207421
{
422+
int result;
208423
int i;
209424
double t;
210425
int map_size;
@@ -232,6 +447,20 @@ static int pack_map(struct flb_splunk *ctx, msgpack_packer *mp_pck,
232447
/* Pack Splunk metadata */
233448
pack_map_meta(ctx, &mh, mp_pck, map, tag, tag_len);
234449

450+
/* Pack Otel specific metadata */
451+
452+
result = pack_otel_data(ctx,
453+
mp_pck,
454+
&mh,
455+
group_metadata,
456+
group_attributes,
457+
record_attributes);
458+
459+
if (result != 0) {
460+
printf("ERROR %d\n", result);
461+
return -1;
462+
}
463+
235464
/* Add k/v pairs under the key 'event' instead of to the top level object */
236465
flb_mp_map_header_append(&mh);
237466
msgpack_pack_str(mp_pck, sizeof(FLB_SPLUNK_DEFAULT_EVENT) -1);
@@ -475,12 +704,28 @@ static inline int splunk_format(const void *in_buf, size_t in_bytes,
475704
* record, we just warn the user and try to pack it
476705
* as a normal map.
477706
*/
478-
ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len);
707+
ret = pack_map(ctx,
708+
&mp_pck,
709+
&log_event.timestamp,
710+
log_event.group_metadata,
711+
log_event.group_attributes,
712+
log_event.metadata,
713+
map,
714+
tag,
715+
tag_len);
479716
}
480717
}
481718
else {
482719
/* Pack as a map */
483-
ret = pack_map(ctx, &mp_pck, &log_event.timestamp, map, tag, tag_len);
720+
ret = pack_map(ctx,
721+
&mp_pck,
722+
&log_event.timestamp,
723+
log_event.group_metadata,
724+
log_event.group_attributes,
725+
log_event.metadata,
726+
map,
727+
tag,
728+
tag_len);
484729
}
485730

486731
/* Validate packaging */

plugins/out_stdout/stdout.c

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,17 @@ static void cb_stdout_flush(struct flb_event_chunk *event_chunk,
284284

285285
while (flb_log_event_decoder_next(&log_decoder,
286286
&log_event) == FLB_EVENT_DECODER_SUCCESS) {
287+
288+
if (log_event.group_attributes != NULL) {
289+
printf("GROUP METADATA : \n\n");
290+
msgpack_object_print(stdout, *log_event.group_metadata);
291+
printf("\n\n");
292+
293+
printf("GROUP ATTRIBUTES : \n\n");
294+
msgpack_object_print(stdout, *log_event.group_attributes);
295+
printf("\n\n");
296+
}
297+
287298
printf("[%zd] %s: [[", cnt++, event_chunk->tag);
288299

289300
printf("%"PRId32".%09lu, ", (int32_t) log_event.timestamp.tm.tv_sec,

0 commit comments

Comments
 (0)