Skip to content

Commit 567ce06

Browse files
edsipercosmo0920
authored andcommitted
in_tcp: implement pause and resume callbacks
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 8c44185 commit 567ce06

File tree

3 files changed

+68
-9
lines changed

3 files changed

+68
-9
lines changed

plugins/in_tcp/tcp.c

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,8 @@ static int in_tcp_collect(struct flb_input_instance *in,
4242
connection = flb_downstream_conn_get(ctx->downstream);
4343

4444
if (connection == NULL) {
45-
flb_plg_error(ctx->ins, "could not accept new connection");
46-
47-
return -1;
45+
/* connection dropped (e.g. paused) */
46+
return 0;
4847
}
4948

5049
flb_plg_trace(ctx->ins, "new TCP connection arrived FD=%i", connection->fd);
@@ -145,6 +144,37 @@ static int in_tcp_exit(void *data, struct flb_config *config)
145144
return 0;
146145
}
147146

147+
static void in_tcp_pause(void *data, struct flb_config *config)
148+
{
149+
struct flb_in_tcp_config *ctx = data;
150+
struct mk_list *head;
151+
struct mk_list *tmp;
152+
struct tcp_conn *conn;
153+
154+
(void) config;
155+
156+
flb_downstream_pause(ctx->downstream);
157+
158+
mk_list_foreach_safe(head, tmp, &ctx->connections) {
159+
conn = mk_list_entry(head, struct tcp_conn, _head);
160+
if (conn->busy) {
161+
conn->pending_close = FLB_TRUE;
162+
continue;
163+
}
164+
165+
tcp_conn_del(conn);
166+
}
167+
}
168+
169+
static void in_tcp_resume(void *data, struct flb_config *config)
170+
{
171+
struct flb_in_tcp_config *ctx = data;
172+
173+
(void) config;
174+
175+
flb_downstream_resume(ctx->downstream);
176+
}
177+
148178
static struct flb_config_map config_map[] = {
149179
{
150180
FLB_CONFIG_MAP_STR, "format", (char *)NULL,
@@ -183,6 +213,8 @@ struct flb_input_plugin in_tcp_plugin = {
183213
.cb_pre_run = NULL,
184214
.cb_collect = in_tcp_collect,
185215
.cb_flush_buf = NULL,
216+
.cb_pause = in_tcp_pause,
217+
.cb_resume = in_tcp_resume,
186218
.cb_exit = in_tcp_exit,
187219
.config_map = config_map,
188220
.flags = FLB_INPUT_NET_SERVER | FLB_IO_OPT_TLS

plugins/in_tcp/tcp_conn.c

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,7 @@ int tcp_conn_event(void *data)
260260
struct tcp_conn *conn;
261261
struct flb_connection *connection;
262262
struct flb_in_tcp_config *ctx;
263+
int ret = 0;
263264

264265
connection = (struct flb_connection *) data;
265266

@@ -269,20 +270,24 @@ int tcp_conn_event(void *data)
269270

270271
event = &connection->event;
271272

273+
conn->busy = FLB_TRUE;
274+
272275
if (event->mask & MK_EVENT_READ) {
273276
available = (conn->buf_size - conn->buf_len) - 1;
274277
if (available < 1) {
275278
if (conn->buf_size + ctx->chunk_size > ctx->buffer_size) {
276279
flb_plg_warn(ctx->ins,
277280
"fd=%i incoming data exceeds 'Buffer_Size' (%zu KB)",
278281
event->fd, (ctx->buffer_size / 1024));
282+
conn->busy = FLB_FALSE;
279283
tcp_conn_del(conn);
280284
return -1;
281285
}
282286

283287
size = conn->buf_size + ctx->chunk_size;
284288
tmp = flb_realloc(conn->buf_data, size);
285289
if (!tmp) {
290+
conn->busy = FLB_FALSE;
286291
flb_errno();
287292
return -1;
288293
}
@@ -301,6 +306,7 @@ int tcp_conn_event(void *data)
301306

302307
if (bytes <= 0) {
303308
flb_plg_trace(ctx->ins, "fd=%i closed connection", event->fd);
309+
conn->busy = FLB_FALSE;
304310
tcp_conn_del(conn);
305311
return -1;
306312
}
@@ -325,23 +331,27 @@ int tcp_conn_event(void *data)
325331
ret_payload = parse_payload_json(conn);
326332
if (ret_payload == 0) {
327333
/* Incomplete JSON message, we need more data */
328-
return -1;
334+
ret = -1;
335+
goto cleanup;
329336
}
330337
else if (ret_payload == -1) {
331338
flb_pack_state_reset(&conn->pack_state);
332339
flb_pack_state_init(&conn->pack_state);
333340
conn->pack_state.multiple = FLB_TRUE;
334-
return -1;
341+
ret = -1;
342+
goto cleanup;
335343
}
336344
}
337345
else if (ctx->format == FLB_TCP_FMT_NONE) {
338346
ret_payload = parse_payload_none(conn);
339347
if (ret_payload == 0) {
340-
return -1;
348+
ret = -1;
349+
goto cleanup;
341350
}
342351
else if (ret_payload == -1) {
343352
conn->buf_len = 0;
344-
return -1;
353+
ret = -1;
354+
goto cleanup;
345355
}
346356
}
347357

@@ -357,16 +367,27 @@ int tcp_conn_event(void *data)
357367
conn->pack_state.buf_len = 0;
358368
}
359369

360-
return bytes;
370+
ret = bytes;
371+
goto cleanup;
361372
}
362373

363374
if (event->mask & MK_EVENT_CLOSE) {
364375
flb_plg_trace(ctx->ins, "fd=%i hangup", event->fd);
376+
conn->busy = FLB_FALSE;
365377
tcp_conn_del(conn);
366378
return -1;
367379
}
368380

369-
return 0;
381+
ret = 0;
382+
383+
cleanup:
384+
conn->busy = FLB_FALSE;
385+
if (conn->pending_close) {
386+
tcp_conn_del(conn);
387+
return -1;
388+
}
389+
390+
return ret;
370391
}
371392

372393
/* Create a new mqtt request instance */
@@ -432,6 +453,9 @@ struct tcp_conn *tcp_conn_add(struct flb_connection *connection,
432453

433454
mk_list_add(&conn->_head, &ctx->connections);
434455

456+
conn->busy = FLB_FALSE;
457+
conn->pending_close = FLB_FALSE;
458+
435459
return conn;
436460
}
437461

plugins/in_tcp/tcp_conn.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,9 @@ struct tcp_conn {
5050
struct flb_pack_state pack_state; /* Internal JSON parser */
5151
struct flb_connection *connection;
5252

53+
int busy; /* Connection is being processed */
54+
int pending_close; /* Defer closing until processing ends */
55+
5356
struct mk_list _head;
5457
};
5558

0 commit comments

Comments
 (0)