Skip to content

Commit e8aa968

Browse files
committed
out_http: fix gelf multiline, group all records in one payload
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 8f95e3b commit e8aa968

File tree

1 file changed

+25
-19
lines changed

1 file changed

+25
-19
lines changed

plugins/out_http/http.c

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -213,21 +213,25 @@ static int http_gelf(struct flb_out_http *ctx,
213213
flb_sds_t tmp;
214214
msgpack_unpacked result;
215215
size_t off = 0;
216-
size_t prev_off = 0;
217216
size_t size = 0;
218217
msgpack_object root;
219218
msgpack_object map;
220219
msgpack_object *obj;
221220
struct flb_time tm;
222221
int ret;
223222

224-
msgpack_unpacked_init(&result);
223+
size = bytes * 1.5;
225224

225+
/* Allocate buffer for our new payload */
226+
s = flb_sds_create_size(size);
227+
if (!s) {
228+
return FLB_RETRY;
229+
}
230+
231+
msgpack_unpacked_init(&result);
226232
while (msgpack_unpack_next(&result, data, bytes, &off) ==
227233
MSGPACK_UNPACK_SUCCESS) {
228234

229-
size = off - prev_off;
230-
prev_off = off;
231235
if (result.data.type != MSGPACK_OBJECT_ARRAY) {
232236
continue;
233237
}
@@ -240,33 +244,35 @@ static int http_gelf(struct flb_out_http *ctx,
240244
flb_time_pop_from_msgpack(&tm, &result, &obj);
241245
map = root.via.array.ptr[1];
242246

243-
size = (size * 1.4);
244-
s = flb_sds_create_size(size);
245-
if (s == NULL) {
246-
msgpack_unpacked_destroy(&result);
247-
return FLB_RETRY;
248-
}
249-
250247
tmp = flb_msgpack_to_gelf(&s, &map, &tm, &(ctx->gelf_fields));
251248
if (tmp != NULL) {
252249
s = tmp;
253-
ret = http_post(ctx, s, flb_sds_len(s), tag, tag_len);
254-
if (ret != FLB_OK) {
255-
msgpack_unpacked_destroy(&result);
256-
flb_sds_destroy(s);
257-
return ret;
258-
}
259250
}
260251
else {
261252
flb_error("[out_http] error encoding to GELF");
253+
flb_sds_destroy(s);
254+
msgpack_unpacked_destroy(&result);
255+
return FLB_ERROR;
262256
}
263257

264-
flb_sds_destroy(s);
258+
/* Append new line */
259+
tmp = flb_sds_cat(s, "\n", 1);
260+
if (tmp != NULL) {
261+
s = tmp;
262+
}
263+
else {
264+
flb_error("[out_http] error concatenating records");
265+
flb_sds_destroy(s);
266+
msgpack_unpacked_destroy(&result);
267+
return FLB_RETRY;
268+
}
265269
}
266270

271+
ret = http_post(ctx, s, flb_sds_len(s), tag, tag_len);
272+
flb_sds_destroy(s);
267273
msgpack_unpacked_destroy(&result);
268274

269-
return FLB_OK;
275+
return ret;
270276
}
271277

272278
static void cb_http_flush(const void *data, size_t bytes,

0 commit comments

Comments
 (0)