Skip to content

Commit e48dddb

Browse files
ryanohnemusedsiper
authored andcommitted
in_kubernetes_events: add tests and move connection stream up to context
Signed-off-by: ryanohnemus <[email protected]>
1 parent a9a440c commit e48dddb

File tree

10 files changed

+564
-50
lines changed

10 files changed

+564
-50
lines changed

plugins/in_kubernetes_events/kubernetes_events.c

Lines changed: 81 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -586,7 +586,6 @@ static int process_event_list(struct k8s_events *ctx, char *in_data, size_t in_s
586586
}
587587

588588
static struct flb_http_client *make_event_watch_api_request(struct k8s_events *ctx,
589-
struct flb_connection *u_conn,
590589
uint64_t max_resource_version)
591590
{
592591
flb_sds_t url;
@@ -603,21 +602,20 @@ static struct flb_http_client *make_event_watch_api_request(struct k8s_events *c
603602

604603
flb_sds_printf(&url, "?watch=1&resourceVersion=%llu", max_resource_version);
605604
flb_plg_info(ctx->ins, "Requesting %s", url);
606-
c = flb_http_client(u_conn, FLB_HTTP_GET, url,
605+
c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
607606
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
608607
flb_sds_destroy(url);
609608
return c;
610609
}
611610

612611
static struct flb_http_client *make_event_list_api_request(struct k8s_events *ctx,
613-
struct flb_connection *u_conn,
614612
flb_sds_t continue_token)
615613
{
616614
flb_sds_t url;
617615
struct flb_http_client *c;
618616

619617
if (continue_token == NULL && ctx->limit_request == 0 && ctx->namespace == NULL) {
620-
return flb_http_client(u_conn, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
618+
return flb_http_client(ctx->current_connection, FLB_HTTP_GET, K8S_EVENTS_KUBE_API_URI,
621619
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
622620
}
623621

@@ -637,7 +635,7 @@ static struct flb_http_client *make_event_list_api_request(struct k8s_events *ct
637635
}
638636
flb_sds_printf(&url, "limit=%d", ctx->limit_request);
639637
}
640-
c = flb_http_client(u_conn, FLB_HTTP_GET, url,
638+
c = flb_http_client(ctx->current_connection, FLB_HTTP_GET, url,
641639
NULL, 0, ctx->api_host, ctx->api_port, NULL, 0);
642640
flb_sds_destroy(url);
643641
return c;
@@ -788,50 +786,50 @@ static void initialize_http_client(struct flb_http_client* c, struct k8s_events*
788786
}
789787
}
790788

791-
static int k8s_events_collect(struct flb_input_instance *ins,
792-
struct flb_config *config, void *in_context)
789+
static int check_and_init_stream(struct k8s_events *ctx)
793790
{
794-
int ret;
795-
size_t b_sent;
796-
struct flb_connection *u_conn = NULL;
797-
struct flb_http_client *c = NULL;
798-
struct k8s_events *ctx = in_context;
791+
/* Returns FLB_TRUE if stream has been initialized */
799792
flb_sds_t continue_token = NULL;
800793
uint64_t max_resource_version = 0;
801-
size_t bytes_consumed;
802-
int chunk_proc_ret;
794+
size_t b_sent;
795+
int ret;
796+
struct flb_http_client *c = NULL;
803797

804-
if (pthread_mutex_trylock(&ctx->lock) != 0) {
805-
FLB_INPUT_RETURN(0);
798+
/* if the streaming client is already active, just return it */
799+
if(ctx->streaming_client) {
800+
return FLB_TRUE;
806801
}
807802

808-
u_conn = flb_upstream_conn_get(ctx->upstream);
809-
if (!u_conn) {
810-
flb_plg_error(ins, "upstream connection initialization error");
811-
goto exit;
812-
}
803+
/* setup connection if one does not exist */
804+
if(!ctx->current_connection) {
805+
ctx->current_connection = flb_upstream_conn_get(ctx->upstream);
806+
if (!ctx->current_connection) {
807+
flb_plg_error(ctx->ins, "upstream connection initialization error");
808+
goto failure;
809+
}
813810

814-
ret = refresh_token_if_needed(ctx);
815-
if (ret == -1) {
816-
flb_plg_error(ctx->ins, "failed to refresh token");
817-
goto exit;
811+
ret = refresh_token_if_needed(ctx);
812+
if (ret == -1) {
813+
flb_plg_error(ctx->ins, "failed to refresh token");
814+
goto failure;
815+
}
818816
}
819817

820818
do {
821-
c = make_event_list_api_request(ctx, u_conn, continue_token);
819+
c = make_event_list_api_request(ctx, continue_token);
822820
if (continue_token != NULL) {
823821
flb_sds_destroy(continue_token);
824822
continue_token = NULL;
825823
}
826824
if (!c) {
827-
flb_plg_error(ins, "unable to create http client");
828-
goto exit;
825+
flb_plg_error(ctx->ins, "unable to create http client");
826+
goto failure;
829827
}
830828
initialize_http_client(c, ctx);
831829
ret = flb_http_do(c, &b_sent);
832830
if (ret != 0) {
833-
flb_plg_error(ins, "http do error");
834-
goto exit;
831+
flb_plg_error(ctx->ins, "http do error");
832+
goto failure;
835833
}
836834

837835
if (c->resp.status == 200 && c->resp.payload_size > 0) {
@@ -846,7 +844,7 @@ static int k8s_events_collect(struct flb_input_instance *ins,
846844
else {
847845
flb_plg_error(ctx->ins, "http_status=%i", c->resp.status);
848846
}
849-
goto exit;
847+
goto failure;
850848
}
851849
flb_http_client_destroy(c);
852850
c = NULL;
@@ -860,48 +858,81 @@ static int k8s_events_collect(struct flb_input_instance *ins,
860858
/* Now that we've done a full list, we can use the resource version and do a watch
861859
* to stream updates efficiently
862860
*/
863-
c = make_event_watch_api_request(ctx, u_conn, max_resource_version);
864-
if (!c) {
865-
flb_plg_error(ins, "unable to create http client");
866-
goto exit;
861+
ctx->streaming_client = make_event_watch_api_request(ctx, max_resource_version);
862+
if (!ctx->streaming_client) {
863+
flb_plg_error(ctx->ins, "unable to create http client");
864+
goto failure;
867865
}
868-
initialize_http_client(c, ctx);
866+
initialize_http_client(ctx->streaming_client, ctx);
869867

870868
/* Watch will stream chunked json data, so we only send
871869
* the http request, then use flb_http_get_response_data
872870
* to attempt processing on available streamed data
873871
*/
874872
b_sent = 0;
875-
ret = flb_http_do_request(c, &b_sent);
873+
ret = flb_http_do_request(ctx->streaming_client, &b_sent);
876874
if (ret != 0) {
877-
flb_plg_error(ins, "http do request error");
878-
goto exit;
875+
flb_plg_error(ctx->ins, "http do request error");
876+
goto failure;
877+
}
878+
879+
return FLB_TRUE;
880+
881+
failure:
882+
if (c) {
883+
flb_http_client_destroy(c);
884+
}
885+
if (ctx->streaming_client) {
886+
flb_http_client_destroy(ctx->streaming_client);
887+
ctx->streaming_client = NULL;
888+
}
889+
if (ctx->current_connection) {
890+
flb_upstream_conn_release(ctx->current_connection);
891+
ctx->current_connection = NULL;
892+
}
893+
return FLB_FALSE;
894+
}
895+
896+
static int k8s_events_collect(struct flb_input_instance *ins,
897+
struct flb_config *config, void *in_context)
898+
{
899+
int ret;
900+
struct k8s_events *ctx = in_context;
901+
size_t bytes_consumed;
902+
int chunk_proc_ret;
903+
904+
if (pthread_mutex_trylock(&ctx->lock) != 0) {
905+
FLB_INPUT_RETURN(0);
906+
}
907+
908+
if (check_and_init_stream(ctx) == FLB_FALSE) {
909+
FLB_INPUT_RETURN(0);
879910
}
880911

881912
ret = FLB_HTTP_MORE;
882913
bytes_consumed = 0;
883914
chunk_proc_ret = 0;
884915
while ((ret == FLB_HTTP_MORE || ret == FLB_HTTP_CHUNK_AVAILABLE) && chunk_proc_ret == 0) {
885-
ret = flb_http_get_response_data(c, bytes_consumed);
916+
ret = flb_http_get_response_data(ctx->streaming_client, bytes_consumed);
886917
bytes_consumed = 0;
887-
if( c->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
888-
chunk_proc_ret = process_http_chunk(ctx, c, &bytes_consumed);
918+
if(ctx->streaming_client->resp.status == 200 && ret == FLB_HTTP_CHUNK_AVAILABLE ) {
919+
chunk_proc_ret = process_http_chunk(ctx, ctx->streaming_client, &bytes_consumed);
889920
}
890921
}
891922
/* NOTE: skipping any processing after streaming socket closes */
892923

893-
if (c->resp.status != 200) {
894-
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s", c->resp.status, c->resp.payload);
924+
if (ctx->streaming_client->resp.status != 200) {
925+
flb_plg_warn(ins, "events watch failure, http_status=%d payload=%s",
926+
ctx->streaming_client->resp.status, ctx->streaming_client->resp.payload);
927+
928+
flb_http_client_destroy(ctx->streaming_client);
929+
flb_upstream_conn_release(ctx->current_connection);
930+
ctx->streaming_client = NULL;
931+
ctx->current_connection = NULL;
895932
}
896933

897934
exit:
898935
pthread_mutex_unlock(&ctx->lock);
899-
if (c) {
900-
flb_http_client_destroy(c);
901-
}
902-
if (u_conn) {
903-
flb_upstream_conn_release(u_conn);
904-
}
905936
FLB_INPUT_RETURN(0);
906937
}
907938

plugins/in_kubernetes_events/kubernetes_events.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ struct k8s_events {
8282
struct flb_upstream *upstream;
8383
struct flb_input_instance *ins;
8484

85+
struct flb_connection *current_connection;
86+
struct flb_http_client *streaming_client;
87+
8588
/* limit for event queries */
8689
int limit_request;
8790
/* last highest seen resource_version */

plugins/in_kubernetes_events/kubernetes_events_conf.c

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ static int network_init(struct k8s_events *ctx, struct flb_config *config)
9494
int io_type = FLB_IO_TCP;
9595

9696
ctx->upstream = NULL;
97+
ctx->current_connection = NULL;
98+
ctx->streaming_client = NULL;
9799

98100
if (ctx->api_https == FLB_TRUE) {
99101
if (!ctx->tls_ca_path && !ctx->tls_ca_file) {
@@ -280,6 +282,14 @@ void k8s_events_conf_destroy(struct k8s_events *ctx)
280282
flb_ra_destroy(ctx->ra_resource_version);
281283
}
282284

285+
if(ctx->streaming_client) {
286+
flb_http_client_destroy(ctx->streaming_client);
287+
}
288+
289+
if(ctx->current_connection) {
290+
flb_upstream_conn_release(ctx->current_connection);
291+
}
292+
283293
if (ctx->upstream) {
284294
flb_upstream_destroy(ctx->upstream);
285295
}

tests/runtime/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ if(FLB_OUT_LIB)
5757
FLB_RT_TEST(FLB_IN_TCP "in_tcp.c")
5858
FLB_RT_TEST(FLB_IN_FORWARD "in_forward.c")
5959
FLB_RT_TEST(FLB_IN_FLUENTBIT_METRICS "in_fluentbit_metrics.c")
60+
FLB_RT_TEST(FLB_IN_KUBERNETES_EVENTS "in_kubernetes_events.c")
6061
endif()
6162

6263
# Filter Plugins
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"kind": "EventList",
3+
"apiVersion": "v1",
4+
"metadata": {
5+
"resourceVersion": "177157"
6+
},
7+
"items": [
8+
{
9+
"metadata": {
10+
"name": "fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee",
11+
"namespace": "default",
12+
"uid": "6e3013d5-a79b-4dc4-b6c0-6b652302672e",
13+
"resourceVersion": "176761",
14+
"creationTimestamp": "2023-12-24T13:37:16Z",
15+
"managedFields": [
16+
{
17+
"manager": "kube-scheduler",
18+
"operation": "Update",
19+
"apiVersion": "events.k8s.io/v1",
20+
"time": "2023-12-24T13:37:16Z",
21+
"fieldsType": "FieldsV1",
22+
"fieldsV1": {
23+
"f:action": {},
24+
"f:eventTime": {},
25+
"f:note": {},
26+
"f:reason": {},
27+
"f:regarding": {},
28+
"f:reportingController": {},
29+
"f:reportingInstance": {},
30+
"f:type": {}
31+
}
32+
}
33+
]
34+
},
35+
"involvedObject": {
36+
"kind": "Pod",
37+
"namespace": "default",
38+
"name": "fluent-bit-78945dccd8-2g7qg",
39+
"uid": "ed7de8ff-61fb-40bb-9ecb-55a801a4cd89",
40+
"apiVersion": "v1",
41+
"resourceVersion": "176749"
42+
},
43+
"reason": "FailedScheduling",
44+
"message": "0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..",
45+
"source": {},
46+
"firstTimestamp": null,
47+
"lastTimestamp": null,
48+
"type": "Warning",
49+
"eventTime": "2023-12-24T13:37:16.335172Z",
50+
"action": "Scheduling",
51+
"reportingComponent": "default-scheduler",
52+
"reportingInstance": "default-scheduler-minikube"
53+
}
54+
]
55+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[1703425036.000000,{"metadata":{"name":"fluent-bit-78945dccd8-2g7qg.17a3c80ba0453aee","namespace":"default","uid":"6e3013d5-a79b-4dc4-b6c0-6b652302672e","resourceVersion":"176761","creationTimestamp":"2023-12-24T13:37:16Z","managedFields":[{"manager":"kube-scheduler","operation":"Update","apiVersion":"events.k8s.io/v1","time":"2023-12-24T13:37:16Z","fieldsType":"FieldsV1","fieldsV1":{"f:action":{},"f:eventTime":{},"f:note":{},"f:reason":{},"f:regarding":{},"f:reportingController":{},"f:reportingInstance":{},"f:type":{}}}]},"involvedObject":{"kind":"Pod","namespace":"default","name":"fluent-bit-78945dccd8-2g7qg","uid":"ed7de8ff-61fb-40bb-9ecb-55a801a4cd89","apiVersion":"v1","resourceVersion":"176749"},"reason":"FailedScheduling","message":"0/1 nodes are available: 1 Insufficient cpu. preemption: 0/1 nodes are available: 1 No preemption victims found for incoming pod..","source":{},"firstTimestamp":null,"lastTimestamp":null,"type":"Warning","eventTime":"2023-12-24T13:37:16.335172Z","action":"Scheduling","reportingComponent":"default-scheduler","reportingInstance":"default-scheduler-minikube"}]
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
{
2+
"kind": "EventList",
3+
"apiVersion": "v1",
4+
"metadata": {
5+
"resourceVersion": "177157"
6+
},
7+
"items": [
8+
{
9+
"metadata": {
10+
"name": ".17a3ba8b4aa36c81",
11+
"namespace": "default",
12+
"uid": "ec5546b7-f1b9-4e61-a90c-a1f3b611edbc",
13+
"resourceVersion": "174688",
14+
"creationTimestamp": "2023-12-24T09:30:07Z",
15+
"managedFields": [
16+
{
17+
"manager": "storage-provisioner",
18+
"operation": "Update",
19+
"apiVersion": "v1",
20+
"time": "2023-12-24T09:30:07Z",
21+
"fieldsType": "FieldsV1",
22+
"fieldsV1": {
23+
"f:count": {},
24+
"f:firstTimestamp": {},
25+
"f:involvedObject": {},
26+
"f:lastTimestamp": {},
27+
"f:message": {},
28+
"f:reason": {},
29+
"f:source": {
30+
"f:component": {}
31+
},
32+
"f:type": {}
33+
}
34+
}
35+
]
36+
},
37+
"involvedObject": {
38+
"kind": "Endpoints",
39+
"apiVersion": "v1"
40+
},
41+
"reason": "LeaderElection",
42+
"message": "minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading",
43+
"source": {
44+
"component": "k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"
45+
},
46+
"firstTimestamp": "2023-12-24T09:29:51Z",
47+
"lastTimestamp": "2023-12-24T09:29:51Z",
48+
"count": 1,
49+
"type": "Normal",
50+
"eventTime": null,
51+
"reportingComponent": "",
52+
"reportingInstance": ""
53+
}
54+
]
55+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
[1703410191.000000,{"metadata":{"name":".17a3ba8b4aa36c81","namespace":"default","uid":"ec5546b7-f1b9-4e61-a90c-a1f3b611edbc","resourceVersion":"174688","creationTimestamp":"2023-12-24T09:30:07Z","managedFields":[{"manager":"storage-provisioner","operation":"Update","apiVersion":"v1","time":"2023-12-24T09:30:07Z","fieldsType":"FieldsV1","fieldsV1":{"f:count":{},"f:firstTimestamp":{},"f:involvedObject":{},"f:lastTimestamp":{},"f:message":{},"f:reason":{},"f:source":{"f:component":{}},"f:type":{}}}]},"involvedObject":{"kind":"Endpoints","apiVersion":"v1"},"reason":"LeaderElection","message":"minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f stopped leading","source":{"component":"k8s.io/minikube-hostpath_minikube_31f5cdfb-29b0-4f84-9f9c-585088e9235f"},"firstTimestamp":"2023-12-24T09:29:51Z","lastTimestamp":"2023-12-24T09:29:51Z","count":1,"type":"Normal","eventTime":null,"reportingComponent":"","reportingInstance":""}]
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
fakeTokenFile

0 commit comments

Comments
 (0)