Skip to content

Commit 21fba8b

Browse files
author
Ralph Castain
authored
Merge pull request #3659 from rhc54/topic/threads
Update OPAL and ORTE for thread safety
2 parents 7be09f8 + 93cf3c7 commit 21fba8b

File tree

84 files changed

+1078
-1412
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

84 files changed

+1078
-1412
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,6 +475,7 @@ orte/test/system/opal_db
475475
orte/test/system/ulfm
476476
orte/test/system/pmixtool
477477
orte/test/system/orte_notify
478+
orte/test/system/threads
478479

479480
orte/tools/orte-checkpoint/orte-checkpoint
480481
orte/tools/orte-checkpoint/orte-checkpoint.1

opal/mca/pmix/pmix2x/pmix2x.c

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include "opal/mca/hwloc/base/base.h"
3232
#include "opal/runtime/opal.h"
3333
#include "opal/runtime/opal_progress_threads.h"
34+
#include "opal/threads/threads.h"
3435
#include "opal/util/argv.h"
3536
#include "opal/util/error.h"
3637
#include "opal/util/output.h"
@@ -164,6 +165,7 @@ static void return_local_event_hdlr(int status, opal_list_t *results,
164165
pmix_status_t pstatus;
165166
size_t n;
166167

168+
OPAL_ACQUIRE_OBJECT(cd);
167169
if (NULL != cd->pmixcbfunc) {
168170
op = OBJ_NEW(pmix2x_opcaddy_t);
169171

@@ -203,6 +205,8 @@ static void _event_hdlr(int sd, short args, void *cbdata)
203205
pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata;
204206
opal_pmix2x_event_t *event;
205207

208+
OPAL_ACQUIRE_OBJECT(cd);
209+
206210
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
207211
"%s _EVENT_HDLR RECEIVED NOTIFICATION FOR HANDLER %d OF STATUS %d",
208212
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME), (int)cd->id, cd->status);
@@ -312,6 +316,7 @@ void pmix2x_event_hdlr(size_t evhdlr_registration_id,
312316
/* now push it into the local thread */
313317
opal_event_assign(&cd->ev, opal_pmix_base.evbase,
314318
-1, EV_WRITE, _event_hdlr, cd);
319+
OPAL_POST_OBJECT(cd);
315320
opal_event_active(&cd->ev, EV_WRITE, 1);
316321
}
317322

@@ -986,6 +991,7 @@ static void errreg_cbfunc (pmix_status_t status,
986991
{
987992
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
988993

994+
OPAL_ACQUIRE_OBJECT(op);
989995
op->event->index = errhandler_ref;
990996
opal_output_verbose(5, opal_pmix_base_framework.framework_output,
991997
"PMIX2x errreg_cbfunc - error handler registered status=%d, reference=%lu",
@@ -1003,6 +1009,7 @@ static void _reg_hdlr(int sd, short args, void *cbdata)
10031009
opal_value_t *kv;
10041010
size_t n;
10051011

1012+
OPAL_ACQUIRE_OBJECT(cd);
10061013
opal_output_verbose(2, opal_pmix_base_framework.framework_output,
10071014
"%s REGISTER HANDLER CODES %s",
10081015
OPAL_NAME_PRINT(OPAL_PROC_MY_NAME),
@@ -1067,6 +1074,7 @@ static void _dereg_hdlr(int sd, short args, void *cbdata)
10671074
pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata;
10681075
opal_pmix2x_event_t *event;
10691076

1077+
OPAL_ACQUIRE_OBJECT(cd);
10701078
/* look for this event */
10711079
OPAL_LIST_FOREACH(event, &mca_pmix_pmix2x_component.events, opal_pmix2x_event_t) {
10721080
if (cd->handler == event->index) {
@@ -1116,6 +1124,8 @@ static void _notify(int sd, short args, void *cbdata)
11161124
pmix_data_range_t prange;
11171125
opal_pmix2x_jobid_trkr_t *job, *jptr;
11181126

1127+
OPAL_ACQUIRE_OBJECT(cd);
1128+
11191129
op = OBJ_NEW(pmix2x_opcaddy_t);
11201130

11211131
/* convert the status */
@@ -1204,6 +1214,8 @@ static void infocbfunc(pmix_status_t status,
12041214
opal_value_t *iptr;
12051215
size_t n;
12061216

1217+
OPAL_ACQUIRE_OBJECT(cd);
1218+
12071219
/* convert the array of pmix_info_t to the list of info */
12081220
if (NULL != info) {
12091221
results = OBJ_NEW(opal_list_t);
@@ -1294,6 +1306,8 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
12941306
{
12951307
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
12961308

1309+
OPAL_ACQUIRE_OBJECT(op);
1310+
12971311
if (NULL != op->opcbfunc) {
12981312
op->opcbfunc(pmix2x_convert_rc(status), op->cbdata);
12991313
}

opal/mca/pmix/pmix2x/pmix2x.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t);
156156
_cd->cbdata = (cd); \
157157
opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \
158158
-1, EV_WRITE, (fn), (_cd)); \
159+
OPAL_POST_OBJECT(_cd); \
159160
opal_event_active(&((_cd)->ev), EV_WRITE, 1); \
160161
} while(0)
161162

@@ -170,6 +171,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t);
170171
_cd->cbdata = (cd); \
171172
opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \
172173
-1, EV_WRITE, (fn), (_cd)); \
174+
OPAL_POST_OBJECT(_cd); \
173175
opal_event_active(&((_cd)->ev), EV_WRITE, 1); \
174176
} while(0)
175177

@@ -185,6 +187,7 @@ OBJ_CLASS_DECLARATION(pmix2x_threadshift_t);
185187
_cd->cbdata = (cd); \
186188
opal_event_assign(&((_cd)->ev), opal_pmix_base.evbase, \
187189
-1, EV_WRITE, (fn), (_cd)); \
190+
OPAL_POST_OBJECT(_cd); \
188191
opal_event_active(&((_cd)->ev), EV_WRITE, 1); \
189192
} while(0)
190193

opal/mca/pmix/pmix2x/pmix2x_client.c

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
#endif
2828

2929
#include "opal/hash_string.h"
30+
#include "opal/threads/threads.h"
3031
#include "opal/util/argv.h"
3132
#include "opal/util/proc.h"
3233

@@ -44,6 +45,7 @@ static bool initialized = false;
4445
while ((a)) { \
4546
usleep(10); \
4647
} \
48+
OPAL_ACQUIRE_OBJECT(a); \
4749
} while (0)
4850

4951

@@ -53,11 +55,14 @@ static void errreg_cbfunc (pmix_status_t status,
5355
{
5456
opal_pmix2x_event_t *event = (opal_pmix2x_event_t*)cbdata;
5557

58+
OPAL_ACQUIRE_OBJECT(event);
59+
5660
event->index = errhandler_ref;
5761
opal_output_verbose(5, opal_pmix_base_framework.framework_output,
5862
"PMIX client errreg_cbfunc - error handler registered status=%d, reference=%lu",
5963
status, (unsigned long)errhandler_ref);
6064
regactive = false;
65+
OPAL_POST_OBJECT(regactive);
6166
}
6267

6368
int pmix2x_client_init(opal_list_t *ilist)
@@ -272,6 +277,7 @@ static void opcbfunc(pmix_status_t status, void *cbdata)
272277
{
273278
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
274279

280+
OPAL_ACQUIRE_OBJECT(op);
275281
if (NULL != op->opcbfunc) {
276282
op->opcbfunc(pmix2x_convert_rc(status), op->cbdata);
277283
}
@@ -521,6 +527,8 @@ static void val_cbfunc(pmix_status_t status,
521527
int rc;
522528
opal_value_t val, *v=NULL;
523529

530+
OPAL_ACQUIRE_OBJECT(op);
531+
524532
rc = pmix2x_convert_opalrc(status);
525533
if (PMIX_SUCCESS == status && NULL != kv) {
526534
rc = pmix2x_value_unload(&val, kv);
@@ -768,6 +776,8 @@ static void lk_cbfunc(pmix_status_t status,
768776
size_t n;
769777
opal_pmix2x_jobid_trkr_t *job, *jptr;
770778

779+
OPAL_ACQUIRE_OBJECT(op);
780+
771781
/* this is in the PMIx local thread - need to threadshift to
772782
* our own thread as we will be accessing framework-global
773783
* lists and objects */
@@ -817,7 +827,7 @@ static void lk_cbfunc(pmix_status_t status,
817827
}
818828
r = &results;
819829
}
820-
release:
830+
release:
821831
/* execute the callback */
822832
op->lkcbfunc(rc, r, op->cbdata);
823833

@@ -994,6 +1004,8 @@ static void spcbfunc(pmix_status_t status,
9941004
opal_jobid_t jobid=OPAL_JOBID_INVALID;
9951005
opal_pmix2x_jobid_trkr_t *job;
9961006

1007+
OPAL_ACQUIRE_OBJECT(op);
1008+
9971009
/* this is in the PMIx local thread - need to threadshift to
9981010
* our own thread as we will be accessing framework-global
9991011
* lists and objects */

opal/mca/pmix/pmix2x/pmix2x_server_north.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
#include "opal/mca/hwloc/base/base.h"
3030
#include "opal/runtime/opal.h"
3131
#include "opal/runtime/opal_progress_threads.h"
32+
#include "opal/threads/threads.h"
3233
#include "opal/util/argv.h"
3334
#include "opal/util/error.h"
3435
#include "opal/util/output.h"
@@ -142,6 +143,7 @@ static void opal_opcbfunc(int status, void *cbdata)
142143
{
143144
pmix2x_opalcaddy_t *opalcaddy = (pmix2x_opalcaddy_t*)cbdata;
144145

146+
OPAL_ACQUIRE_OBJECT(opalcaddy);
145147
if (NULL != opalcaddy->opcbfunc) {
146148
opalcaddy->opcbfunc(pmix2x_convert_opalrc(status), opalcaddy->cbdata);
147149
}

opal/mca/pmix/pmix2x/pmix2x_server_south.c

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
#include "opal/mca/hwloc/base/base.h"
3333
#include "opal/runtime/opal.h"
3434
#include "opal/runtime/opal_progress_threads.h"
35+
#include "opal/threads/threads.h"
3536
#include "opal/util/argv.h"
3637
#include "opal/util/error.h"
3738
#include "opal/util/output.h"
@@ -58,6 +59,7 @@ static size_t errhdler_ref = 0;
5859
while ((a)) { \
5960
usleep(10); \
6061
} \
62+
OPAL_ACQUIRE_OBJECT(a); \
6163
} while (0)
6264

6365
static void errreg_cbfunc (pmix_status_t status,
@@ -66,22 +68,27 @@ static void errreg_cbfunc (pmix_status_t status,
6668
{
6769
volatile bool *active = (volatile bool*)cbdata;
6870

71+
OPAL_ACQUIRE_OBJECT(active);
6972
errhdler_ref = errhandler_ref;
7073
opal_output_verbose(5, opal_pmix_base_framework.framework_output,
7174
"PMIX server errreg_cbfunc - error handler registered status=%d, reference=%lu",
7275
status, (unsigned long)errhandler_ref);
76+
OPAL_POST_OBJECT(active);
7377
*active = false;
7478
}
7579

7680
static void opcbfunc(pmix_status_t status, void *cbdata)
7781
{
7882
pmix2x_opcaddy_t *op = (pmix2x_opcaddy_t*)cbdata;
7983

84+
OPAL_ACQUIRE_OBJECT(op);
85+
8086
if (NULL != op->opcbfunc) {
8187
op->opcbfunc(pmix2x_convert_rc(status), op->cbdata);
8288
}
8389
if (op->active) {
8490
op->status = status;
91+
OPAL_POST_OBJECT(op);
8592
op->active = false;
8693
} else {
8794
OBJ_RELEASE(op);
@@ -92,6 +99,7 @@ static void op2cbfunc(pmix_status_t status, void *cbdata)
9299
{
93100
volatile bool *active = (volatile bool*)cbdata;
94101

102+
OPAL_POST_OBJECT(active);
95103
*active = false;
96104
}
97105

@@ -165,6 +173,7 @@ int pmix2x_server_init(opal_pmix_server_module_t *module,
165173
static void fincb(pmix_status_t status, void *cbdata)
166174
{
167175
volatile bool *active = (volatile bool*)cbdata;
176+
OPAL_POST_OBJECT(active);
168177
*active = false;
169178
}
170179

@@ -211,6 +220,8 @@ static void _reg_nspace(int sd, short args, void *cbdata)
211220
opal_pmix2x_jobid_trkr_t *job;
212221
pmix2x_opcaddy_t op;
213222

223+
OPAL_ACQUIRE_OBJECT(cd);
224+
214225
/* we must threadshift this request as we might not be in an event
215226
* and we are going to access framework-global lists/objects */
216227

@@ -301,6 +312,7 @@ int pmix2x_server_register_nspace(opal_jobid_t jobid,
301312
} else {
302313
opal_event_assign(&cd->ev, opal_pmix_base.evbase,
303314
-1, EV_WRITE, _reg_nspace, cd);
315+
OPAL_POST_OBJECT(cd);
304316
opal_event_active(&cd->ev, EV_WRITE, 1);
305317
}
306318

@@ -311,10 +323,12 @@ static void tdcbfunc(pmix_status_t status, void *cbdata)
311323
{
312324
pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata;
313325

326+
OPAL_ACQUIRE_OBJECT(cd);
314327
if (NULL != cd->opcbfunc) {
315328
cd->opcbfunc(pmix2x_convert_rc(status), cd->cbdata);
316329
}
317330
if (cd->active) {
331+
OPAL_POST_OBJECT(cd);
318332
cd->active = false;
319333
} else {
320334
OBJ_RELEASE(cd);
@@ -326,6 +340,7 @@ static void _dereg_nspace(int sd, short args, void *cbdata)
326340
pmix2x_threadshift_t *cd = (pmix2x_threadshift_t*)cbdata;
327341
opal_pmix2x_jobid_trkr_t *jptr;
328342

343+
OPAL_ACQUIRE_OBJECT(cd);
329344
/* if we don't already have it, we can ignore this */
330345
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
331346
if (jptr->jobid == cd->jobid) {
@@ -361,6 +376,7 @@ void pmix2x_server_deregister_nspace(opal_jobid_t jobid,
361376
} else {
362377
opal_event_assign(&cd->ev, opal_pmix_base.evbase,
363378
-1, EV_WRITE, _dereg_nspace, cd);
379+
OPAL_POST_OBJECT(cd);
364380
opal_event_active(&cd->ev, EV_WRITE, 1);
365381
}
366382
}
@@ -397,6 +413,7 @@ static void _dereg_client(int sd, short args, void *cbdata)
397413
opal_pmix2x_jobid_trkr_t *jptr;
398414
pmix_proc_t p;
399415

416+
OPAL_ACQUIRE_OBJECT(cd);
400417
/* if we don't already have it, we can ignore this */
401418
OPAL_LIST_FOREACH(jptr, &mca_pmix_pmix2x_component.jobids, opal_pmix2x_jobid_trkr_t) {
402419
if (jptr->jobid == cd->source->jobid) {
@@ -431,6 +448,7 @@ void pmix2x_server_deregister_client(const opal_process_name_t *proc,
431448
} else {
432449
opal_event_assign(&cd->ev, opal_pmix_base.evbase,
433450
-1, EV_WRITE, _dereg_client, cd);
451+
OPAL_POST_OBJECT(cd);
434452
opal_event_active(&cd->ev, EV_WRITE, 1);
435453
}
436454
}

opal/threads/threads.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
1414
* Copyright (c) 2015-2017 Research Organization for Information Science
1515
* and Technology (RIST). All rights reserved.
16+
* Copyright (c) 2017 Intel, Inc. All rights reserved.
1617
* $COPYRIGHT$
1718
*
1819
* Additional copyrights may follow
@@ -114,6 +115,19 @@ OPAL_DECLSPEC OBJ_CLASS_DECLARATION(opal_thread_t);
114115
opal_condition_broadcast((cnd)); \
115116
} while(0);
116117

118+
/* provide a macro for forward-proofing the shifting
119+
* of objects between libevent threads - at some point, we
120+
* may revamp that threading model */
121+
122+
/* post an object to another thread - for now, we
123+
* only have a memory barrier */
124+
#define OPAL_POST_OBJECT(o) opal_atomic_wmb()
125+
126+
/* acquire an object from another thread - for now,
127+
* we only have a memory barrier */
128+
#define OPAL_ACQUIRE_OBJECT(o) opal_atomic_rmb()
129+
130+
117131

118132
OPAL_DECLSPEC int opal_thread_start(opal_thread_t *);
119133
OPAL_DECLSPEC int opal_thread_join(opal_thread_t *, void **thread_return);

0 commit comments

Comments
 (0)