Skip to content

Commit 2431eba

Browse files
cosmo0920edsiper
authored andcommitted
out_chronicle: Handle partially succeeded to encode cases
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 8a43a3e commit 2431eba

File tree

1 file changed

+95
-58
lines changed

1 file changed

+95
-58
lines changed

plugins/out_chronicle/chronicle.c

Lines changed: 95 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -664,6 +664,13 @@ static int count_mp_with_threshold(size_t last_offset, size_t threshold,
664664
return array_size;
665665
}
666666

667+
struct chronicle_entry {
668+
flb_sds_t log_text;
669+
size_t log_text_size;
670+
struct flb_time timestamp;
671+
struct cfl_list _head;
672+
};
673+
667674
static int chronicle_format(const void *data, size_t bytes,
668675
const char *tag, size_t tag_len,
669676
char **out_data, size_t *out_size,
@@ -689,12 +696,88 @@ static int chronicle_format(const void *data, size_t bytes,
689696
flb_sds_t log_text = NULL;
690697
int log_text_size;
691698
char *json_str;
699+
struct cfl_list entry_list;
700+
struct chronicle_entry *entry;
701+
struct cfl_list *tmp;
702+
struct cfl_list *head;
703+
704+
cfl_list_init(&entry_list);
692705

693706
array_size = count_mp_with_threshold(last_offset, threshold, log_decoder, ctx);
694707

695708
/* Reset the decoder state */
696709
flb_log_event_decoder_reset(log_decoder, (char *) data, bytes);
697710

711+
flb_plg_trace(ctx->ins, "last offset is %zu", last_offset);
712+
/* Adjust decoder offset */
713+
if (last_offset != 0) {
714+
log_decoder->offset = last_offset;
715+
}
716+
717+
while ((ret = flb_log_event_decoder_next(
718+
log_decoder,
719+
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
720+
off = log_decoder->offset;
721+
alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */
722+
last_off = off;
723+
724+
if (ctx->log_key != NULL) {
725+
log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event);
726+
if (log_text == NULL) {
727+
flb_plg_error(ctx->ins, "log_key extraction failed, skipping record");
728+
continue;
729+
}
730+
log_text_size = flb_sds_len(log_text);
731+
}
732+
else {
733+
json_str = flb_msgpack_to_json_str(alloc_size, log_event.body);
734+
if (json_str == NULL) {
735+
flb_plg_error(ctx->ins, "Could not convert record to json string");
736+
msgpack_sbuffer_destroy(&mp_sbuf);
737+
return -1;
738+
}
739+
log_text = flb_sds_create(json_str);
740+
flb_free(json_str); /* free the original buffer */
741+
742+
if (log_text == NULL) {
743+
flb_plg_error(ctx->ins, "Could not create sds string for log");
744+
msgpack_sbuffer_destroy(&mp_sbuf);
745+
return -1;
746+
}
747+
log_text_size = flb_sds_len(log_text);
748+
}
749+
750+
if (log_text == NULL) {
751+
flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
752+
return -1;
753+
}
754+
755+
entry = flb_malloc(sizeof(struct chronicle_entry));
756+
if (!entry) {
757+
continue;
758+
}
759+
760+
entry->log_text = log_text;
761+
entry->log_text_size = log_text_size;
762+
entry->timestamp = log_event.timestamp;
763+
764+
cfl_list_add(&entry->_head, &entry_list);
765+
766+
if (off >= (threshold + last_offset)) {
767+
flb_plg_debug(ctx->ins,
768+
"the offset %zu is exceeded the threshold %zu. "
769+
"Splitting the payload over the threshold so the processed array size has %d.",
770+
off, threshold, array_size);
771+
772+
break;
773+
}
774+
}
775+
776+
/* If the list is empty, no records were valid. */
777+
if (cfl_list_is_empty(&entry_list)) {
778+
return -1;
779+
}
780+
698781
/* Create temporary msgpack buffer */
699782
msgpack_sbuffer_init(&mp_sbuf);
700783
msgpack_packer_init(&mp_pck, &mp_sbuf, msgpack_sbuffer_write);
@@ -738,21 +821,11 @@ static int chronicle_format(const void *data, size_t bytes,
738821
msgpack_pack_str_body(&mp_pck, "entries", 7);
739822

740823
/* Append entries */
741-
msgpack_pack_array(&mp_pck, array_size);
742-
743-
flb_plg_trace(ctx->ins, "last offset is %zu", last_offset);
744-
/* Adjust decoder offset */
745-
if (last_offset != 0) {
746-
log_decoder->offset = last_offset;
747-
}
748-
749-
while ((ret = flb_log_event_decoder_next(
750-
log_decoder,
751-
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {
752-
off = log_decoder->offset;
753-
alloc_size = (off - last_off) + 128; /* JSON is larger than msgpack */
754-
last_off = off;
824+
msgpack_pack_array(&mp_pck, cfl_list_size(&entry_list));
755825

826+
/* Iterate the list of valid entries and pack them */
827+
cfl_list_foreach_safe(head, tmp, &entry_list) {
828+
entry = cfl_list_entry(head, struct chronicle_entry, _head);
756829
/*
757830
* Pack entries
758831
*
@@ -767,64 +840,28 @@ static int chronicle_format(const void *data, size_t bytes,
767840
/* log_text */
768841
msgpack_pack_str(&mp_pck, 8);
769842
msgpack_pack_str_body(&mp_pck, "log_text", 8);
770-
if (ctx->log_key != NULL) {
771-
log_text = flb_pack_msgpack_extract_log_key(ctx, bytes, log_event);
772-
if (log_text == NULL) {
773-
flb_plg_error(ctx->ins, "log_key extraction failed, skipping record");
774-
msgpack_sbuffer_destroy(&mp_sbuf);
775-
return -1;
776-
}
777-
log_text_size = flb_sds_len(log_text);
778-
}
779-
else {
780-
json_str = flb_msgpack_to_json_str(alloc_size, log_event.body);
781-
if (json_str == NULL) {
782-
flb_plg_error(ctx->ins, "Could not convert record to json string");
783-
msgpack_sbuffer_destroy(&mp_sbuf);
784-
return -1;
785-
}
786-
log_text = flb_sds_create(json_str);
787-
flb_free(json_str); /* free the original buffer */
788-
789-
if (log_text == NULL) {
790-
flb_plg_error(ctx->ins, "Could not create sds string for log");
791-
msgpack_sbuffer_destroy(&mp_sbuf);
792-
return -1;
793-
}
794-
log_text_size = flb_sds_len(log_text);
795-
}
843+
msgpack_pack_str(&mp_pck, entry->log_text_size);
844+
msgpack_pack_str_body(&mp_pck, entry->log_text, entry->log_text_size);
796845

797-
if (log_text == NULL) {
798-
flb_plg_error(ctx->ins, "Could not marshal msgpack to output string");
799-
return -1;
800-
}
801-
msgpack_pack_str(&mp_pck, log_text_size);
802-
msgpack_pack_str_body(&mp_pck, log_text, log_text_size);
803-
804-
flb_sds_destroy(log_text);
805846
/* timestamp */
806847
msgpack_pack_str(&mp_pck, 10);
807848
msgpack_pack_str_body(&mp_pck, "ts_rfc3339", 10);
808849

809-
gmtime_r(&log_event.timestamp.tm.tv_sec, &tm);
850+
gmtime_r(&entry->timestamp.tm.tv_sec, &tm);
810851
s = strftime(time_formatted, sizeof(time_formatted) - 1,
811852
FLB_STD_TIME_FMT, &tm);
812853
len = snprintf(time_formatted + s, sizeof(time_formatted) - 1 - s,
813854
".%03" PRIu64 "Z",
814-
(uint64_t) log_event.timestamp.tm.tv_nsec);
855+
(uint64_t) entry->timestamp.tm.tv_nsec);
815856
s += len;
816857

817858
msgpack_pack_str(&mp_pck, s);
818859
msgpack_pack_str_body(&mp_pck, time_formatted, s);
819860

820-
if (off >= (threshold + last_offset)) {
821-
flb_plg_debug(ctx->ins,
822-
"the offset %zu is exceeded the threshold %zu. "
823-
"Splitting the payload over the threshold so the processed array size has %d.",
824-
off, threshold, array_size);
825-
826-
break;
827-
}
861+
/* Clean up the entry now that it's packed */
862+
flb_sds_destroy(entry->log_text);
863+
cfl_list_del(&entry->_head);
864+
flb_free(entry);
828865
}
829866

830867
/* Convert from msgpack to JSON */

0 commit comments

Comments
 (0)