Skip to content

Commit efa99fd

Browse files
committed
Use event_base with libevent 2.0+ to avoid thread-unsafe event_init in gpfdist
The legacy event_init() function is not thread-safe and can cause issues when gpfdist is run in multi-threaded environments. This patch updates gpfdist to use libevent 2.0+'s thread-safe APIs, specifically `event_base` along with `event_assign()` and `evtimer_assign()`. A new global `gcb.event_base` is introduced and used when compiled with libevent ≥ 2.0.1. This avoids the need for the deprecated and non-thread-safe `event_set()` / `evtimer_set()` APIs, and prepares gpfdist for better thread safety.
1 parent d13a8cb commit efa99fd

File tree

1 file changed

+42
-17
lines changed

1 file changed

+42
-17
lines changed

src/bin/gpfdist/gpfdist.c

Lines changed: 42 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -233,6 +233,7 @@ static struct
233233
SSL_CTX *server_ctx;/* for SSL */
234234
#endif
235235
int wdtimer; /* Kill gpfdist after k seconds of inactivity. 0 to disable. */
236+
struct event_base *event_base; /* for libevent 2.0+ */
236237
} gcb;
237238

238239
/* A session */
@@ -1600,7 +1601,7 @@ static void session_detach(request_t* r)
16001601
}
16011602

16021603
event_del(&session->ev);
1603-
evtimer_set(&session->ev, free_session_cb, session);
1604+
evtimer_assign(&session->ev, gcb.event_base, free_session_cb, session);
16041605
session->tm.tv_sec = opt.w;
16051606
session->tm.tv_usec = 0;
16061607
(void)evtimer_add(&session->ev, &session->tm);
@@ -1811,7 +1812,7 @@ static int session_attach(request_t* r)
18111812
session->active_segids[r->segid] = 1; /* mark this segid as active */
18121813
session->maxsegs = r->totalsegs;
18131814
session->requests = apr_hash_make(pool);
1814-
event_set(&session->ev, 0, 0, 0, 0);
1815+
event_assign(&session->ev, gcb.event_base, -1, 0, NULL, NULL);
18151816

18161817
if (session->tid == 0 || session->path == 0 || session->key == 0)
18171818
gfatal(r, "out of memory in session_attach");
@@ -2368,7 +2369,7 @@ static void do_accept(int fd, short event, void* arg)
23682369
r->pool = pool;
23692370
r->sock = sock;
23702371

2371-
event_set(&r->ev, 0, 0, 0, 0);
2372+
event_assign(&r->ev, gcb.event_base, -1, 0, NULL, NULL);
23722373

23732374
/* use the block size specified by -m option */
23742375
r->outblock.data = palloc_safe(r, pool, opt.m, "out of memory when allocating buffer: %d bytes", opt.m);
@@ -2421,7 +2422,7 @@ static int setup_write(request_t* r)
24212422
if (r->sock < 0)
24222423
gwarning(r, "internal error in setup_write - no socket to use");
24232424
event_del(&r->ev);
2424-
event_set(&r->ev, r->sock, EV_WRITE, do_write, r);
2425+
event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, do_write, r);
24252426
return (event_add(&r->ev, 0));
24262427
}
24272428

@@ -2445,7 +2446,7 @@ static int setup_read(request_t* r)
24452446
gwarning(r, "internal error in setup_read - no socket to use");
24462447

24472448
event_del(&r->ev);
2448-
event_set(&r->ev, r->sock, EV_READ, do_read_request, r);
2449+
event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_read_request, r);
24492450

24502451
if(opt.t == 0)
24512452
{
@@ -2552,18 +2553,32 @@ static void
25522553
signal_register()
25532554
{
25542555
/* when SIGTERM raised invoke process_term_signal */
2555-
signal_set(&gcb.signal_event,SIGTERM,process_term_signal,0);
2556+
evsignal_assign(&gcb.signal_event, gcb.event_base, SIGTERM, process_term_signal, 0);
25562557

25572558
/* high priority so we accept as fast as possible */
25582559
if(event_priority_set(&gcb.signal_event, 0))
25592560
gwarning(NULL,"signal event priority set failed");
25602561

25612562
/* start watching this event */
2562-
if(signal_add(&gcb.signal_event, 0))
2563+
if(evsignal_add(&gcb.signal_event, 0))
25632564
gfatal(NULL,"cannot set up event on signal register");
25642565

25652566
}
25662567

2568+
/*
2569+
* gpfdist_cleanup
2570+
*
2571+
* Clean up all resources before exiting
2572+
*/
2573+
static void gpfdist_cleanup(void)
2574+
{
2575+
/* Clean up event_base if initialized */
2576+
if (gcb.event_base) {
2577+
event_base_free(gcb.event_base);
2578+
gcb.event_base = NULL;
2579+
}
2580+
}
2581+
25672582
static void clear_listen_sock(void)
25682583
{
25692584
SOCKET sock = -1;
@@ -2616,9 +2631,8 @@ http_setup(void)
26162631
hostaddr = opt.b;
26172632

26182633
/* setup event priority */
2619-
if (event_priority_init(10))
2620-
gwarning(NULL, "event_priority_init failed");
2621-
2634+
if (event_base_priority_init(gcb.event_base, 10))
2635+
gwarning(NULL, "event_base_priority_init failed");
26222636

26232637
/* Try each possible port from opt.p to opt.last_port */
26242638
for (;;)
@@ -2811,8 +2825,8 @@ http_setup(void)
28112825
for (i = 0; i < gcb.listen_sock_count; i++)
28122826
{
28132827
/* when this socket is ready, do accept */
2814-
event_set(&gcb.listen_events[i], gcb.listen_socks[i], EV_READ | EV_PERSIST,
2815-
do_accept, 0);
2828+
event_assign(&gcb.listen_events[i], gcb.event_base, gcb.listen_socks[i],
2829+
EV_READ | EV_PERSIST, do_accept, 0);
28162830

28172831
/* only signal process function priority higher than socket handler */
28182832
if (event_priority_set(&gcb.listen_events[i], 1))
@@ -2838,6 +2852,9 @@ process_term_signal(int sig,short event,void* arg)
28382852
{
28392853
closesocket(gcb.listen_socks[i]);
28402854
}
2855+
2856+
/* Clean up resources before exiting */
2857+
gpfdist_cleanup();
28412858
_exit(1);
28422859
}
28432860

@@ -3913,7 +3930,10 @@ int gpfdist_init(int argc, const char* const argv[])
39133930
putenv("EVENT_SHOW_METHOD=1");
39143931
putenv("EVENT_NOKQUEUE=1");
39153932

3916-
event_init();
3933+
/* libevent 2.0+ */
3934+
gcb.event_base = event_base_new();
3935+
if (!gcb.event_base)
3936+
gfatal(NULL, "event_base_new failed");
39173937

39183938
signal_register();
39193939
http_setup();
@@ -3991,16 +4011,19 @@ int gpfdist_init(int argc, const char* const argv[])
39914011

39924012
int gpfdist_run()
39934013
{
3994-
return event_dispatch();
4014+
return event_base_dispatch(gcb.event_base);
39954015
}
39964016

39974017
#ifndef WIN32
39984018

39994019
int main(int argc, const char* const argv[])
40004020
{
4021+
int ret;
40014022
if (gpfdist_init(argc, argv) == -1)
40024023
gfatal(NULL, "Initialization failed");
4003-
return gpfdist_run();
4024+
ret = gpfdist_run();
4025+
gpfdist_cleanup();
4026+
return ret;
40044027
}
40054028

40064029

@@ -4175,6 +4198,7 @@ int main(int argc, const char* const argv[])
41754198
if (gpfdist_init(argc, argv) == -1)
41764199
gfatal(NULL, "Initialization failed");
41774200
main_ret = gpfdist_run();
4201+
gpfdist_cleanup();
41784202
}
41794203

41804204

@@ -4264,6 +4288,7 @@ void ServiceMain(int argc, char** argv)
42644288
* actual service work
42654289
*/
42664290
gpfdist_run();
4291+
gpfdist_cleanup();
42674292
}
42684293

42694294
void ControlHandler(DWORD request)
@@ -4566,7 +4591,7 @@ static void flush_ssl_buffer(int fd, short event, void* arg)
45664591
static void setup_flush_ssl_buffer(request_t* r)
45674592
{
45684593
event_del(&r->ev);
4569-
event_set(&r->ev, r->sock, EV_WRITE, flush_ssl_buffer, r);
4594+
event_assign(&r->ev, gcb.event_base, r->sock, EV_WRITE, flush_ssl_buffer, r);
45704595
r->tm.tv_sec = 5;
45714596
r->tm.tv_usec = 0;
45724597
(void)event_add(&r->ev, &r->tm);
@@ -4678,7 +4703,7 @@ static void request_cleanup(request_t *r)
46784703
static void setup_do_close(request_t* r)
46794704
{
46804705
event_del(&r->ev);
4681-
event_set(&r->ev, r->sock, EV_READ, do_close, r);
4706+
event_assign(&r->ev, gcb.event_base, r->sock, EV_READ, do_close, r);
46824707

46834708
r->tm.tv_sec = 60;
46844709
r->tm.tv_usec = 0;

0 commit comments

Comments
 (0)