Skip to content

Commit 1c1902a

Browse files
Leonardo Alminanaedsiper
authored andcommitted
filter_lua: fix bugs in multi record producers and record skipping
Signed-off-by: Leonardo Alminana <[email protected]>
1 parent 8aaf3ed commit 1c1902a

File tree

1 file changed

+59
-61
lines changed

1 file changed

+59
-61
lines changed

plugins/filter_lua/lua.c

Lines changed: 59 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -338,13 +338,46 @@ static int cb_lua_filter_mpack(const void *data, size_t bytes,
338338

339339
#else
340340

341+
static int pack_record(struct lua_filter *ctx,
342+
struct flb_log_event_encoder *log_encoder,
343+
struct flb_time *ts,
344+
msgpack_object *metadata,
345+
msgpack_object *body)
346+
{
347+
int ret;
348+
349+
ret = flb_log_event_encoder_begin_record(log_encoder);
350+
351+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
352+
ret = flb_log_event_encoder_set_timestamp(log_encoder, ts);
353+
}
354+
355+
if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) {
356+
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
357+
log_encoder, metadata);
358+
}
359+
360+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
361+
ret = flb_log_event_encoder_set_body_from_msgpack_object(
362+
log_encoder, body);
363+
}
364+
365+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
366+
ret = flb_log_event_encoder_commit_record(log_encoder);
367+
}
368+
369+
return ret;
370+
}
371+
341372
static int pack_result (struct lua_filter *ctx, struct flb_time *ts,
342373
msgpack_object *metadata,
343374
struct flb_log_event_encoder *log_encoder,
344375
char *data, size_t bytes)
345376
{
346377
int ret;
378+
size_t index = 0;
347379
size_t off = 0;
380+
msgpack_object *entry;
348381
msgpack_unpacked result;
349382
int map_detected;
350383
struct flb_log_event_decoder log_decoder;
@@ -361,85 +394,46 @@ static int pack_result (struct lua_filter *ctx, struct flb_time *ts,
361394
}
362395

363396
if (result.data.type == MSGPACK_OBJECT_MAP) {
364-
map_detected = FLB_TRUE;
397+
ret = pack_record(ctx, log_encoder,
398+
ts, metadata, &result.data);
365399

366-
if (result.data.via.map.size <= 0) {
367-
msgpack_unpacked_destroy(&result);
368-
369-
return FLB_FALSE;
370-
}
371-
}
372-
else if (result.data.type == MSGPACK_OBJECT_ARRAY) {
373-
map_detected = FLB_FALSE;
374-
}
375-
else {
376400
msgpack_unpacked_destroy(&result);
377401

378-
return FLB_FALSE;
379-
}
380-
381-
msgpack_unpacked_destroy(&result);
382-
383-
if (!map_detected) {
384-
ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes);
385-
386-
if (ret != FLB_EVENT_DECODER_SUCCESS) {
387-
flb_plg_error(ctx->ins,
388-
"Log event decoder initialization error : %d", ret);
389-
402+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
390403
return FLB_FALSE;
391404
}
392405

393-
while ((ret = flb_log_event_decoder_next(
394-
&log_decoder,
395-
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
396-
ret = flb_log_event_encoder_begin_record(log_encoder);
406+
return FLB_TRUE;
407+
}
408+
else if (result.data.type == MSGPACK_OBJECT_ARRAY) {
409+
for (index = 0 ; index < result.data.via.array.size ; index++) {
410+
entry = &result.data.via.array.ptr[index];
397411

398-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
399-
ret = flb_log_event_encoder_set_timestamp(log_encoder, ts);
400-
}
412+
if (entry->type == MSGPACK_OBJECT_MAP) {
413+
ret = pack_record(ctx, log_encoder,
414+
ts, metadata, entry);
401415

402-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
403-
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
404-
log_encoder, log_event.metadata);
405-
}
416+
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
417+
msgpack_unpacked_destroy(&result);
406418

407-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
408-
ret = flb_log_event_encoder_set_body_from_msgpack_object(
409-
log_encoder, log_event.body);
419+
return FLB_FALSE;
420+
}
410421
}
422+
else {
423+
msgpack_unpacked_destroy(&result);
411424

412-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
413-
ret = flb_log_event_encoder_commit_record(log_encoder);
425+
return FLB_FALSE;
414426
}
415427
}
416428

417-
flb_log_event_decoder_destroy(&log_decoder);
429+
msgpack_unpacked_destroy(&result);
418430

419431
return FLB_TRUE;
420432
}
421433

422-
ret = flb_log_event_encoder_begin_record(log_encoder);
423-
424-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
425-
ret = flb_log_event_encoder_set_timestamp(log_encoder, ts);
426-
}
427-
428-
if (ret == FLB_EVENT_ENCODER_SUCCESS && metadata != NULL) {
429-
ret = flb_log_event_encoder_set_metadata_from_msgpack_object(
430-
log_encoder, metadata);
431-
}
432-
433-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
434-
ret = flb_log_event_encoder_set_body_from_raw_msgpack(
435-
log_encoder, data, bytes);
436-
}
437-
438-
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
439-
ret = flb_log_event_encoder_commit_record(log_encoder);
440-
}
434+
msgpack_unpacked_destroy(&result);
441435

442-
return FLB_TRUE;
436+
return FLB_FALSE;
443437
}
444438

445439
static int cb_lua_filter(const void *data, size_t bytes,
@@ -626,7 +620,11 @@ static int cb_lua_filter(const void *data, size_t bytes,
626620
record_begining = record_end;
627621
}
628622

629-
if (log_encoder.output_length > 0) {
623+
if (ret == FLB_EVENT_DECODER_ERROR_INSUFFICIENT_DATA) {
624+
ret = FLB_EVENT_ENCODER_SUCCESS;
625+
}
626+
627+
if (ret == FLB_EVENT_ENCODER_SUCCESS) {
630628
*out_buf = log_encoder.output_buffer;
631629
*out_bytes = log_encoder.output_length;
632630

0 commit comments

Comments
 (0)