198 changes: 184 additions & 14 deletions plugins/in_kubernetes_events/kubernetes_events.c
@@ -746,32 +746,159 @@ static int process_http_chunk(struct k8s_events* ctx, struct flb_http_client *c,
size_t token_size = 0;
char *token_start = 0;
char *token_end = NULL;
char *search_start;
size_t remaining;
flb_sds_t working_buffer = NULL;
size_t buffer_len;

/*
* Prepend any buffered incomplete data from previous chunks.
* HTTP chunked encoding can split JSON objects across chunk boundaries,
* so we need to buffer incomplete data until we find a complete JSON line.
*/
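/*
 * Example: a watch event like {"type":"ADDED","object":{...}}\n can arrive
 * with its opening half in one chunk and the closing braces plus newline
 * in the next; neither half parses as JSON on its own.
 */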
if (ctx->chunk_buffer != NULL) {
buffer_len = flb_sds_len(ctx->chunk_buffer);
flb_plg_debug(ctx->ins, "prepending %zu bytes from chunk_buffer to %zu new bytes",
buffer_len, c->resp.payload_size);
working_buffer = flb_sds_cat(ctx->chunk_buffer, c->resp.payload, c->resp.payload_size);
if (!working_buffer) {
flb_plg_error(ctx->ins, "failed to concatenate chunk buffer");
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
return -1;
}
/*
* flb_sds_cat modifies and returns the first argument, so working_buffer
* IS ctx->chunk_buffer (reallocated). Clear our reference to it.
*/
ctx->chunk_buffer = NULL;
token_start = working_buffer;
}
else {
flb_plg_debug(ctx->ins, "processing %zu bytes from new chunk", c->resp.payload_size);
token_start = c->resp.payload;
}

search_start = token_start;
token_end = strstr(search_start, JSON_ARRAY_DELIM);

while (token_end != NULL && ret == 0) {
token_size = token_end - token_start;

/* Skip empty lines */
if (token_size == 0) {
token_start = token_end + strlen(JSON_ARRAY_DELIM);
search_start = token_start;
token_end = strstr(search_start, JSON_ARRAY_DELIM);
continue;
}

ret = flb_pack_json(token_start, token_size, &buf_data, &buf_size, &root_type, &consumed);
if (ret == 0) {
/* Successfully parsed JSON */
flb_plg_debug(ctx->ins, "successfully parsed JSON event (%zu bytes)", token_size);
ret = process_watched_event(ctx, buf_data, buf_size);
flb_free(buf_data);
buf_data = NULL;

token_start = token_end + strlen(JSON_ARRAY_DELIM);
search_start = token_start;
token_end = strstr(search_start, JSON_ARRAY_DELIM);
}
else {
/* JSON parse failed - this line is incomplete, don't advance */
flb_plg_debug(ctx->ins, "JSON parse failed for %zu bytes at offset %ld - will buffer",
token_size, token_start - (working_buffer ? working_buffer : c->resp.payload));
break;
}

}

if (buf_data) {
flb_free(buf_data);
buf_data = NULL;
}

/*
* Calculate how many bytes we consumed and buffer any remaining tail.
*
* When working_buffer exists: it = old_buffered + new_payload
* When no working_buffer: we're processing c->resp.payload directly
*
* In both cases, we need to:
* 1. Calculate what we actually consumed (token_start advanced position)
* 2. Buffer the remaining unparsed tail for next chunk
* 3. Report to HTTP client how many NEW bytes we consumed
*/
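/*
 * Worked example (hypothetical sizes): old buffer = 10 bytes, new payload
 * = 90 bytes, so total_len = 100. If the loop stopped with token_start 80
 * bytes in, total_consumed = 80 and remaining = 20: those 20 bytes go back
 * into ctx->chunk_buffer, yet *bytes_consumed still reports all 90 new
 * bytes, because the HTTP client must not replay them.
 */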
if (working_buffer) {
/* working_buffer = old + new; token_start shows total progress */
size_t total_consumed = (size_t)(token_start - working_buffer);
size_t total_len = flb_sds_len(working_buffer);

if (total_consumed > total_len) {
total_consumed = total_len;
}

remaining = total_len - total_consumed;

if (remaining > 0) {
flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
if (!ctx->chunk_buffer) {
flb_plg_error(ctx->ins, "failed to create chunk buffer");
flb_sds_destroy(working_buffer);
if (buf_data) {
flb_free(buf_data);
}
return -1;
}
}
else {
flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
}

/*
* Always report that we consumed the full NEW payload.
* Even if we buffered data without parsing, we ACCEPTED the bytes
* from the HTTP layer, so we must tell it to discard them.
* Reporting 0 would cause the HTTP client to re-send the same data.
*/
*bytes_consumed = c->resp.payload_size;
}
else {
/* No buffered data; token_start shows progress in current payload */
size_t consumed = (size_t)(token_start - c->resp.payload);

if (consumed > c->resp.payload_size) {
consumed = c->resp.payload_size;
}

remaining = c->resp.payload_size - consumed;

if (remaining > 0) {
flb_plg_debug(ctx->ins, "buffering %zu bytes of incomplete JSON data for next chunk", remaining);
ctx->chunk_buffer = flb_sds_create_len(token_start, remaining);
if (!ctx->chunk_buffer) {
flb_plg_error(ctx->ins, "failed to create chunk buffer");
if (buf_data) {
flb_free(buf_data);
}
return -1;
}
}
else {
flb_plg_debug(ctx->ins, "all data processed, no buffering needed");
}

/*
* Always report that we consumed the full payload.
* Even if we buffered data without parsing, we ACCEPTED the bytes.
*/
*bytes_consumed = c->resp.payload_size;
}

if (working_buffer) {
flb_sds_destroy(working_buffer);
}

if (buf_data) {
flb_free(buf_data);
}

return ret;
}
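To make the buffering idea easier to follow outside the plugin, here is a minimal standalone sketch. It is not the plugin's code: plain malloc/memcpy stands in for the flb_sds API, a newline scan stands in for flb_pack_json, and the names tail, feed and emit are illustrative only. What it shares with process_http_chunk() is the shape of the algorithm: complete records are emitted as they are found, and an unterminated tail survives into the next call.

#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Persistent tail buffer, playing the role of ctx->chunk_buffer. */
static char *tail = NULL;
static size_t tail_len = 0;

/* Stand-in for flb_pack_json + process_watched_event: a "record" here is
 * simply a complete newline-terminated line. */
static void emit(const char *rec, size_t len)
{
    printf("record: %.*s\n", (int) len, rec);
}

/* Process one HTTP chunk, buffering any trailing partial record. */
static void feed(const char *chunk, size_t len)
{
    size_t total = tail_len + len;
    char *work;
    char *start;
    char *nl;

    work = malloc(total);
    if (!work) {
        return;
    }
    if (tail_len > 0) {
        memcpy(work, tail, tail_len);
    }
    memcpy(work + tail_len, chunk, len);
    free(tail);
    tail = NULL;
    tail_len = 0;

    start = work;
    while ((nl = memchr(start, '\n', total - (size_t) (start - work))) != NULL) {
        if (nl > start) {
            emit(start, (size_t) (nl - start)); /* complete record */
        }
        start = nl + 1; /* skip the delimiter */
    }

    /* Whatever is left did not end in a newline: keep it for the next call. */
    tail_len = total - (size_t) (start - work);
    if (tail_len > 0) {
        tail = malloc(tail_len);
        if (tail) {
            memcpy(tail, start, tail_len);
        }
        else {
            tail_len = 0;
        }
    }
    free(work);
}

int main(void)
{
    /* One JSON event split across three chunks, as in the new runtime test. */
    const char *parts[] = {
        "{\"type\":\"ADDED\",\"obj",
        "ect\":{\"kind\":\"Event\"",
        "}}\n"
    };
    int i;

    for (i = 0; i < 3; i++) {
        feed(parts[i], strlen(parts[i]));
    }
    free(tail);
    return 0;
}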

@@ -889,6 +1016,13 @@ static int check_and_init_stream(struct k8s_events *ctx)
flb_upstream_conn_release(ctx->current_connection);
ctx->current_connection = NULL;
}

/* Clear any buffered incomplete data on failure */
if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
}

return FLB_FALSE;
}

@@ -899,6 +1033,11 @@ static int k8s_events_collect(struct flb_input_instance *ins,
struct k8s_events *ctx = in_context;
size_t bytes_consumed;
int chunk_proc_ret;
int buf_ret;
int root_type;
size_t consumed;
char *buf_data;
size_t buf_size;

if (pthread_mutex_trylock(&ctx->lock) != 0) {
FLB_INPUT_RETURN(0);
@@ -921,12 +1060,37 @@
}
/* NOTE: skipping any processing after streaming socket closes */

/* Safety check: streaming_client might be NULL after error/processing */
if (!ctx->streaming_client) {
pthread_mutex_unlock(&ctx->lock);
FLB_INPUT_RETURN(0);
}

if (ctx->streaming_client->resp.status != 200 || ret == FLB_HTTP_ERROR || ret == FLB_HTTP_OK) {
if (ret == FLB_HTTP_ERROR) {
flb_plg_warn(ins, "kubernetes chunked stream error.");
}
else if (ret == FLB_HTTP_OK) {
flb_plg_info(ins, "kubernetes stream closed by api server. Reconnect will happen on next interval.");

/*
* If there's buffered data when stream closes, try to process it.
* This handles the case where the last chunk doesn't end with a newline.
*/
if (ctx->chunk_buffer && flb_sds_len(ctx->chunk_buffer) > 0) {
consumed = 0;
buf_data = NULL;

buf_ret = flb_pack_json(ctx->chunk_buffer, flb_sds_len(ctx->chunk_buffer),
&buf_data, &buf_size, &root_type, &consumed);
if (buf_ret == 0) {
process_watched_event(ctx, buf_data, buf_size);
}

if (buf_data) {
flb_free(buf_data);
}
}
}
else {
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
@@ -938,6 +1102,12 @@
flb_upstream_conn_release(ctx->current_connection);
ctx->streaming_client = NULL;
ctx->current_connection = NULL;

/* Clear any buffered incomplete data when stream closes */
if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
}
}

pthread_mutex_unlock(&ctx->lock);
3 changes: 3 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events.h
@@ -82,6 +82,9 @@ struct k8s_events {
struct flb_connection *current_connection;
struct flb_http_client *streaming_client;

/* Buffer for incomplete JSON data from chunked responses */
flb_sds_t chunk_buffer;

/* limit for event queries */
int limit_request;
/* last highest seen resource_version */
18 changes: 18 additions & 0 deletions plugins/in_kubernetes_events/kubernetes_events_conf.c
@@ -158,6 +158,9 @@ struct k8s_events *k8s_events_conf_create(struct flb_input_instance *ins)
pthread_mutexattr_init(&attr);
pthread_mutex_init(&ctx->lock, &attr);

/* Initialize buffer for incomplete chunk data */
ctx->chunk_buffer = NULL;

/* Load the config map */
ret = flb_input_config_map_set(ins, (void *) ctx);
if (ret == -1) {
@@ -326,43 +329,58 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)

if (ctx->ra_resource_version) {
flb_ra_destroy(ctx->ra_resource_version);
ctx->ra_resource_version = NULL;
}

if (ctx->chunk_buffer) {
flb_sds_destroy(ctx->chunk_buffer);
ctx->chunk_buffer = NULL;
}

if (ctx->streaming_client) {
flb_http_client_destroy(ctx->streaming_client);
ctx->streaming_client = NULL;
}

if (ctx->current_connection) {
flb_upstream_conn_release(ctx->current_connection);
ctx->current_connection = NULL;
}

if (ctx->upstream) {
flb_upstream_destroy(ctx->upstream);
ctx->upstream = NULL;
}

if (ctx->encoder) {
flb_log_event_encoder_destroy(ctx->encoder);
ctx->encoder = NULL;
}

if (ctx->api_host) {
flb_free(ctx->api_host);
ctx->api_host = NULL;
}
if (ctx->token) {
flb_free(ctx->token);
ctx->token = NULL;
}
if (ctx->auth) {
flb_free(ctx->auth);
ctx->auth = NULL;
}

#ifdef FLB_HAVE_TLS
if (ctx->tls) {
flb_tls_destroy(ctx->tls);
ctx->tls = NULL;
}
#endif

#ifdef FLB_HAVE_SQLDB
if (ctx->db) {
flb_kubernetes_event_db_close(ctx->db);
ctx->db = NULL;
}
#endif

50 changes: 50 additions & 0 deletions tests/runtime/in_kubernetes_events.c
@@ -619,10 +619,60 @@ void flb_test_config_db_locking_values()
}
#endif

/* Test with smaller chunks - splits single event into 3 chunks */
void flb_test_events_with_3chunks()
{
struct flb_lib_out_cb cb_data;
struct test_ctx *ctx;
int trys;

int ret;
int num;
const char *filename = "eventlist_v1_with_lastTimestamp";
const char *stream_filename = "watch_v1_with_lastTimestamp";

clear_output_num();

/* Use 400 byte chunks to split 1176-byte JSON into 3 chunks */
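/* 1176 / 400 -> chunks of 400, 400 and 376 bytes, i.e. the event JSON is
 * cut mid-object at two chunk boundaries. */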
struct test_k8s_server_ctx* k8s_server = initialize_mock_k8s_api(
filename, stream_filename, 400
);

cb_data.cb = cb_check_result_json;
cb_data.data = (void *)k8s_server;

ctx = test_ctx_create(&cb_data);
if (!TEST_CHECK(ctx != NULL)) {
TEST_MSG("test_ctx_create failed");
exit(EXIT_FAILURE);
}

ret = flb_start(ctx->flb);
TEST_CHECK(ret == 0);

/* waiting to flush */
for (trys = 0; trys < 5 && get_output_num() <= 1; trys++) {
flb_time_msleep(1000);
}

num = get_output_num();
if (!TEST_CHECK(num >= 2)) {
TEST_MSG("2 output records are expected found %d", num);
}

/* Stop Fluent Bit before destroying mock server to properly close connections */
flb_stop(ctx->flb);
flb_time_msleep(500); /* Give threads time to shut down */

mock_k8s_api_destroy(k8s_server);
test_ctx_destroy(ctx);
}

TEST_LIST = {
{"events_v1_with_lastTimestamp", flb_test_events_v1_with_lastTimestamp},
{"events_v1_with_creationTimestamp", flb_test_events_v1_with_creationTimestamp},
//{"events_v1_with_chunkedrecv", flb_test_events_with_chunkedrecv},
{"events_v1_with_3chunks", flb_test_events_with_3chunks},
#ifdef FLB_HAVE_SQLDB
{"config_db_sync_values", flb_test_config_db_sync_values},
{"config_db_journal_mode_values", flb_test_config_db_journal_mode_values},