
Commit a6bf771

fix(in_kubernetes_events): add debug logging and fix buffering logic
- Add comprehensive debug logging to trace chunk processing
- Fix buffering logic to only buffer when parse fails (incomplete JSON)
- Previously was buffering all data even on successful parse

Signed-off-by: Jesse Awan <jesse.awan@sap.com>
1 parent 6bddcbd commit a6bf771
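
The core of the change is the order of operations around parsing: each newline-delimited record is parsed first, and data is carried over to the next HTTP chunk only when a parse fails or no delimiter is found (i.e. the record is split across chunk boundaries). The sketch below illustrates that pattern in isolation. It is a simplified illustration, not the plugin code: parse_record() is a hypothetical stand-in for flb_pack_json(), and plain libc strings replace the flb_sds buffers; the actual change follows in the diff.

/*
 * Minimal sketch of the buffer-on-parse-failure pattern (not the plugin code).
 * parse_record() is a hypothetical stand-in for flb_pack_json().
 */
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

static char *pending = NULL;   /* incomplete record carried between chunks */

/* Hypothetical parser: in the real plugin this is flb_pack_json(). */
static int parse_record(const char *p, size_t len)
{
    printf("parsed record: %.*s\n", (int) len, p);
    return 0;
}

static void process_chunk(const char *payload, size_t size)
{
    size_t pend_len = pending ? strlen(pending) : 0;

    /* Prepend whatever was left over from the previous chunk. */
    char *work = malloc(pend_len + size + 1);
    memcpy(work, pending ? pending : "", pend_len);
    memcpy(work + pend_len, payload, size);
    work[pend_len + size] = '\0';
    free(pending);
    pending = NULL;

    char *start = work;
    char *end;
    while ((end = strchr(start, '\n')) != NULL) {
        /* A delimiter was found: parse the record and advance only on success. */
        if (parse_record(start, (size_t) (end - start)) != 0) {
            break;   /* parse failed: stop here and buffer the rest below */
        }
        start = end + 1;
    }

    /* Buffer only the unparsed tail (if any) for the next chunk. */
    if (*start != '\0') {
        size_t rem = strlen(start);
        pending = malloc(rem + 1);
        memcpy(pending, start, rem + 1);
    }
    free(work);
}

int main(void)
{
    /* A record split across two chunks is parsed only once it is complete. */
    process_chunk("{\"a\":1}\n{\"b\":", 13);
    process_chunk("2}\n", 3);
    free(pending);
    return 0;
}

This mirrors the control flow introduced by the commit: advance past a record only after a successful parse, and carry everything after the last successfully parsed record into the buffer (ctx->chunk_buffer in the plugin) instead of buffering the whole payload unconditionally.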

1 file changed: +43 -28 lines changed


plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 43 additions & 28 deletions
@@ -756,6 +756,9 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
      * so we need to buffer incomplete data until we find a complete JSON line.
      */
     if (ctx->chunk_buffer != NULL) {
+        size_t buffer_len = flb_sds_len(ctx->chunk_buffer);
+        flb_plg_debug(ctx->ins, "prepending %zu bytes from chunk_buffer to %zu new bytes",
+                      buffer_len, c->resp.payload_size);
         working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
         if (!working_buffer) {
             flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
@@ -771,6 +774,7 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         token_start = working_buffer;
     }
     else {
+        flb_plg_debug(ctx->ins, "processing %zu bytes from new chunk", c->resp.payload_size);
         token_start = c->resp.payload;
     }
 
@@ -789,37 +793,32 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         }
 
         ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
-        if (ret == -1) {
-            flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
-        }
-        else {
-            /*
-             * For non-buffered data, track consumed bytes.
-             * For buffered data, we'll mark everything consumed after the loop.
-             */
+        if (ret == 0) {
+            /* Successfully parsed JSON */
+            flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
             if (!working_buffer) {
                 *bytes_consumed += token_size + 1;
             }
             ret = process_watched_event(ctx, buf_data, buf_size);
+            flb_free(buf_data);
+            buf_data = NULL;
+
+            token_start = token_end + 1;
+            search_start = token_start;
+            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+        }
+        else {
+            /* JSON parse failed - this line is incomplete, don't advance */
+            flb_plg_debug(ctx->ins, "JSON parse failed for %zu bytes at offset %ld - will buffer",
+                          token_size, token_start - (working_buffer ? working_buffer : c->resp.payload));
+            break;
         }
-
-        flb_free(buf_data);
-        buf_data = NULL;
-
-        token_start = token_end + 1;
-        search_start = token_start;
-        token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
     }
 
     /*
-     * Always consume all bytes from the current chunk since we've examined them all.
-     * Even if we buffer the data, we've still "consumed" it from the HTTP payload.
-     */
-    *bytes_consumed = c->resp.payload_size;
-
-    /*
-     * If there's remaining data without a newline delimiter, it means the JSON
-     * object is incomplete (split across chunk boundaries). Buffer it for next chunk.
+     * Calculate remaining unparsed data.
+     * If we broke out of the loop due to parse failure or no newline found,
+     * buffer the remaining data for the next chunk.
      */
     if (working_buffer) {
         remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
@@ -828,16 +827,32 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
         remaining = c->resp.payload_size - (token_start - c->resp.payload);
     }
 
-    if (remaining > 0 && ret == 0) {
+    if (remaining > 0) {
+        /* We have unparsed data - buffer it for next chunk */
+        flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
         ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
         if (!ctx->chunk_buffer) {
             flb_plg_error(ctx->ins, "failed to create chunk buffer");
-            ret = -1;
-        }
-        else {
-            flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
+            if (working_buffer) {
+                flb_sds_destroy(working_buffer);
+            }
+            if (buf_data) {
+                flb_free(buf_data);
+            }
+            return -1;
         }
     }
+    else {
+        flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+    }
+
+    /*
+     * Mark bytes consumed from the current HTTP chunk.
+     * If we used working_buffer, all original payload bytes are consumed.
+     */
+    if (working_buffer) {
+        *bytes_consumed = c->resp.payload_size;
+    }
 
     if (working_buffer) {
         flb_sds_destroy(working_buffer);
