Skip to content

Commit 7f40367

Browse files
authored
proxy: fix memory bugs when use same golang output plugin multiple times (#6469)
Fix fluent/fluent-bit-go#49 Signed-off-by: jzajic <[email protected]>
1 parent 5949473 commit 7f40367

File tree

7 files changed

+98
-33
lines changed

7 files changed

+98
-33
lines changed

include/fluent-bit/flb_input.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,9 @@ struct flb_input_plugin {
134134
/* Exit */
135135
int (*cb_exit) (void *, struct flb_config *);
136136

137+
/* Destroy */
138+
void (*cb_destroy) (struct flb_input_plugin *);
139+
137140
void *instance;
138141

139142
struct mk_list _head;

include/fluent-bit/flb_output.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,9 @@ struct flb_output_plugin {
205205
/* Exit */
206206
int (*cb_exit) (void *, struct flb_config *);
207207

208+
/* Destroy */
209+
void (*cb_destroy) (struct flb_output_plugin *);
210+
208211
/* Default number of worker threads */
209212
int workers;
210213

include/fluent-bit/flb_plugins.h.in

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,12 +64,18 @@ void flb_plugins_unregister(struct flb_config *config)
6464

6565
mk_list_foreach_safe(head, tmp, &config->in_plugins) {
6666
in = mk_list_entry(head, struct flb_input_plugin, _head);
67+
if(in->cb_destroy) {
68+
in->cb_destroy(in);
69+
}
6770
mk_list_del(&in->_head);
6871
flb_free(in);
6972
}
7073

7174
mk_list_foreach_safe(head, tmp, &config->out_plugins) {
7275
out = mk_list_entry(head, struct flb_output_plugin, _head);
76+
if(out->cb_destroy) {
77+
out->cb_destroy(out);
78+
}
7379
mk_list_del(&out->_head);
7480
flb_free(out);
7581
}

src/flb_output.c

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -448,12 +448,7 @@ void flb_output_exit(struct flb_config *config)
448448

449449
/* Check a exit callback */
450450
if (p->cb_exit) {
451-
if (!p->proxy) {
452-
p->cb_exit(ins->context, config);
453-
}
454-
else {
455-
p->cb_exit(p, ins->context);
456-
}
451+
p->cb_exit(ins->context, config);
457452
}
458453
flb_output_instance_destroy(ins);
459454
}

src/flb_plugin_proxy.c

Lines changed: 60 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -183,37 +183,85 @@ static void flb_proxy_input_cb_resume(void *data, struct flb_config *config)
183183
}
184184

185185
static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy);
186-
static int flb_proxy_output_cb_exit(void *data, struct flb_config *config)
186+
187+
static int flb_proxy_output_cb_exit(void *out_context, struct flb_config *config)
187188
{
188-
struct flb_output_plugin *instance = data;
189-
struct flb_plugin_proxy *proxy = (instance->proxy);
189+
struct flb_plugin_proxy_context *ctx = out_context;
190+
struct flb_plugin_proxy *proxy = (ctx->proxy);
191+
192+
if (!out_context) {
193+
return 0;
194+
}
190195

191196
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
192-
proxy_go_output_destroy(proxy->data);
197+
#ifdef FLB_HAVE_PROXY_GO
198+
proxy_go_output_destroy(ctx);
199+
#endif
193200
}
194-
flb_plugin_proxy_destroy(proxy);
201+
202+
flb_free(ctx);
195203
return 0;
196204
}
197205

206+
static void flb_proxy_output_cb_destroy(struct flb_output_plugin *plugin)
207+
{
208+
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
209+
/* cleanup */
210+
void (*cb_unregister)(struct flb_plugin_proxy_def *def);
211+
212+
cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
213+
if (cb_unregister != NULL) {
214+
cb_unregister(proxy->def);
215+
}
216+
217+
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
218+
#ifdef FLB_HAVE_PROXY_GO
219+
proxy_go_output_unregister(proxy->data);
220+
#endif
221+
}
222+
223+
flb_plugin_proxy_destroy(proxy);
224+
}
225+
198226
static int flb_proxy_input_cb_exit(void *in_context, struct flb_config *config)
199227
{
200228
struct flb_plugin_input_proxy_context *ctx = in_context;
229+
struct flb_plugin_proxy *proxy = (ctx->proxy);
201230

202231
if (!in_context) {
203232
return 0;
204233
}
205234

206-
if (ctx->proxy->def->proxy == FLB_PROXY_GOLANG) {
207-
proxy_go_input_destroy(ctx->proxy->data);
235+
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
236+
#ifdef FLB_HAVE_PROXY_GO
237+
proxy_go_input_destroy(ctx);
238+
#endif
208239
}
209240

210-
flb_plugin_proxy_destroy(ctx->proxy);
211-
212241
flb_free(ctx);
213-
214242
return 0;
215243
}
216244

245+
static void flb_proxy_input_cb_destroy(struct flb_input_plugin *plugin)
246+
{
247+
struct flb_plugin_proxy *proxy = (struct flb_plugin_proxy *) plugin->proxy;
248+
/* cleanup */
249+
void (*cb_unregister)(struct flb_plugin_proxy_def *def);
250+
251+
cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
252+
if (cb_unregister != NULL) {
253+
cb_unregister(proxy->def);
254+
}
255+
256+
if (proxy->def->proxy == FLB_PROXY_GOLANG) {
257+
#ifdef FLB_HAVE_PROXY_GO
258+
proxy_go_input_unregister(proxy->data);
259+
#endif
260+
}
261+
262+
flb_plugin_proxy_destroy(proxy);
263+
}
264+
217265
static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
218266
struct flb_plugin_proxy_def *def,
219267
struct flb_config *config)
@@ -241,6 +289,7 @@ static int flb_proxy_register_output(struct flb_plugin_proxy *proxy,
241289
*/
242290
out->cb_flush = proxy_cb_flush;
243291
out->cb_exit = flb_proxy_output_cb_exit;
292+
out->cb_destroy = flb_proxy_output_cb_destroy;
244293
return 0;
245294
}
246295

@@ -273,6 +322,7 @@ static int flb_proxy_register_input(struct flb_plugin_proxy *proxy,
273322
in->cb_collect = flb_proxy_input_cb_collect;
274323
in->cb_flush_buf = NULL;
275324
in->cb_exit = flb_proxy_input_cb_exit;
325+
in->cb_destroy = flb_proxy_input_cb_destroy;
276326
in->cb_pause = flb_proxy_input_cb_pause;
277327
in->cb_resume = flb_proxy_input_cb_resume;
278328
return 0;
@@ -427,13 +477,6 @@ struct flb_plugin_proxy *flb_plugin_proxy_create(const char *dso_path, int type,
427477

428478
static void flb_plugin_proxy_destroy(struct flb_plugin_proxy *proxy)
429479
{
430-
/* cleanup */
431-
void (*cb_unregister)(struct flb_plugin_proxy_def *def);
432-
433-
cb_unregister = flb_plugin_proxy_symbol(proxy, "FLBPluginUnregister");
434-
if (cb_unregister != NULL) {
435-
cb_unregister(proxy->def);
436-
}
437480
flb_free(proxy->def);
438481
flb_api_destroy(proxy->api);
439482
dlclose(proxy->dso_handler);

src/proxy/go/go.c

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -145,23 +145,29 @@ int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
145145
return ret;
146146
}
147147

148-
int proxy_go_output_destroy(void *data)
148+
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx)
149149
{
150150
int ret = 0;
151151
struct flbgo_output_plugin *plugin;
152152

153-
plugin = (struct flbgo_output_plugin *) data;
153+
plugin = (struct flbgo_output_plugin *) ctx->proxy->data;
154154
flb_debug("[GO] running exit callback");
155155

156156
if (plugin->cb_exit_ctx) {
157-
ret = plugin->cb_exit_ctx(plugin->context->remote_context);
157+
ret = plugin->cb_exit_ctx(ctx->remote_context);
158158
}
159159
else if (plugin->cb_exit) {
160160
ret = plugin->cb_exit();
161161
}
162+
return ret;
163+
}
164+
165+
void proxy_go_output_unregister(void *data) {
166+
struct flbgo_output_plugin *plugin;
167+
168+
plugin = (struct flbgo_output_plugin *) data;
162169
flb_free(plugin->name);
163170
flb_free(plugin);
164-
return ret;
165171
}
166172

167173
int proxy_go_input_register(struct flb_plugin_proxy *proxy,
@@ -253,17 +259,24 @@ int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
253259
return ret;
254260
}
255261

256-
int proxy_go_input_destroy(void *data)
262+
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx)
257263
{
258264
int ret = 0;
259265
struct flbgo_input_plugin *plugin;
260266

261-
plugin = (struct flbgo_input_plugin *) data;
267+
plugin = (struct flbgo_input_plugin *) ctx->proxy->data;
262268
flb_debug("[GO] running exit callback");
263269

264-
ret = plugin->cb_exit();
270+
if (plugin->cb_exit) {
271+
ret = plugin->cb_exit();
272+
}
273+
return ret;
274+
}
265275

276+
void proxy_go_input_unregister(void *data) {
277+
struct flbgo_input_plugin *plugin;
278+
279+
plugin = (struct flbgo_input_plugin *) data;
266280
flb_free(plugin->name);
267281
flb_free(plugin);
268-
return ret;
269282
}

src/proxy/go/go.h

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ int proxy_go_output_init(struct flb_plugin_proxy *proxy);
5656
int proxy_go_output_flush(struct flb_plugin_proxy_context *ctx,
5757
const void *data, size_t size,
5858
const char *tag, int tag_len);
59-
int proxy_go_output_destroy(void *data);
59+
int proxy_go_output_destroy(struct flb_plugin_proxy_context *ctx);
60+
void proxy_go_output_unregister(void *data);
6061

6162
int proxy_go_input_register(struct flb_plugin_proxy *proxy,
6263
struct flb_plugin_proxy_def *def);
@@ -66,5 +67,6 @@ int proxy_go_input_collect(struct flb_plugin_proxy *ctx,
6667
void **collected_data, size_t *len);
6768
int proxy_go_input_cleanup(struct flb_plugin_proxy *ctx,
6869
void *allocated_data);
69-
int proxy_go_input_destroy(void *data);
70+
int proxy_go_input_destroy(struct flb_plugin_input_proxy_context *ctx);
71+
void proxy_go_input_unregister(void *data);
7072
#endif

0 commit comments

Comments
 (0)