Skip to content

Commit 927489b

Browse files
nokute78edsiper
authored andcommitted
in_docker_events: try to retry using flb_input_set_collector_time
Signed-off-by: Takahiro Yamashita <[email protected]>
1 parent 97d7f5d commit 927489b

File tree

2 files changed

+107
-16
lines changed

2 files changed

+107
-16
lines changed

plugins/in_docker_events/docker_events.c

Lines changed: 100 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -94,13 +94,19 @@ static int reconnect_docker_sock(struct flb_input_instance *ins,
9494
ctx->coll_id = -1;
9595
}
9696
if (ctx->fd > 0) {
97+
flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd);
9798
close(ctx->fd);
9899
ctx->fd = -1;
99100
}
100101

101102
/* create socket again */
102103
if (de_unix_create(ctx) < 0) {
103104
flb_plg_error(ctx->ins, "failed to re-initialize socket");
105+
if (ctx->fd > 0) {
106+
flb_plg_debug(ctx->ins, "close socket fd=%d", ctx->fd);
107+
close(ctx->fd);
108+
ctx->fd = -1;
109+
}
104110
return -1;
105111
}
106112
/* set event */
@@ -124,29 +130,95 @@ static int reconnect_docker_sock(struct flb_input_instance *ins,
124130
ctx->fd = -1;
125131
return -1;
126132
}
133+
134+
flb_plg_info(ctx->ins, "Reconnect successful");
127135
return 0;
128136
}
129137

130-
static int reconnect_docker_sock_retry(struct flb_input_instance *ins,
131-
struct flb_config *config,
132-
struct flb_in_de_config *ctx)
138+
static int cb_reconnect(struct flb_input_instance *ins,
139+
struct flb_config *config,
140+
void *in_context)
133141
{
134-
int i;
142+
struct flb_in_de_config *ctx = in_context;
135143
int ret;
136-
for (i=0; i<ctx->reconnect_retry_limits; i++) {
137-
ret = reconnect_docker_sock(ins, config, ctx);
138-
if (ret >= 0) {
139-
return ret;
144+
145+
flb_plg_info(ctx->ins, "Retry(%d/%d)",
146+
ctx->current_retries, ctx->reconnect_retry_limits);
147+
ret = reconnect_docker_sock(ins, config, ctx);
148+
if (ret < 0) {
149+
/* Failed to reconnect */
150+
ctx->current_retries++;
151+
if (ctx->current_retries > ctx->reconnect_retry_limits) {
152+
/* give up */
153+
flb_plg_error(ctx->ins, "Failed to retry. Giving up...");
154+
goto cb_reconnect_end;
140155
}
141-
flb_plg_info(ctx->ins, "Retry(%d) after %d seconds.",i+1,
142-
ctx->reconnect_retry_interval);
143-
sleep(ctx->reconnect_retry_interval);
156+
flb_plg_info(ctx->ins, "Failed. Waiting for next retry..");
157+
return 0;
144158
}
145159

146-
flb_plg_error(ctx->ins, "Failed to reconnect docker.");
147-
return -1;
160+
cb_reconnect_end:
161+
if(flb_input_collector_delete(ctx->retry_coll_id, ins) < 0) {
162+
flb_plg_error(ctx->ins, "failed to delete timer event");
163+
}
164+
ctx->current_retries = 0;
165+
ctx->retry_coll_id = -1;
166+
return ret;
148167
}
149168

169+
static int create_reconnect_event(struct flb_input_instance *ins,
170+
struct flb_config *config,
171+
struct flb_in_de_config *ctx)
172+
{
173+
int ret;
174+
175+
if (ctx->retry_coll_id >= 0) {
176+
flb_plg_debug(ctx->ins, "already retring ?");
177+
return 0;
178+
}
179+
180+
/* try before creating event to stop incoming event */
181+
ret = reconnect_docker_sock(ins, config, ctx);
182+
if (ret == 0) {
183+
return 0;
184+
}
185+
186+
ctx->current_retries = 1;
187+
ctx->retry_coll_id = flb_input_set_collector_time(ins,
188+
cb_reconnect,
189+
ctx->reconnect_retry_interval,
190+
0,
191+
config);
192+
if (ctx->retry_coll_id < 0) {
193+
flb_plg_error(ctx->ins, "failed to create timer event");
194+
return -1;
195+
}
196+
ret = flb_input_collector_start(ctx->retry_coll_id, ins);
197+
if (ret < 0) {
198+
flb_plg_error(ctx->ins, "failed to start timer event");
199+
flb_input_collector_delete(ctx->retry_coll_id, ins);
200+
ctx->retry_coll_id = -1;
201+
return -1;
202+
}
203+
flb_plg_info(ctx->ins, "create reconnect event. interval=%d second",
204+
ctx->reconnect_retry_interval);
205+
206+
return 0;
207+
}
208+
209+
static int is_recoverable_error(int error)
210+
{
211+
/* ENOTTY:
212+
It reports on Docker in Docker mode.
213+
https://github.com/fluent/fluent-bit/issues/3439#issuecomment-831424674
214+
*/
215+
if (error == ENOTTY || error == EBADF) {
216+
return FLB_TRUE;
217+
}
218+
return FLB_FALSE;
219+
}
220+
221+
150222
/**
151223
* Callback function to process events recieved on the unix
152224
* socket.
@@ -232,15 +304,25 @@ static int in_de_collect(struct flb_input_instance *ins,
232304

233305
/* docker service may be restarted */
234306
flb_plg_info(ctx->ins, "EOF detected. Re-initialize");
235-
ret = reconnect_docker_sock_retry(ins, config, ctx);
236-
if (ret < 0) {
237-
return ret;
307+
if (ctx->reconnect_retry_limits > 0) {
308+
ret = create_reconnect_event(ins, config, ctx);
309+
if (ret < 0) {
310+
return ret;
311+
}
238312
}
239313
}
240314
else {
241315
error = errno;
242316
flb_plg_error(ctx->ins, "read returned error: %d, %s", error,
243317
strerror(error));
318+
if (is_recoverable_error(error)) {
319+
if (ctx->reconnect_retry_limits > 0) {
320+
ret = create_reconnect_event(ins, config, ctx);
321+
if (ret < 0) {
322+
return ret;
323+
}
324+
}
325+
}
244326
}
245327

246328
return 0;
@@ -267,6 +349,8 @@ static int in_de_init(struct flb_input_instance *ins,
267349
return -1;
268350
}
269351
ctx->ins = ins;
352+
ctx->retry_coll_id = -1;
353+
ctx->current_retries = 0;
270354

271355
/* Set the context */
272356
flb_input_set_context(ins, ctx);

plugins/in_docker_events/docker_events.h

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,15 @@ struct flb_in_de_config
3838
char *buf;
3939
size_t buf_size;
4040
flb_sds_t key;
41+
42+
/* retries */
4143
int reconnect_retry_limits;
4244
int reconnect_retry_interval;
45+
46+
/* retries (internal) */
47+
int current_retries;
48+
int retry_coll_id;
49+
4350
struct flb_parser *parser;
4451
struct flb_input_instance *ins; /* Input plugin instace */
4552

0 commit comments

Comments
 (0)