Skip to content

Commit e1c7c2d

Browse files
committed
upstream: monitor idle keepalive connections for 'disconnections' (#1704)
Signed-off-by: Eduardo Silva <[email protected]>
1 parent 516dfba commit e1c7c2d

File tree

1 file changed

+52
-1
lines changed

1 file changed

+52
-1
lines changed

src/flb_upstream.c

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ static struct flb_upstream_conn *create_conn(struct flb_upstream *u)
156156
u->n_connections++;
157157

158158
if (conn->u->flags & FLB_IO_TCP_KA) {
159-
flb_debug("[upstream] KA connection #%i to %s:%i created",
159+
flb_debug("[upstream] KA connection #%i to %s:%i is connected",
160160
conn->fd, u->tcp_host, u->tcp_port);
161161
}
162162

@@ -252,6 +252,16 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
252252
mk_list_add(&conn->_head, &u->busy_queue);
253253
flb_debug("[upstream] KA connection #%i to %s:%i has been assigned (recycled)",
254254
conn->fd, u->tcp_host, u->tcp_port);
255+
256+
/*
257+
* Note: since we are in a keepalive connection, the socket is already being
258+
* monitored for possible disconnections while idle. Upon re-use by the caller
259+
* when it try to send some data, the I/O interface (flb_io.c) will put the
260+
* proper event mask and reuse, there is no need to remove the socket from
261+
* the event loop and re-add it again.
262+
*
263+
* So... just return the connection context.
264+
*/
255265
return conn;
256266
}
257267

@@ -263,9 +273,30 @@ struct flb_upstream_conn *flb_upstream_conn_get(struct flb_upstream *u)
263273
return conn;
264274
}
265275

276+
/*
277+
* An 'idle' and keepalive might be disconnected, if so, this callback will perform
278+
* the proper connection cleanup.
279+
*/
280+
static int cb_upstream_conn_ka_dropped(void *data)
281+
{
282+
struct flb_upstream_conn *conn;
283+
284+
conn = (struct flb_upstream_conn *) data;
285+
286+
flb_debug("[upstream] KA connection #%i to %s:%i has been disconnected "
287+
"by the remote service",
288+
conn->fd, conn->u->tcp_host, conn->u->tcp_port);
289+
return destroy_conn(conn);
290+
}
291+
266292
int flb_upstream_conn_release(struct flb_upstream_conn *conn)
267293
{
294+
int ret;
268295
time_t ts;
296+
struct flb_upstream *u;
297+
298+
/* Upstream context */
299+
u = conn->u;
269300

270301
/* If this is a valid KA connection just recycle */
271302
if (conn->u->flags & FLB_IO_TCP_KA) {
@@ -285,6 +316,26 @@ int flb_upstream_conn_release(struct flb_upstream_conn *conn)
285316
mk_list_del(&conn->_head);
286317
mk_list_add(&conn->_head, &conn->u->av_queue);
287318
conn->ts_available = time(NULL);
319+
320+
/*
321+
* The socket at this point is not longer monitored, so if we want to be
322+
* notified if the 'available keepalive connection' gets disconnected by
323+
* the remote endpoint we need to add it again.
324+
*/
325+
conn->event.handler = cb_upstream_conn_ka_dropped;
326+
conn->event.data = &conn;
327+
328+
ret = mk_event_add(u->evl, conn->fd,
329+
FLB_ENGINE_EV_CUSTOM,
330+
MK_EVENT_CLOSE, &conn->event);
331+
if (ret == -1) {
332+
/* We failed the registration, for safety just destroy the connection */
333+
flb_debug("[upstream] KA connection #%i to %s:%i could not be "
334+
"registered, closing.",
335+
conn->fd, conn->u->tcp_host, conn->u->tcp_port);
336+
return destroy_conn(conn);
337+
}
338+
288339
flb_debug("[upstream] KA connection #%i to %s:%i is now available",
289340
conn->fd, conn->u->tcp_host, conn->u->tcp_port);
290341
return 0;

0 commit comments

Comments
 (0)