Skip to content

Commit 6bddcbd

Browse files
committed
fix(in_kubernetes_events): buffer incomplete JSON across HTTP chunk boundaries
Fixes #11252 When HTTP chunked transfer encoding splits JSON event objects across chunk boundaries, buffer the incomplete data until a complete JSON message has been received. This prevents 'bad formed JSON' errors and watch stream stalls. - Add chunk_buffer field to k8s_events struct - Modify process_http_chunk() to buffer incomplete JSON data - Process buffered data when stream closes - Add test with 400-byte chunks splitting events into 3 parts Signed-off-by: Jesse Awan <jesse.awan@sap.com>
1 parent c88c545 commit 6bddcbd

File tree

4 files changed

+176
-10
lines changed

4 files changed

+176
-10
lines changed

plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 120 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -746,32 +746,107 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
746746
size_t token_size = 0;
747747
char *token_start = 0;
748748
char *token_end = NULL;
749+
char *search_start;
750+
size_t remaining;
751+
flb_sds_t working_buffer = NULL;
752+
753+
/*
754+
* Prepend any buffered incomplete data from previous chunks.
755+
* HTTP chunked encoding can split JSON objects across chunk boundaries,
756+
* so we need to buffer incomplete data until we find a complete JSON line.
757+
*/
758+
if (ctx->chunk_buffer != NULL) {
759+
working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
760+
if (!working_buffer) {
761+
flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
762+
flb_sds_destroy(ctx->chunk_buffer);
763+
ctx->chunk_buffer = NULL;
764+
return -1;
765+
}
766+
/*
767+
* flb_sds_cat modifies and returns the first argument, so working_buffer
768+
* IS ctx->chunk_buffer (reallocated). Clear our reference to it.
769+
*/
770+
ctx->chunk_buffer = NULL;
771+
token_start = working_buffer;
772+
}
773+
else {
774+
token_start = c->resp.payload;
775+
}
749776

750-
token_start = c->resp.payload;
751-
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
752-
while ( token_end != NULL && ret == 0 ) {
777+
search_start = token_start;
778+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
779+
780+
while (token_end != NULL && ret == 0) {
753781
token_size = token_end - token_start;
782+
783+
/* Skip empty lines */
784+
if (token_size == 0) {
785+
token_start = token_end + 1;
786+
search_start = token_start;
787+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
788+
continue;
789+
}
790+
754791
ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
755792
if (ret == -1) {
756-
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON: %s",
757-
c->resp.payload);
793+
flb_plg_debug(ctx->ins, "could not process payload, incomplete or bad formed JSON");
758794
}
759795
else {
760-
*bytes_consumed += token_size + 1;
796+
/*
797+
* For non-buffered data, track consumed bytes.
798+
* For buffered data, we'll mark everything consumed after the loop.
799+
*/
800+
if (!working_buffer) {
801+
*bytes_consumed += token_size + 1;
802+
}
761803
ret = process_watched_event(ctx, buf_data, buf_size);
762804
}
763805

764806
flb_free(buf_data);
765-
if (buf_data) {
766-
buf_data = NULL;
807+
buf_data = NULL;
808+
809+
token_start = token_end + 1;
810+
search_start = token_start;
811+
token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
812+
}
813+
814+
/*
815+
* Always consume all bytes from the current chunk since we've examined them all.
816+
* Even if we buffer the data, we've still "consumed" it from the HTTP payload.
817+
*/
818+
*bytes_consumed = c->resp.payload_size;
819+
820+
/*
821+
* If there's remaining data without a newline delimiter, it means the JSON
822+
* object is incomplete (split across chunk boundaries). Buffer it for next chunk.
823+
*/
824+
if (working_buffer) {
825+
remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
826+
}
827+
else {
828+
remaining = c->resp.payload_size - (token_start - c->resp.payload);
829+
}
830+
831+
if (remaining > 0 && ret == 0) {
832+
ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
833+
if (!ctx->chunk_buffer) {
834+
flb_plg_error(ctx->ins, "failed to create chunk buffer");
835+
ret = -1;
836+
}
837+
else {
838+
flb_plg_trace(ctx->ins, "buffering %zu bytes of incomplete JSON data", remaining);
767839
}
768-
token_start = token_end+1;
769-
token_end = strpbrk(token_start, JSON_ARRAY_DELIM);
770840
}
771841

842+
if (working_buffer) {
843+
flb_sds_destroy(working_buffer);
844+
}
845+
772846
if (buf_data) {
773847
flb_free(buf_data);
774848
}
849+
775850
return ret;
776851
}
777852

@@ -889,6 +964,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
889964
flb_upstream_conn_release(ctx->current_connection);
890965
ctx->current_connection = NULL;
891966
}
967+
968+
/* Clear any buffered incomplete data on failure */
969+
if (ctx->chunk_buffer) {
970+
flb_sds_destroy(ctx->chunk_buffer);
971+
ctx->chunk_buffer = NULL;
972+
}
973+
892974
return FLB_FALSE;
893975
}
894976

@@ -927,6 +1009,28 @@ static int k8s_events_collect(struct flb_input_instance *ins,
9271009
}
9281010
else if (ret == FLB_HTTP_OK) {
9291011
flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");
1012+
1013+
/*
1014+
* If there's buffered data when stream closes, try to process it.
1015+
* This handles the case where the last chunk doesn't end with a newline.
1016+
*/
1017+
if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
1018+
int buf_ret;
1019+
int root_type;
1020+
size_t consumed = 0;
1021+
char *buf_data = NULL;
1022+
size_t buf_size;
1023+
1024+
buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
1025+
&buf_data, &buf_size, &root_type, &consumed);
1026+
if (buf_ret == 0) {
1027+
process_watched_event(ctx, buf_data, buf_size);
1028+
}
1029+
1030+
if (buf_data) {
1031+
flb_free(buf_data);
1032+
}
1033+
}
9301034
}
9311035
else {
9321036
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1042,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
9381042
flb_upstream_conn_release(ctx->current_connection);
9391043
ctx->streaming_client = NULL;
9401044
ctx->current_connection = NULL;
1045+
1046+
/* Clear any buffered incomplete data when stream closes */
1047+
if (ctx->chunk_buffer) {
1048+
flb_sds_destroy(ctx->chunk_buffer);
1049+
ctx->chunk_buffer = NULL;
1050+
}
9411051
}
9421052

9431053
pthread_mutex_unlock(&ctx->lock);

plugins/in_kubernetes_events/kubernetes_events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,9 @@ struct k8s_events {
8585
struct flb_connection *current_connection;
8686
struct flb_http_client *streaming_client;
8787

88+
/* Buffer for incomplete JSON data from chunked responses */
89+
flb_sds_t chunk_buffer;
90+
8891
/* limit for event queries */
8992
int limit_request;
9093
/* last highest seen resource_version */

plugins/in_kubernetes_events/kubernetes_events_conf.c

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
158158
pthread_mutexattr_init(&attr);
159159
pthread_mutex_init(&ctx->lock, &attr);
160160

161+
/* Initialize buffer for incomplete chunk data */
162+
ctx->chunk_buffer = NULL;
163+
161164
/* Load the config map */
162165
ret = flb_input_config_map_set(ins, (void *) ctx);
163166
if (ret == -1) {
@@ -289,6 +292,10 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
289292
flb_ra_destroy(ctx->ra_resource_version);
290293
}
291294

295+
if (ctx->chunk_buffer) {
296+
flb_sds_destroy(ctx->chunk_buffer);
297+
}
298+
292299
if(ctx->streaming_client) {
293300
flb_http_client_destroy(ctx->streaming_client);
294301
}

tests/runtime/in_kubernetes_events.c

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,10 +444,56 @@ void flb_test_events_with_chunkedrecv()
444444
test_ctx_destroy(ctx);
445445
}
446446

447+
/* Test with smaller chunks - splits single event into 3 chunks */
448+
void flb_test_events_with_3chunks()
449+
{
450+
struct flb_lib_out_cb cb_data;
451+
struct test_ctx *ctx;
452+
int trys;
453+
454+
int ret;
455+
int num;
456+
const char *filename = "eventlist_v1_with_lastTimestamp";
457+
const char *stream_filename = "watch_v1_with_lastTimestamp";
458+
459+
clear_output_num();
460+
461+
/* Use 400 byte chunks to split 1176-byte JSON into 3 chunks */
462+
struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
463+
filename, stream_filename, 400
464+
);
465+
466+
cb_data.cb = cb_check_result_json;
467+
cb_data.data = (void *)k8s_server;
468+
469+
ctx = test_ctx_create(&cb_data);
470+
if (!TEST_CHECK(ctx != NULL)) {
471+
TEST_MSG("test_ctx_create failed");
472+
exit(EXIT_FAILURE);
473+
}
474+
475+
ret = flb_start(ctx->flb);
476+
TEST_CHECK(ret == 0);
477+
478+
/* waiting to flush */
479+
for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
480+
flb_time_msleep(1000);
481+
}
482+
483+
num = get_output_num();
484+
if (!TEST_CHECK(num >= 2)) {
485+
TEST_MSG("2 output records are expected found %d", num);
486+
}
487+
488+
mock_k8s_api_destroy(k8s_server);
489+
test_ctx_destroy(ctx);
490+
}
491+
447492
TEST_LIST = {
448493
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
449494
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
450495
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
496+
{"events_v1_with_3chunks", flb_test_events_with_3chunks},
451497
{NULL, NULL}
452498
};
453499

0 commit comments

Comments
 (0)