diff --git a/plugins/in_http/http.c b/plugins/in_http/http.c index ff9687ed04c..4503c588f8c 100644 --- a/plugins/in_http/http.c +++ b/plugins/in_http/http.c @@ -49,6 +49,13 @@ static int in_http_collect(struct flb_input_instance *ins, return -1; } + if (ctx->is_paused) { + flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", + connection->fd); + flb_downstream_conn_release(connection); + return -1; + } + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); @@ -79,6 +86,7 @@ static int in_http_init(struct flb_input_instance *ins, } ctx->collector_id = -1; + ctx->is_paused = FLB_FALSE; /* Populate context with config map defaults and incoming properties */ ret = flb_input_config_map_set(ins, (void *) ctx); @@ -199,6 +207,27 @@ static int in_http_exit(void *data, struct flb_config *config) return 0; } +static void in_http_pause(void *data, struct flb_config *config) +{ + struct flb_http *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_pause(ctx->collector_id, ctx->ins); + http_conn_release_all(ctx); + ctx->is_paused = FLB_TRUE; + } +} + +static void in_http_resume(void *data, struct flb_config *config) +{ + struct flb_http *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_resume(ctx->collector_id, ctx->ins); + ctx->is_paused = FLB_FALSE; + } +} + /* Configuration properties map */ static struct flb_config_map config_map[] = { { @@ -249,8 +278,8 @@ struct flb_input_plugin in_http_plugin = { .cb_pre_run = NULL, .cb_collect = in_http_collect, .cb_flush_buf = NULL, - .cb_pause = NULL, - .cb_resume = NULL, + .cb_pause = in_http_pause, + .cb_resume = in_http_resume, .cb_exit = in_http_exit, .config_map = config_map, .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS diff --git a/plugins/in_http/http.h b/plugins/in_http/http.h index 4298a370c9c..71ba0f5653a 100644 --- a/plugins/in_http/http.h +++ b/plugins/in_http/http.h @@ -61,6 +61,7 @@ struct flb_http { struct mk_server *server; int collector_id; + int is_paused; /* Plugin is paused */ }; diff --git a/plugins/in_opentelemetry/opentelemetry.c b/plugins/in_opentelemetry/opentelemetry.c index 3b02f210629..5f4d9d817fc 100644 --- a/plugins/in_opentelemetry/opentelemetry.c +++ b/plugins/in_opentelemetry/opentelemetry.c @@ -50,6 +50,13 @@ static int in_opentelemetry_collect(struct flb_input_instance *ins, return -1; } + if (ctx->is_paused) { + flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", + connection->fd); + flb_downstream_conn_release(connection); + return -1; + } + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); conn = opentelemetry_conn_add(connection, ctx); @@ -76,6 +83,7 @@ static int in_opentelemetry_init(struct flb_input_instance *ins, return -1; } ctx->collector_id = -1; + ctx->is_paused = FLB_FALSE; /* Populate context with config map defaults and incoming properties */ ret = flb_input_config_map_set(ins, (void *) ctx); @@ -195,6 +203,27 @@ static int in_opentelemetry_exit(void *data, struct flb_config *config) return 0; } +static void in_opentelemetry_pause(void *data, struct flb_config *config) +{ + struct flb_opentelemetry *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_pause(ctx->collector_id, ctx->ins); + opentelemetry_conn_release_all(ctx); + ctx->is_paused = FLB_TRUE; + } +} + +static void in_opentelemetry_resume(void *data, struct flb_config *config) +{ + struct flb_opentelemetry *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_resume(ctx->collector_id, ctx->ins); + ctx->is_paused = FLB_FALSE; + } +} + /* Configuration properties map */ static struct flb_config_map config_map[] = { { @@ -272,8 +301,8 @@ struct flb_input_plugin in_opentelemetry_plugin = { .cb_pre_run = NULL, .cb_collect = in_opentelemetry_collect, .cb_flush_buf = NULL, - .cb_pause = NULL, - .cb_resume = NULL, + .cb_pause = in_opentelemetry_pause, + .cb_resume = in_opentelemetry_resume, .cb_exit = in_opentelemetry_exit, .config_map = config_map, .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS diff --git a/plugins/in_opentelemetry/opentelemetry.h b/plugins/in_opentelemetry/opentelemetry.h index c7d8fd43467..23fb51d2c18 100644 --- a/plugins/in_opentelemetry/opentelemetry.h +++ b/plugins/in_opentelemetry/opentelemetry.h @@ -57,6 +57,7 @@ struct flb_opentelemetry { struct mk_list connections; /* linked list of connections */ struct mk_server *server; + int is_paused; /* Plugin is paused */ }; diff --git a/plugins/in_tcp/tcp.c b/plugins/in_tcp/tcp.c index 9720279e104..7c53bb132c2 100644 --- a/plugins/in_tcp/tcp.c +++ b/plugins/in_tcp/tcp.c @@ -47,6 +47,13 @@ static int in_tcp_collect(struct flb_input_instance *in, return -1; } + if (ctx->is_paused) { + flb_plg_trace(ctx->ins, "TCP connection will be closed FD=%i", + connection->fd); + flb_downstream_conn_release(connection); + return -1; + } + flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd); conn = tcp_conn_add(connection, ctx); @@ -80,6 +87,7 @@ static int in_tcp_init(struct flb_input_instance *in, ctx->collector_id = -1; ctx->ins = in; mk_list_init(&ctx->connections); + ctx->is_paused = FLB_FALSE; /* Set the context */ flb_input_set_context(in, ctx); @@ -145,6 +153,27 @@ static int in_tcp_exit(void *data, struct flb_config *config) return 0; } +static void in_tcp_pause(void *data, struct flb_config *config) +{ + struct flb_in_tcp_config *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_pause(ctx->collector_id, ctx->ins); + tcp_conn_release_all(ctx); + ctx->is_paused = FLB_TRUE; + } +} + +static void in_tcp_resume(void *data, struct flb_config *config) +{ + struct flb_in_tcp_config *ctx = data; + + if (config->is_running == FLB_TRUE) { + flb_input_collector_resume(ctx->collector_id, ctx->ins); + ctx->is_paused = FLB_FALSE; + } +} + static struct flb_config_map config_map[] = { { FLB_CONFIG_MAP_STR, "format", (char *)NULL, @@ -183,6 +212,8 @@ struct flb_input_plugin in_tcp_plugin = { .cb_pre_run = NULL, .cb_collect = in_tcp_collect, .cb_flush_buf = NULL, + .cb_pause = in_tcp_pause, + .cb_resume = in_tcp_resume, .cb_exit = in_tcp_exit, .config_map = config_map, .flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS diff --git a/plugins/in_tcp/tcp.h b/plugins/in_tcp/tcp.h index 8c039a6a079..ee6f2ce30d4 100644 --- a/plugins/in_tcp/tcp.h +++ b/plugins/in_tcp/tcp.h @@ -46,6 +46,7 @@ struct flb_in_tcp_config { struct mk_list connections; /* List of active connections */ struct flb_input_instance *ins; /* Input plugin instace */ struct flb_log_event_encoder *log_encoder; + int is_paused; /* Plugin is paused */ }; #endif diff --git a/plugins/in_tcp/tcp_conn.c b/plugins/in_tcp/tcp_conn.c index 4157f59638a..2991e719b57 100644 --- a/plugins/in_tcp/tcp_conn.c +++ b/plugins/in_tcp/tcp_conn.c @@ -458,3 +458,15 @@ int tcp_conn_del(struct tcp_conn *conn) return 0; } + +void tcp_conn_release_all(struct flb_in_tcp_config *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct tcp_conn *conn; + + mk_list_foreach_safe(head, tmp, &ctx->connections) { + conn = mk_list_entry(head, struct tcp_conn, _head); + tcp_conn_del(conn); + } +} diff --git a/plugins/in_tcp/tcp_conn.h b/plugins/in_tcp/tcp_conn.h index 25580c60bc6..da257bd0734 100644 --- a/plugins/in_tcp/tcp_conn.h +++ b/plugins/in_tcp/tcp_conn.h @@ -55,5 +55,6 @@ struct tcp_conn { struct tcp_conn *tcp_conn_add(struct flb_connection *connection, struct flb_in_tcp_config *ctx); int tcp_conn_del(struct tcp_conn *conn); +void tcp_conn_release_all(struct flb_in_tcp_config *ctx); #endif