Commit 3adff9d

Fixes #1793.
Reshape the teardown (connection close) path to prevent race conditions between the main thread and the progress thread. Minor cleanups.
1 parent 6de64dd commit 3adff9d
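
A minimal stand-alone sketch of the signal-then-join shutdown pattern this commit adopts, using plain POSIX primitives and illustrative names rather than the actual OPAL wrappers (opal_thread_join, mca_btl_tcp_pipe_to_progress): the main thread tells the progress thread to stop by closing the write end of the pipe it polls, then joins it before any shared state is destroyed, instead of busy-waiting on a flag.

/*
 * Sketch only (plain POSIX, illustrative names) -- not the actual OPAL code.
 * Pattern: signal the progress thread by closing the write end of its wake-up
 * pipe, then join it before tearing down anything it might still touch.
 */
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>

static int wakeup_pipe[2];   /* [0]: read end polled by the thread, [1]: write end */

static void *progress_thread(void *arg)
{
    char byte;
    /* Runs until the main thread closes the write end: read() then returns 0. */
    while (read(wakeup_pipe[0], &byte, 1) > 0) {
        /* ... drive pending network events here ... */
    }
    return NULL;
}

int main(void)
{
    pthread_t tid;

    if (0 != pipe(wakeup_pipe))
        return 1;
    if (0 != pthread_create(&tid, NULL, progress_thread, NULL))
        return 1;

    /* Teardown: tell the thread we are going away ... */
    close(wakeup_pipe[1]);
    wakeup_pipe[1] = -1;
    /* ... and wait for it to finish (replaces the old sched_yield()/usleep()
     * polling loop), so nothing gets destroyed underneath it. */
    pthread_join(tid, NULL);

    close(wakeup_pipe[0]);
    printf("progress thread joined; safe to tear down shared state\n");
    return 0;
}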

3 files changed: +60 -54 lines

opal/mca/btl/tcp/btl_tcp.c

Lines changed: 1 addition & 1 deletion
@@ -149,7 +149,7 @@ int mca_btl_tcp_del_procs(struct mca_btl_base_module_t* btl,
 {
     mca_btl_tcp_module_t* tcp_btl = (mca_btl_tcp_module_t*)btl;
     size_t i;
-    for(i=0; i<nprocs; i++) {
+    for( i = 0; i < nprocs; i++ ) {
         mca_btl_tcp_endpoint_t* tcp_endpoint = endpoints[i];
         if(tcp_endpoint->endpoint_proc != mca_btl_tcp_proc_local()) {
             opal_list_remove_item(&tcp_btl->tcp_endpoints, (opal_list_item_t*)tcp_endpoint);

opal/mca/btl/tcp/btl_tcp_component.c

Lines changed: 50 additions & 47 deletions
@@ -259,7 +259,8 @@ static int mca_btl_tcp_component_register(void)
                                    " used to reduce the number of syscalls, by replacing them with memcpy."
                                    " Every read will read the expected data plus the amount of the"
                                    " endpoint_cache", 30*1024, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_endpoint_cache);
-    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)", 0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
+    mca_btl_tcp_param_register_int ("use_nagle", "Whether to use Nagle's algorithm or not (using Nagle's algorithm may increase short message latency)",
+                                    0, OPAL_INFO_LVL_4, &mca_btl_tcp_component.tcp_not_use_nodelay);
     mca_btl_tcp_param_register_int( "port_min_v4",
                                     "The minimum port where the TCP BTL will try to bind (default 1024)",
                                     1024, OPAL_INFO_LVL_2, &mca_btl_tcp_component.tcp_port_min);
@@ -299,9 +300,8 @@ static int mca_btl_tcp_component_register(void)
                        opal_process_info.nodename,
                        mca_btl_tcp_component.tcp_if_seq,
                        "Progress thread support compiled out");
-        }
+    }
 #endif /* !defined(MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD) */
-
     mca_btl_tcp_component.report_all_unfound_interfaces = false;
     (void) mca_base_component_var_register(&mca_btl_tcp_component.super.btl_version,
                                            "warn_all_unfound_interfaces",
@@ -394,8 +394,46 @@ static int mca_btl_tcp_component_open(void)
 
 static int mca_btl_tcp_component_close(void)
 {
-    opal_list_item_t* item;
-    opal_list_item_t* next;
+    opal_list_item_t *item;
+
+#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
+    /**
+     * If we have a progress thread we should shut it down before
+     * moving forward with the TCP tearing down process.
+     */
+    if( (NULL != mca_btl_tcp_event_base) &&
+        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
+        /* Turn of the progress thread before moving forward */
+        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
+            void* ret = NULL;  /* not currently used */
+
+            mca_btl_tcp_progress_thread_trigger = 0;
+            /* Let the progress thread know that we're going away */
+            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
+                close(mca_btl_tcp_pipe_to_progress[1]);
+                mca_btl_tcp_pipe_to_progress[1] = -1;
+            }
+            /* wait until the TCP progress thread completes */
+            opal_thread_join(&mca_btl_tcp_progress_thread, &ret);
+            assert( -1 == mca_btl_tcp_progress_thread_trigger );
+        }
+        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
+        opal_event_base_free(mca_btl_tcp_event_base);
+        mca_btl_tcp_event_base = NULL;
+
+        /* Close the remaining pipes */
+        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
+            close(mca_btl_tcp_pipe_to_progress[0]);
+            mca_btl_tcp_pipe_to_progress[0] = -1;
+        }
+    }
+
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
+
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
+    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
+#endif
 
     if (NULL != mca_btl_tcp_component.tcp_btls) {
         free(mca_btl_tcp_component.tcp_btls);
@@ -416,11 +454,8 @@ static int mca_btl_tcp_component_close(void)
 
     /* remove all pending events. Do not lock the tcp_events list as
        the event themselves will unregister during the destructor. */
-    for(item = opal_list_get_first(&mca_btl_tcp_component.tcp_events);
-        item != opal_list_get_end(&mca_btl_tcp_component.tcp_events);
-        item = next) {
+    while( NULL != (item = opal_list_remove_first(&mca_btl_tcp_component.tcp_events)) ) {
         mca_btl_tcp_event_t* event = (mca_btl_tcp_event_t*)item;
-        next = opal_list_get_next(item);
         opal_event_del(&event->event);
         OBJ_RELEASE(event);
     }
@@ -436,40 +471,6 @@ static int mca_btl_tcp_component_close(void)
     mca_common_cuda_fini();
 #endif /* OPAL_CUDA_SUPPORT */
 
-#if MCA_BTL_TCP_SUPPORT_PROGRESS_THREAD
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_eager_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_component.tcp_frag_max_mutex);
-
-    if( (NULL != mca_btl_tcp_event_base) &&
-        (mca_btl_tcp_event_base != opal_sync_event_base) ) {
-        /* Turn of the progress thread before moving forward */
-        if( -1 != mca_btl_tcp_progress_thread_trigger ) {
-            mca_btl_tcp_progress_thread_trigger = 0;
-            /* Let the progress thread know that we're going away */
-            if( -1 != mca_btl_tcp_pipe_to_progress[1] ) {
-                close(mca_btl_tcp_pipe_to_progress[1]);
-                mca_btl_tcp_pipe_to_progress[1] = -1;
-            }
-            while( -1 != mca_btl_tcp_progress_thread_trigger ) {
-                /*event_base_loopbreak(mca_btl_tcp_event_base);*/
-                sched_yield();
-                usleep(100);  /* give app a chance to re-enter library */
-            }
-        }
-        opal_event_del(&mca_btl_tcp_component.tcp_recv_thread_async_event);
-        opal_event_base_free(mca_btl_tcp_event_base);
-        mca_btl_tcp_event_base = NULL;
-
-        /* Close the remaining pipes */
-        if( -1 != mca_btl_tcp_pipe_to_progress[0] ) {
-            close(mca_btl_tcp_pipe_to_progress[0]);
-            mca_btl_tcp_pipe_to_progress[0] = -1;
-        }
-    }
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_mutex);
-    OBJ_DESTRUCT(&mca_btl_tcp_ready_frag_pending_queue);
-#endif
-
     return OPAL_SUCCESS;
 }

@@ -1032,6 +1033,8 @@ static int mca_btl_tcp_component_create_listen(uint16_t af_family)
                 mca_btl_tcp_progress_thread_trigger = -1;  /* thread not started */
                 goto move_forward_with_no_thread;
             }
+            /* We have async progress, the rest of the library should now protect itself against races */
+            opal_set_using_threads(true);
         }
     }
     else {
@@ -1295,12 +1298,12 @@ static void mca_btl_tcp_component_accept_handler( int incoming_sd,
  */
 static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
 {
+    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
     opal_process_name_t guid;
     struct sockaddr_storage addr;
-    int retval;
-    mca_btl_tcp_proc_t* btl_proc;
     opal_socklen_t addr_len = sizeof(addr);
-    mca_btl_tcp_event_t *event = (mca_btl_tcp_event_t *)user;
+    mca_btl_tcp_proc_t* btl_proc;
+    int retval;
 
     OBJ_RELEASE(event);
 
@@ -1339,6 +1342,6 @@ static void mca_btl_tcp_component_recv_handler(int sd, short flags, void* user)
         return;
     }
 
-    /* are there any existing peer instances will to accept this connection */
+    /* are there any existing peer instances willing to accept this connection */
     (void)mca_btl_tcp_proc_accept(btl_proc, (struct sockaddr*)&addr, sd);
 }

opal/mca/btl/tcp/btl_tcp_endpoint.c

Lines changed: 9 additions & 6 deletions
@@ -251,7 +251,7 @@ mca_btl_tcp_endpoint_dump(int level,
     if (used >= DEBUG_LENGTH) goto out;
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     used += snprintf(&outmsg[used], DEBUG_LENGTH - used, "\n\t[cache %p used %lu/%lu]",
-                     btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
+                     (void*)btl_endpoint->endpoint_cache, btl_endpoint->endpoint_cache_pos - btl_endpoint->endpoint_cache,
                      btl_endpoint->endpoint_cache_length);
     if (used >= DEBUG_LENGTH) goto out;
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
@@ -510,21 +510,24 @@ void mca_btl_tcp_endpoint_accept(mca_btl_base_endpoint_t* btl_endpoint,
  */
 void mca_btl_tcp_endpoint_close(mca_btl_base_endpoint_t* btl_endpoint)
 {
+    MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "[close]");
     if(btl_endpoint->endpoint_sd < 0)
         return;
     btl_endpoint->endpoint_retries++;
     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(recv) [close]");
     opal_event_del(&btl_endpoint->endpoint_recv_event);
     MCA_BTL_TCP_ENDPOINT_DUMP(1, btl_endpoint, false, "event_del(send) [close]");
     opal_event_del(&btl_endpoint->endpoint_send_event);
-    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
-    btl_endpoint->endpoint_sd = -1;
+
 #if MCA_BTL_TCP_ENDPOINT_CACHE
     free( btl_endpoint->endpoint_cache );
     btl_endpoint->endpoint_cache = NULL;
     btl_endpoint->endpoint_cache_pos = NULL;
     btl_endpoint->endpoint_cache_length = 0;
 #endif /* MCA_BTL_TCP_ENDPOINT_CACHE */
+
+    CLOSE_THE_SOCKET(btl_endpoint->endpoint_sd);
+    btl_endpoint->endpoint_sd = -1;
     /**
      * If we keep failing to connect to the peer let the caller know about
      * this situation by triggering all the pending fragments callback and
@@ -675,9 +678,9 @@ void mca_btl_tcp_set_socket_options(int sd)
 /*
  * Start a connection to the endpoint. This will likely not complete,
  * as the socket is set to non-blocking, so register for event
- * notification of connect completion. On connection we send
- * our globally unique process identifier to the endpoint and wait for
- * the endpoints response.
+ * notification of connect completion. On connection we send our
+ * globally unique process identifier to the endpoint and wait for
+ * the endpoint response.
  */
 static int mca_btl_tcp_endpoint_start_connect(mca_btl_base_endpoint_t* btl_endpoint)
 {
