Commit cde2a9c

fix(in_kubernetes_events): correct chunked response parsing and memory safety
Signed-off-by: Jesse Awan <jesse.awan@sap.com>
Parent: 8b3a918

File tree: 2 files changed (+97, -33 lines)
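The parsing fix in the first file replaces strpbrk() with strstr(). strpbrk() matches the first occurrence of any single character from the delimiter set, and the old code then advanced past it by only one byte; when the record delimiter is a multi-byte string, that splits records mid-delimiter and leaves stray delimiter bytes at the start of the next token. strstr() matches the delimiter as a whole, and advancing by strlen(JSON_ARRAY_DELIM) lands exactly on the next record. A minimal standalone sketch of the difference, assuming JSON_ARRAY_DELIM is a two-byte "\r\n" (its actual definition is not shown in this diff):

#include <stdio.h>
#include <string.h>

#define JSON_ARRAY_DELIM "\r\n"   /* assumption: multi-byte delimiter */

int main(void)
{
    const char *payload = "{\"a\":1}\r\n{\"b\":2}\r\n";

    /* strpbrk() stops at the first '\r' OR '\n'; advancing one byte
     * leaves the '\n' glued to the front of the next record */
    const char *pb = strpbrk(payload, JSON_ARRAY_DELIM);
    printf("strpbrk + 1 byte -> next char is 0x%02x\n",
           (unsigned char)pb[1]);              /* prints 0x0a, not '{' */

    /* strstr() matches the whole "\r\n"; advancing by its length
     * lands exactly on the start of the next record */
    const char *ss = strstr(payload, JSON_ARRAY_DELIM);
    printf("strstr + strlen(delim) -> next char is '%c'\n",
           ss[strlen(JSON_ARRAY_DELIM)]);      /* prints '{' */
    return 0;
}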

plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 86 additions & 33 deletions
@@ -780,16 +780,16 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     }

     search_start = token_start;
-    token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+    token_end = strstr(search_start, JSON_ARRAY_DELIM);

     while (token_end != NULL && ret == 0) {
         token_size = token_end - token_start;

         /* Skip empty lines */
         if (token_size == 0) {
-            token_start = token_end + 1;
+            token_start = token_end + strlen(JSON_ARRAY_DELIM);
             search_start = token_start;
-            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+            token_end = strstr(search_start, JSON_ARRAY_DELIM);
             continue;
         }

@@ -801,9 +801,9 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
             flb_free(buf_data);
             buf_data = NULL;

-            token_start = token_end + 1;
+            token_start = token_end + strlen(JSON_ARRAY_DELIM);
             search_start = token_start;
-            token_end = strpbrk(search_start, JSON_ARRAY_DELIM);
+            token_end = strstr(search_start, JSON_ARRAY_DELIM);
         }
         else {
             /* JSON parse failed - this line is incomplete, don't advance */
@@ -814,43 +814,90 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
     }

     /*
-     * Calculate remaining unparsed data.
-     * If we broke out of the loop due to parse failure or no newline found,
-     * buffer the remaining data for the next chunk.
+     * Calculate how many bytes we consumed and buffer any remaining tail.
+     *
+     * When working_buffer exists: it = old_buffered + new_payload
+     * When no working_buffer: we're processing c->resp.payload directly
+     *
+     * In both cases, we need to:
+     * 1. Calculate what we actually consumed (token_start advanced position)
+     * 2. Buffer the remaining unparsed tail for next chunk
+     * 3. Report to HTTP client how many NEW bytes we consumed
      */
     if (working_buffer) {
-        remaining = flb_sds_len(working_buffer) - (token_start - working_buffer);
-    }
-    else {
-        remaining = c->resp.payload_size - (token_start - c->resp.payload);
-    }
-
-    if (remaining > 0) {
-        /* We have unparsed data - buffer it for next chunk */
-        flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
-        ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
-        if (!ctx->chunk_buffer) {
-            flb_plg_error(ctx->ins, "failed to create chunk buffer");
-            if (working_buffer) {
+        /* working_buffer = old + new; token_start shows total progress */
+        size_t total_consumed = (size_t)(token_start - working_buffer);
+        size_t total_len = flb_sds_len(working_buffer);
+
+        if (total_consumed > total_len) {
+            total_consumed = total_len;
+        }
+
+        remaining = total_len - total_consumed;
+
+        if (remaining > 0) {
+            flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
+            ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
+            if (!ctx->chunk_buffer) {
+                flb_plg_error(ctx->ins, "failed to create chunk buffer");
                 flb_sds_destroy(working_buffer);
+                if (buf_data) {
+                    flb_free(buf_data);
+                }
+                return -1;
             }
-            if (buf_data) {
-                flb_free(buf_data);
+        }
+        else {
+            flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+        }
+
+        /*
+         * Calculate how many bytes of the NEW payload were consumed.
+         * working_buffer = old_tail + new_payload
+         * old_len = length of old tail, new_len = length of new payload
+         * consumed_new = bytes of NEW payload consumed = total_consumed - old_len
+         */
+        size_t new_len = c->resp.payload_size;
+        size_t old_len = (total_len > new_len) ? (total_len - new_len) : 0;
+        size_t consumed_new = 0;
+
+        if (total_consumed > old_len) {
+            consumed_new = total_consumed - old_len;
+            if (consumed_new > new_len) {
+                consumed_new = new_len;
             }
-            return -1;
         }
+
+        *bytes_consumed = consumed_new;
     }
     else {
-        flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+        /* No buffered data; token_start shows progress in current payload */
+        size_t consumed = (size_t)(token_start - c->resp.payload);
+
+        if (consumed > c->resp.payload_size) {
+            consumed = c->resp.payload_size;
+        }
+
+        remaining = c->resp.payload_size - consumed;
+
+        if (remaining > 0) {
+            flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
+            ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
+            if (!ctx->chunk_buffer) {
+                flb_plg_error(ctx->ins, "failed to create chunk buffer");
+                if (buf_data) {
+                    flb_free(buf_data);
+                }
+                return -1;
+            }
+        }
+        else {
+            flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
+        }
+
+        /* Report how much we consumed from current payload */
+        *bytes_consumed = consumed;
     }
-
-    /*
-     * At this point we've either parsed all complete lines and/or buffered
-     * any remaining tail into ctx->chunk_buffer, so we no longer need any
-     * bytes from this HTTP payload. Tell the HTTP client that the whole
-     * payload has been consumed to avoid duplicates.
-     */
-    *bytes_consumed = c->resp.payload_size;

     if (working_buffer) {
         flb_sds_destroy(working_buffer);
@@ -1021,6 +1068,12 @@ static int k8s_events_collect(struct flb_input_instance *ins,
     }
    /* NOTE: skipping any processing after streaming socket closes */

+    /* Safety check: streaming_client might be NULL after error/processing */
+    if (!ctx->streaming_client) {
+        pthread_mutex_unlock(&ctx->lock);
+        FLB_INPUT_RETURN(0);
+    }
+
    if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR || ret == FLB_HTTP_OK) {
        if (ret == FLB_HTTP_ERROR) {
            flb_plg_warn(ins, "kubernetes chunked stream error.");
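
The large hunk above rewrites the consumed-bytes accounting, and the arithmetic is easiest to verify with concrete numbers. The sketch below replays the same logic outside the plugin; the sizes are illustrative, not taken from the source: a 10-byte unparsed tail from the previous chunk was prepended to a 100-byte new payload, and tokenization stopped after 85 bytes of the combined buffer.

#include <stdio.h>

int main(void)
{
    size_t total_len      = 110; /* flb_sds_len(working_buffer): 10-byte tail + 100-byte payload */
    size_t new_len        = 100; /* c->resp.payload_size */
    size_t total_consumed = 85;  /* token_start - working_buffer */

    if (total_consumed > total_len) {
        total_consumed = total_len;              /* clamp: never read past the buffer */
    }

    size_t remaining = total_len - total_consumed;  /* 25 bytes re-buffered for next chunk */
    size_t old_len = (total_len > new_len) ? (total_len - new_len) : 0;  /* 10 */
    size_t consumed_new = 0;

    if (total_consumed > old_len) {
        consumed_new = total_consumed - old_len; /* 85 - 10 = 75 */
        if (consumed_new > new_len) {
            consumed_new = new_len;              /* never over-report new bytes */
        }
    }

    /* prints: buffer 25 bytes, report 75 new bytes consumed */
    printf("buffer %zu bytes, report %zu new bytes consumed\n",
           remaining, consumed_new);
    return 0;
}

Before this commit the function unconditionally reported *bytes_consumed = c->resp.payload_size; in the example it would have claimed all 100 new bytes and relied on ctx->chunk_buffer alone to carry the 25-byte tail. The clamped calculation reports only the 75 new bytes that were actually parsed.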

plugins/in_kubernetes_events/kubernetes_events_conf.c

Lines changed: 11 additions & 0 deletions
@@ -329,47 +329,58 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)

     if (ctx->ra_resource_version) {
         flb_ra_destroy(ctx->ra_resource_version);
+        ctx->ra_resource_version = NULL;
     }

     if (ctx->chunk_buffer) {
         flb_sds_destroy(ctx->chunk_buffer);
+        ctx->chunk_buffer = NULL;
     }

     if(ctx->streaming_client) {
         flb_http_client_destroy(ctx->streaming_client);
+        ctx->streaming_client = NULL;
     }

     if(ctx->current_connection) {
         flb_upstream_conn_release(ctx->current_connection);
+        ctx->current_connection = NULL;
     }

     if (ctx->upstream) {
         flb_upstream_destroy(ctx->upstream);
+        ctx->upstream = NULL;
     }

     if (ctx->encoder) {
         flb_log_event_encoder_destroy(ctx->encoder);
+        ctx->encoder = NULL;
     }

     if (ctx->api_host) {
         flb_free(ctx->api_host);
+        ctx->api_host = NULL;
     }
     if (ctx->token) {
         flb_free(ctx->token);
+        ctx->token = NULL;
     }
     if (ctx->auth) {
         flb_free(ctx->auth);
+        ctx->auth = NULL;
     }

 #ifdef FLB_HAVE_TLS
     if (ctx->tls) {
         flb_tls_destroy(ctx->tls);
+        ctx->tls = NULL;
     }
 #endif

 #ifdef FLB_HAVE_SQLDB
     if (ctx->db) {
         flb_kubernetes_event_db_close(ctx->db);
+        ctx->db = NULL;
     }
 #endif

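All eleven additions in this file apply one defensive idiom: reset each pointer to NULL immediately after releasing it, so a second pass through k8s_events_conf_destroy(), or any later check of these fields (such as the streaming_client guard added in the collector above), sees NULL instead of a dangling pointer. A generic sketch of the pattern, using a made-up struct and resource for illustration:

#include <stdlib.h>

struct example_ctx {
    char *buffer;                 /* hypothetical owned resource */
};

static void example_ctx_destroy(struct example_ctx *ctx)
{
    if (ctx->buffer) {
        free(ctx->buffer);
        ctx->buffer = NULL;       /* guard: destroy is now idempotent */
    }
}

int main(void)
{
    struct example_ctx ctx = { malloc(16) };
    example_ctx_destroy(&ctx);
    example_ctx_destroy(&ctx);    /* safe: second call sees NULL and skips */
    return 0;
}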