Skip to content

Commit 362ac8b

Browse files
committed
osc/pt2pt: fix threading issues
This commit fixes a number of threading issues discovered in osc/pt2pt. This includes: - Lock the synchronization object not the module in osc_pt2pt_start. This fixes a race between the start function and processing post messages. - Always lock before calling cond_broadcast. Fixes a race between the waiting thread and signaling thread. - Make all atomically updated values volatile. - Make the module lock recursive to protect against some deadlock conditions. Will roll this back once the locks have been re-designed. - Mark incoming complete *after* completing an accumulate not before. This was causing an incorrect answer under certain conditions. Signed-off-by: Nathan Hjelm <[email protected]>
1 parent a9d836b commit 362ac8b

File tree

8 files changed

+115
-92
lines changed

8 files changed

+115
-92
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ struct ompi_osc_pt2pt_peer_t {
118118
opal_list_t queued_frags;
119119

120120
/** number of fragments incomming (negative - expected, positive - unsynchronized) */
121-
int32_t passive_incoming_frag_count;
121+
volatile int32_t passive_incoming_frag_count;
122122

123123
/** peer flags */
124124
volatile int32_t flags;
@@ -198,7 +198,7 @@ struct ompi_osc_pt2pt_module_t {
198198
int disp_unit;
199199

200200
/** Mutex lock protecting module data */
201-
opal_mutex_t lock;
201+
opal_recursive_mutex_t lock;
202202

203203
/** condition variable associated with lock */
204204
opal_condition_t cond;
@@ -214,19 +214,13 @@ struct ompi_osc_pt2pt_module_t {
214214
uint32_t *epoch_outgoing_frag_count;
215215

216216
/** cyclic counter for a unique tage for long messages. */
217-
uint32_t tag_counter;
217+
volatile uint32_t tag_counter;
218218

219-
/* Number of outgoing fragments that have completed since the
220-
begining of time */
221-
volatile uint32_t outgoing_frag_count;
222-
/* Next outgoing fragment count at which we want a signal on cond */
223-
volatile uint32_t outgoing_frag_signal_count;
219+
/** number of outgoing fragments still to be completed */
220+
volatile int32_t outgoing_frag_count;
224221

225-
/* Number of incoming fragments that have completed since the
226-
begining of time */
227-
volatile uint32_t active_incoming_frag_count;
228-
/* Next incoming buffer count at which we want a signal on cond */
229-
volatile uint32_t active_incoming_frag_signal_count;
222+
/** number of incoming fragments */
223+
volatile int32_t active_incoming_frag_count;
230224

231225
/** Number of targets locked/being locked */
232226
unsigned int passive_target_access_epoch;
@@ -239,7 +233,7 @@ struct ompi_osc_pt2pt_module_t {
239233

240234
/** Number of "count" messages from the remote complete group
241235
we've received */
242-
int32_t num_complete_msgs;
236+
volatile int32_t num_complete_msgs;
243237

244238
/* ********************* LOCK data ************************ */
245239

@@ -264,7 +258,12 @@ struct ompi_osc_pt2pt_module_t {
264258

265259
/* enforce accumulate semantics */
266260
opal_atomic_lock_t accumulate_lock;
267-
opal_list_t pending_acc;
261+
262+
/** accumulate operations pending the accumulation lock */
263+
opal_list_t pending_acc;
264+
265+
/** lock for pending_acc */
266+
opal_mutex_t pending_acc_lock;
268267

269268
/** Lock for garbage collection lists */
270269
opal_mutex_t gc_lock;
@@ -512,23 +511,29 @@ int ompi_osc_pt2pt_progress_pending_acc (ompi_osc_pt2pt_module_t *module);
512511
*/
513512
static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, int source)
514513
{
514+
int32_t new_value;
515+
515516
if (MPI_PROC_NULL == source) {
516517
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
517-
"mark_incoming_completion marking active incoming complete. count = %d. signal = %d",
518-
(int) module->active_incoming_frag_count + 1, module->active_incoming_frag_signal_count));
519-
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_count, 1);
520-
if (module->active_incoming_frag_count >= module->active_incoming_frag_signal_count) {
518+
"mark_incoming_completion marking active incoming complete. module %p, count = %d",
519+
(void *) module, (int) module->active_incoming_frag_count + 1));
520+
new_value = OPAL_THREAD_ADD32(&module->active_incoming_frag_count, 1);
521+
if (new_value >= 0) {
522+
OPAL_THREAD_LOCK(&module->lock);
521523
opal_condition_broadcast(&module->cond);
524+
OPAL_THREAD_UNLOCK(&module->lock);
522525
}
523526
} else {
524527
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, source);
525528

526529
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
527-
"mark_incoming_completion marking passive incoming complete. source = %d, count = %d",
528-
source, (int) peer->passive_incoming_frag_count + 1));
529-
OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
530-
if (0 == peer->passive_incoming_frag_count) {
530+
"mark_incoming_completion marking passive incoming complete. module %p, source = %d, count = %d",
531+
(void *) module, source, (int) peer->passive_incoming_frag_count + 1));
532+
new_value = OPAL_THREAD_ADD32((int32_t *) &peer->passive_incoming_frag_count, 1);
533+
if (0 == new_value) {
534+
OPAL_THREAD_LOCK(&module->lock);
531535
opal_condition_broadcast(&module->cond);
536+
OPAL_THREAD_UNLOCK(&module->lock);
532537
}
533538
}
534539
}
@@ -548,9 +553,13 @@ static inline void mark_incoming_completion (ompi_osc_pt2pt_module_t *module, in
548553
*/
549554
static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module)
550555
{
551-
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1);
552-
if (module->outgoing_frag_count >= module->outgoing_frag_signal_count) {
556+
int32_t new_value = OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, 1);
557+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
558+
"mark_outgoing_completion: outgoing_frag_count = %d", new_value));
559+
if (new_value >= 0) {
560+
OPAL_THREAD_LOCK(&module->lock);
553561
opal_condition_broadcast(&module->cond);
562+
OPAL_THREAD_UNLOCK(&module->lock);
554563
}
555564
}
556565

@@ -568,7 +577,7 @@ static inline void mark_outgoing_completion (ompi_osc_pt2pt_module_t *module)
568577
*/
569578
static inline void ompi_osc_signal_outgoing (ompi_osc_pt2pt_module_t *module, int target, int count)
570579
{
571-
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_signal_count, count);
580+
OPAL_THREAD_ADD32((int32_t *) &module->outgoing_frag_count, -count);
572581
if (MPI_PROC_NULL != target) {
573582
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
574583
"ompi_osc_signal_outgoing_passive: target = %d, count = %d, total = %d", target,

ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c

Lines changed: 29 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,6 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
168168
MPI_SUM, module->comm,
169169
module->comm->c_coll.coll_reduce_scatter_block_module);
170170
if (OMPI_SUCCESS != ret) {
171-
OPAL_THREAD_UNLOCK(&module->lock);
172171
return ret;
173172
}
174173

@@ -181,11 +180,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
181180
incoming_reqs));
182181

183182
/* set our complete condition for incoming requests */
184-
module->active_incoming_frag_signal_count += incoming_reqs;
183+
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -incoming_reqs);
185184

186185
/* wait for completion */
187-
while (module->outgoing_frag_count != module->outgoing_frag_signal_count ||
188-
module->active_incoming_frag_count < module->active_incoming_frag_signal_count) {
186+
while (module->outgoing_frag_count < 0 || module->active_incoming_frag_count < 0) {
189187
opal_condition_wait(&module->cond, &module->lock);
190188
}
191189

@@ -196,10 +194,10 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
196194
}
197195

198196
module->all_sync.epoch_active = false;
199-
200-
opal_condition_broadcast (&module->cond);
201197
OPAL_THREAD_UNLOCK(&module->lock);
202198

199+
module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier_module);
200+
203201
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
204202
"osc pt2pt: fence end: %d", ret));
205203

@@ -212,11 +210,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
212210
ompi_osc_pt2pt_module_t *module = GET_MODULE(win);
213211
ompi_osc_pt2pt_sync_t *sync = &module->all_sync;
214212

215-
OPAL_THREAD_LOCK(&module->lock);
213+
OPAL_THREAD_LOCK(&sync->lock);
216214

217215
/* check if we are already in an access epoch */
218216
if (ompi_osc_pt2pt_access_epoch_active (module)) {
219-
OPAL_THREAD_UNLOCK(&module->lock);
217+
OPAL_THREAD_UNLOCK(&sync->lock);
220218
return OMPI_ERR_RMA_SYNC;
221219
}
222220

@@ -249,7 +247,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
249247
if (0 == ompi_group_size (group)) {
250248
/* nothing more to do. this is an empty start epoch */
251249
sync->eager_send_active = true;
252-
OPAL_THREAD_UNLOCK(&module->lock);
250+
OPAL_THREAD_UNLOCK(&sync->lock);
253251
return OMPI_SUCCESS;
254252
}
255253

@@ -258,12 +256,11 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
258256
/* translate the group ranks into the communicator */
259257
sync->peer_list.peers = ompi_osc_pt2pt_get_peers (module, group);
260258
if (NULL == sync->peer_list.peers) {
261-
OPAL_THREAD_UNLOCK(&module->lock);
259+
OPAL_THREAD_UNLOCK(&sync->lock);
262260
return OMPI_ERR_OUT_OF_RESOURCE;
263261
}
264262

265263
if (!(assert & MPI_MODE_NOCHECK)) {
266-
OPAL_THREAD_LOCK(&sync->lock);
267264
for (int i = 0 ; i < sync->num_peers ; ++i) {
268265
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
269266

@@ -276,7 +273,6 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
276273
ompi_osc_pt2pt_peer_set_unex (peer, false);
277274
}
278275
}
279-
OPAL_THREAD_UNLOCK(&sync->lock);
280276
} else {
281277
sync->sync_expected = 0;
282278
}
@@ -295,7 +291,7 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
295291
"ompi_osc_pt2pt_start complete. eager sends active: %d",
296292
sync->eager_send_active));
297293

298-
OPAL_THREAD_UNLOCK(&module->lock);
294+
OPAL_THREAD_UNLOCK(&sync->lock);
299295
return OMPI_SUCCESS;
300296
}
301297

@@ -313,14 +309,14 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
313309
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
314310
"ompi_osc_pt2pt_complete entering..."));
315311

316-
OPAL_THREAD_LOCK(&module->lock);
312+
OPAL_THREAD_LOCK(&sync->lock);
317313
if (OMPI_OSC_PT2PT_SYNC_TYPE_PSCW != sync->type) {
318-
OPAL_THREAD_UNLOCK(&module->lock);
314+
OPAL_THREAD_UNLOCK(&sync->lock);
319315
return OMPI_ERR_RMA_SYNC;
320316
}
321317

322318
/* wait for all the post messages */
323-
ompi_osc_pt2pt_sync_wait (sync);
319+
ompi_osc_pt2pt_sync_wait_nolock (sync);
324320

325321
/* phase 1 cleanup sync object */
326322
group = sync->sync.pscw.group;
@@ -330,8 +326,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
330326

331327
/* need to reset the sync here to avoid processing incorrect post messages */
332328
ompi_osc_pt2pt_sync_reset (sync);
333-
334-
OPAL_THREAD_UNLOCK(&module->lock);
329+
OPAL_THREAD_UNLOCK(&sync->lock);
335330

336331
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
337332
"ompi_osc_pt2pt_complete all posts received. sending complete messages..."));
@@ -403,7 +398,7 @@ int ompi_osc_pt2pt_complete (ompi_win_t *win)
403398
OPAL_THREAD_LOCK(&module->lock);
404399
/* wait for outgoing requests to complete. Don't wait for incoming, as
405400
we're only completing the access epoch, not the exposure epoch */
406-
while (module->outgoing_frag_count != module->outgoing_frag_signal_count) {
401+
while (module->outgoing_frag_count < 0) {
407402
opal_condition_wait(&module->cond, &module->lock);
408403
}
409404

@@ -513,15 +508,13 @@ int ompi_osc_pt2pt_wait (ompi_win_t *win)
513508
}
514509

515510
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
516-
"ompi_osc_pt2pt_wait entering..."));
511+
"ompi_osc_pt2pt_wait entering... module %p", (void *) module));
517512

518513
OPAL_THREAD_LOCK(&module->lock);
519-
while (0 != module->num_complete_msgs ||
520-
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
521-
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "num_complete_msgs = %d, "
522-
"active_incoming_frag_count = %d, active_incoming_frag_signal_count = %d",
523-
module->num_complete_msgs, module->active_incoming_frag_count,
524-
module->active_incoming_frag_signal_count));
514+
while (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
515+
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output, "module %p, num_complete_msgs = %d, "
516+
"active_incoming_frag_count = %d", (void *) module, module->num_complete_msgs,
517+
module->active_incoming_frag_count));
525518
opal_condition_wait(&module->cond, &module->lock);
526519
}
527520

@@ -554,21 +547,15 @@ int ompi_osc_pt2pt_test (ompi_win_t *win, int *flag)
554547

555548
OPAL_THREAD_LOCK(&(module->lock));
556549

557-
if (0 != module->num_complete_msgs ||
558-
module->active_incoming_frag_count != module->active_incoming_frag_signal_count) {
550+
if (0 != module->num_complete_msgs || module->active_incoming_frag_count < 0) {
559551
*flag = 0;
560-
ret = OMPI_SUCCESS;
561552
} else {
562553
*flag = 1;
563554

564555
group = module->pw_group;
565556
module->pw_group = NULL;
566557

567-
OPAL_THREAD_UNLOCK(&(module->lock));
568-
569558
OBJ_RELEASE(group);
570-
571-
return OMPI_SUCCESS;
572559
}
573560

574561
OPAL_THREAD_UNLOCK(&(module->lock));
@@ -580,15 +567,19 @@ void osc_pt2pt_incoming_complete (ompi_osc_pt2pt_module_t *module, int source, i
580567
{
581568
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
582569
"osc pt2pt: process_complete got complete message from %d. expected fragment count %d. "
583-
"current signal count %d. current incomming count: %d. expected complete msgs: %d",
584-
source, frag_count, module->active_incoming_frag_signal_count,
585-
module->active_incoming_frag_count, module->num_complete_msgs));
570+
"current incomming count: %d. expected complete msgs: %d", source,
571+
frag_count, module->active_incoming_frag_count, module->num_complete_msgs));
586572

587573
/* the current fragment is not part of the frag_count so we need to add it here */
588-
OPAL_THREAD_ADD32((int32_t *) &module->active_incoming_frag_signal_count, frag_count);
574+
OPAL_THREAD_ADD32(&module->active_incoming_frag_count, -frag_count);
589575

590-
if (0 == OPAL_THREAD_ADD32((int32_t *) &module->num_complete_msgs, 1)) {
576+
/* make sure the signal count is written before changing the complete message count */
577+
opal_atomic_wmb ();
578+
579+
if (0 == OPAL_THREAD_ADD32(&module->num_complete_msgs, 1)) {
580+
OPAL_THREAD_LOCK(&module->lock);
591581
opal_condition_broadcast (&module->cond);
582+
OPAL_THREAD_UNLOCK(&module->lock);
592583
}
593584
}
594585

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -501,6 +501,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
501501

502502
is_long_msg = true;
503503
tag = get_tag (module);
504+
} else {
505+
/* still need to set the tag for the active/passive logic on the target */
506+
tag = !!(module->passive_target_access_epoch);
504507
}
505508

506509
if (is_long_msg) {
@@ -523,6 +526,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
523526
header->count = target_count;
524527
header->displacement = target_disp;
525528
header->op = op->o_f_to_c_index;
529+
header->tag = tag;
526530
ptr += sizeof (*header);
527531

528532
do {
@@ -565,7 +569,6 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
565569
}
566570
} else {
567571
header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_ACC_LONG;
568-
header->tag = tag;
569572
osc_pt2pt_hton(header, proc);
570573

571574
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,

ompi/mca/osc/pt2pt/osc_pt2pt_component.c

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -314,12 +314,13 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
314314
sizeof(ompi_osc_base_module_t));
315315

316316
/* initialize the objects, so that always free in cleanup */
317-
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);
317+
OBJ_CONSTRUCT(&module->lock, opal_recursive_mutex_t);
318318
OBJ_CONSTRUCT(&module->cond, opal_condition_t);
319319
OBJ_CONSTRUCT(&module->locks_pending, opal_list_t);
320320
OBJ_CONSTRUCT(&module->locks_pending_lock, opal_mutex_t);
321321
OBJ_CONSTRUCT(&module->outstanding_locks, opal_hash_table_t);
322322
OBJ_CONSTRUCT(&module->pending_acc, opal_list_t);
323+
OBJ_CONSTRUCT(&module->pending_acc_lock, opal_mutex_t);
323324
OBJ_CONSTRUCT(&module->buffer_gc, opal_list_t);
324325
OBJ_CONSTRUCT(&module->gc_lock, opal_mutex_t);
325326
OBJ_CONSTRUCT(&module->all_sync, ompi_osc_pt2pt_sync_t);

0 commit comments

Comments
 (0)