Skip to content

Commit 97183b1

Browse files
committed
out_vivo_exporter: add groups support
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 652b3fb commit 97183b1

File tree

5 files changed

+181
-22
lines changed

5 files changed

+181
-22
lines changed

plugins/out_vivo_exporter/vivo.c

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,11 +23,35 @@
2323
#include <fluent-bit/flb_mp.h>
2424
#include <fluent-bit/flb_log_event_decoder.h>
2525
#include <fluent-bit/flb_log_event_encoder.h>
26+
#include <string.h>
2627

2728
#include "vivo.h"
2829
#include "vivo_http.h"
2930
#include "vivo_stream.h"
3031

32+
static msgpack_object *find_map_value(msgpack_object *map,
33+
const char *key, size_t key_len)
34+
{
35+
size_t i;
36+
37+
if (!map || map->type != MSGPACK_OBJECT_MAP) {
38+
return NULL;
39+
}
40+
41+
for (i = 0; i < map->via.map.size; i++) {
42+
if (map->via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) {
43+
continue;
44+
}
45+
46+
if (map->via.map.ptr[i].key.via.str.size == key_len &&
47+
strncmp(map->via.map.ptr[i].key.via.str.ptr, key, key_len) == 0) {
48+
return &map->via.map.ptr[i].val;
49+
}
50+
}
51+
52+
return NULL;
53+
}
54+
3155
static flb_sds_t format_logs(struct flb_input_instance *src_ins,
3256
struct flb_event_chunk *event_chunk, struct flb_config *config)
3357
{
@@ -38,9 +62,19 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
3862
flb_sds_t out_buf = NULL;
3963
msgpack_sbuffer tmp_sbuf;
4064
msgpack_packer tmp_pck;
65+
int group_mismatch = FLB_FALSE;
66+
int is_otlp = FLB_FALSE;
4167
struct flb_log_event log_event;
4268
struct flb_log_event_decoder log_decoder;
4369
struct flb_mp_map_header mh;
70+
struct flb_mp_map_header root_map;
71+
struct flb_mp_map_header otlp_map;
72+
struct flb_mp_map_header group_map;
73+
msgpack_object *group_metadata = NULL;
74+
msgpack_object *group_attributes = NULL;
75+
msgpack_object *schema_value = NULL;
76+
msgpack_object *resource_value = NULL;
77+
msgpack_object *scope_value = NULL;
4478

4579
result = flb_log_event_decoder_init(&log_decoder,
4680
(char *) event_chunk->data,
@@ -87,9 +121,10 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
87121
* }
88122
*/
89123

90-
msgpack_pack_map(&tmp_pck, 4);
124+
flb_mp_map_header_init(&root_map, &tmp_pck);
91125

92126
/* source_type: internal type of the plugin */
127+
flb_mp_map_header_append(&root_map);
93128
name = src_ins->p->name;
94129
len = strlen(name);
95130

@@ -99,6 +134,7 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
99134
msgpack_pack_str_body(&tmp_pck, name, len);
100135

101136
/* source_name: internal name or alias set by the user */
137+
flb_mp_map_header_append(&root_map);
102138
name = (char *) flb_input_name(src_ins);
103139
len = strlen(name);
104140
msgpack_pack_str(&tmp_pck, 11);
@@ -107,12 +143,14 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
107143
msgpack_pack_str_body(&tmp_pck, name, len);
108144

109145
/* tag */
146+
flb_mp_map_header_append(&root_map);
110147
msgpack_pack_str(&tmp_pck, 3);
111148
msgpack_pack_str_body(&tmp_pck, "tag", 3);
112149
msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag));
113150
msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag));
114151

115152
/* records */
153+
flb_mp_map_header_append(&root_map);
116154
msgpack_pack_str(&tmp_pck, 7);
117155
msgpack_pack_str_body(&tmp_pck, "records", 7);
118156

@@ -122,6 +160,24 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
122160
&log_decoder,
123161
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
124162

163+
if (log_event.group_metadata != NULL) {
164+
if (group_metadata == NULL) {
165+
group_metadata = log_event.group_metadata;
166+
}
167+
else if (group_metadata != log_event.group_metadata) {
168+
group_mismatch = FLB_TRUE;
169+
}
170+
}
171+
172+
if (log_event.group_attributes != NULL) {
173+
if (group_attributes == NULL) {
174+
group_attributes = log_event.group_attributes;
175+
}
176+
else if (group_attributes != log_event.group_attributes) {
177+
group_mismatch = FLB_TRUE;
178+
}
179+
}
180+
125181
flb_mp_array_header_append(&mh);
126182

127183
/*
@@ -140,6 +196,78 @@ static flb_sds_t format_logs(struct flb_input_instance *src_ins,
140196

141197
flb_mp_array_header_end(&mh);
142198

199+
if (group_mismatch == FLB_FALSE &&
200+
(group_metadata != NULL || group_attributes != NULL)) {
201+
if (group_metadata != NULL) {
202+
schema_value = find_map_value(group_metadata, "schema", 6);
203+
}
204+
205+
if (schema_value &&
206+
schema_value->type == MSGPACK_OBJECT_STR &&
207+
schema_value->via.str.size == 4 &&
208+
strncmp(schema_value->via.str.ptr, "otlp", 4) == 0) {
209+
is_otlp = FLB_TRUE;
210+
}
211+
212+
if (is_otlp == FLB_TRUE) {
213+
resource_value = NULL;
214+
scope_value = NULL;
215+
216+
if (group_attributes != NULL &&
217+
group_attributes->type == MSGPACK_OBJECT_MAP) {
218+
resource_value = find_map_value(group_attributes, "resource", 8);
219+
scope_value = find_map_value(group_attributes, "scope", 5);
220+
}
221+
222+
flb_mp_map_header_append(&root_map);
223+
msgpack_pack_str(&tmp_pck, 4);
224+
msgpack_pack_str_body(&tmp_pck, "otlp", 4);
225+
226+
flb_mp_map_header_init(&otlp_map, &tmp_pck);
227+
228+
if (resource_value != NULL) {
229+
flb_mp_map_header_append(&otlp_map);
230+
msgpack_pack_str(&tmp_pck, 8);
231+
msgpack_pack_str_body(&tmp_pck, "resource", 8);
232+
msgpack_pack_object(&tmp_pck, *resource_value);
233+
}
234+
235+
if (scope_value != NULL) {
236+
flb_mp_map_header_append(&otlp_map);
237+
msgpack_pack_str(&tmp_pck, 5);
238+
msgpack_pack_str_body(&tmp_pck, "scope", 5);
239+
msgpack_pack_object(&tmp_pck, *scope_value);
240+
}
241+
242+
flb_mp_map_header_end(&otlp_map);
243+
}
244+
else {
245+
flb_mp_map_header_append(&root_map);
246+
msgpack_pack_str(&tmp_pck, 9);
247+
msgpack_pack_str_body(&tmp_pck, "flb_group", 9);
248+
249+
flb_mp_map_header_init(&group_map, &tmp_pck);
250+
251+
if (group_metadata != NULL) {
252+
flb_mp_map_header_append(&group_map);
253+
msgpack_pack_str(&tmp_pck, 8);
254+
msgpack_pack_str_body(&tmp_pck, "metadata", 8);
255+
msgpack_pack_object(&tmp_pck, *group_metadata);
256+
}
257+
258+
if (group_attributes != NULL) {
259+
flb_mp_map_header_append(&group_map);
260+
msgpack_pack_str(&tmp_pck, 4);
261+
msgpack_pack_str_body(&tmp_pck, "body", 4);
262+
msgpack_pack_object(&tmp_pck, *group_attributes);
263+
}
264+
265+
flb_mp_map_header_end(&group_map);
266+
}
267+
}
268+
269+
flb_mp_map_header_end(&root_map);
270+
143271
/* Release the unpacker */
144272
flb_log_event_decoder_destroy(&log_decoder);
145273

plugins/out_vivo_exporter/vivo_http.c

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,6 @@
3131

3232
#define VIVO_CONTENT_TYPE "Content-Type"
3333
#define VIVO_CONTENT_TYPE_JSON "application/json"
34-
#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID"
35-
#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID"
36-
3734
static int stream_get_uri_properties(mk_request_t *request,
3835
int64_t *from, int64_t *to, int64_t *limit)
3936
{
@@ -106,46 +103,55 @@ static void headers_set(mk_request_t *request, struct vivo_stream *vs)
106103
mk_http_header(request,
107104
"Access-Control-Expose-Headers",
108105
sizeof("Access-Control-Expose-Headers") - 1,
109-
"vivo-stream-start-id, vivo-stream-end-id",
110-
sizeof("vivo-stream-start-id, vivo-stream-end-id") - 1);
106+
"vivo-stream-start-id, vivo-stream-end-id, vivo-stream-next-id",
107+
sizeof("vivo-stream-start-id, vivo-stream-end-id, vivo-stream-next-id") - 1);
111108
}
112109
}
113110

114-
static void serve_content(mk_request_t *request, struct vivo_stream *vs)
111+
void vivo_http_serve_content(mk_request_t *request, struct vivo_stream *vs)
115112
{
116113
int64_t from = -1;
117114
int64_t to = -1;
118115
int64_t limit = -1;
119116
int64_t stream_start_id = -1;
120117
int64_t stream_end_id = -1;
118+
int64_t stream_next_id = -1;
121119
flb_sds_t payload;
122120
flb_sds_t str_start;
123121
flb_sds_t str_end;
122+
flb_sds_t str_next;
124123

125124

126125
if (request->query_string.len > 0) {
127126
stream_get_uri_properties(request, &from, &to, &limit);
128127
}
129128

130129
payload = vivo_stream_get_content(vs, from, to, limit,
131-
&stream_start_id, &stream_end_id);
130+
&stream_start_id, &stream_end_id,
131+
&stream_next_id);
132132
if (!payload) {
133133
mk_http_status(request, 500);
134134
return;
135135
}
136136

137-
if (flb_sds_len(payload) == 0) {
138-
mk_http_status(request, 200);
139-
headers_set(request, vs);
140-
flb_sds_destroy(payload);
141-
return;
142-
}
143-
144137
mk_http_status(request, 200);
145138

146139
/* set response headers */
147140
headers_set(request, vs);
148141

142+
str_next = flb_sds_create_size(32);
143+
flb_sds_printf(&str_next, "%" PRId64, stream_next_id);
144+
145+
mk_http_header(request,
146+
VIVO_STREAM_NEXT_ID, sizeof(VIVO_STREAM_NEXT_ID) - 1,
147+
str_next, flb_sds_len(str_next));
148+
149+
if (flb_sds_len(payload) == 0) {
150+
flb_sds_destroy(payload);
151+
flb_sds_destroy(str_next);
152+
return;
153+
}
154+
149155
/* stream ids served: compose buffer and set headers */
150156
str_start = flb_sds_create_size(32);
151157
flb_sds_printf(&str_start, "%" PRId64, stream_start_id);
@@ -168,6 +174,7 @@ static void serve_content(mk_request_t *request, struct vivo_stream *vs)
168174
flb_sds_destroy(payload);
169175
flb_sds_destroy(str_start);
170176
flb_sds_destroy(str_end);
177+
flb_sds_destroy(str_next);
171178
}
172179

173180
/* HTTP endpoint: /api/v1/logs */
@@ -177,7 +184,7 @@ static void cb_logs(mk_request_t *request, void *data)
177184

178185
ctx = (struct vivo_exporter *) data;
179186

180-
serve_content(request, ctx->stream_logs);
187+
vivo_http_serve_content(request, ctx->stream_logs);
181188
mk_http_done(request);
182189
}
183190

@@ -188,7 +195,7 @@ static void cb_metrics(mk_request_t *request, void *data)
188195

189196
ctx = (struct vivo_exporter *) data;
190197

191-
serve_content(request, ctx->stream_metrics);
198+
vivo_http_serve_content(request, ctx->stream_metrics);
192199
mk_http_done(request);
193200
}
194201

@@ -199,7 +206,7 @@ static void cb_traces(mk_request_t *request, void *data)
199206

200207
ctx = (struct vivo_exporter *) data;
201208

202-
serve_content(request, ctx->stream_traces);
209+
vivo_http_serve_content(request, ctx->stream_traces);
203210
mk_http_done(request);
204211
}
205212

plugins/out_vivo_exporter/vivo_http.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,12 @@
2525

2626
#include "vivo.h"
2727

28+
#define VIVO_STREAM_START_ID "Vivo-Stream-Start-ID"
29+
#define VIVO_STREAM_END_ID "Vivo-Stream-End-ID"
30+
#define VIVO_STREAM_NEXT_ID "Vivo-Stream-Next-ID"
31+
32+
struct vivo_stream;
33+
2834
/* HTTP response payload received through a Message Queue */
2935
struct vivo_http_buf {
3036
int users;
@@ -53,4 +59,6 @@ int vivo_http_server_stop(struct vivo_http *ph);
5359
int vivo_http_server_mq_push_metrics(struct vivo_http *ph,
5460
void *data, size_t size);
5561

62+
void vivo_http_serve_content(mk_request_t *request, struct vivo_stream *vs);
63+
5664
#endif

plugins/out_vivo_exporter/vivo_stream.c

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,8 @@ void vivo_stream_destroy(struct vivo_stream *vs)
136136

137137
flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to,
138138
int64_t limit,
139-
int64_t *stream_start_id, int64_t *stream_end_id)
139+
int64_t *stream_start_id, int64_t *stream_end_id,
140+
int64_t *stream_next_id)
140141
{
141142
int64_t count = 0;
142143
flb_sds_t buf;
@@ -151,6 +152,18 @@ flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t
151152

152153
stream_lock(vs);
153154

155+
if (stream_start_id) {
156+
*stream_start_id = -1;
157+
}
158+
159+
if (stream_end_id) {
160+
*stream_end_id = -1;
161+
}
162+
163+
if (stream_next_id) {
164+
*stream_next_id = vs->entries_added;
165+
}
166+
154167
mk_list_foreach(head, &vs->entries) {
155168
e = mk_list_entry(head, struct vivo_stream_entry, _head);
156169

@@ -162,13 +175,15 @@ flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t
162175
break;
163176
}
164177

165-
if (count == 0) {
178+
if (count == 0 && stream_start_id) {
166179
*stream_start_id = e->id;
167180
}
168181

169182
flb_sds_cat_safe(&buf, e->data, flb_sds_len(e->data));
170183

171-
*stream_end_id = e->id;
184+
if (stream_end_id) {
185+
*stream_end_id = e->id;
186+
}
172187
count++;
173188

174189
if (limit > 0 && count >= limit) {

plugins/out_vivo_exporter/vivo_stream.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ struct vivo_stream_entry *vivo_stream_append(struct vivo_stream *vs, void *data,
5454
size_t size);
5555
flb_sds_t vivo_stream_get_content(struct vivo_stream *vs, int64_t from, int64_t to,
5656
int64_t limit,
57-
int64_t *stream_start_id, int64_t *stream_end_id);
57+
int64_t *stream_start_id, int64_t *stream_end_id,
58+
int64_t *stream_next_id);
5859

5960
#endif

0 commit comments

Comments
 (0)