Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 64764ab

Browse files
committed
osc/pt2pt: fix several bugs
This commit fixes some bugs uncovered during thread testing of 2.0.1rc1. With these fixes the component is running cleanly with threads. Signed-off-by: Nathan Hjelm <[email protected]> (cherry picked from commit open-mpi/ompi@70f8a6e) Signed-off-by: Nathan Hjelm <[email protected]>
1 parent edb35d5 commit 64764ab

File tree

5 files changed

+68
-20
lines changed

5 files changed

+68
-20
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,13 +955,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *
955955
static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
956956
{
957957
ompi_osc_pt2pt_sync_t *sync;
958+
ompi_osc_pt2pt_peer_t *peer;
958959

959-
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL);
960+
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer);
960961
if (!sync) {
961962
return false;
962963
}
963964

964-
return sync->eager_send_active;
965+
return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer);
965966
}
966967

967968
END_C_DECLS

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
118118
int ret;
119119

120120
/* if we are in active target mode wait until all post messages arrive */
121-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
121+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
122122

123123
ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
124124
target, target_count, target_datatype);
@@ -142,7 +142,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo
142142
int ret;
143143

144144
/* if we are in active target mode wait until all post messages arrive */
145-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
145+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
146146

147147
ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
148148
target, target_count, target_datatype);
@@ -164,7 +164,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
164164
((unsigned long) target_disp * module->disp_unit);
165165

166166
/* if we are in active target mode wait until all post messages arrive */
167-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
167+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
168168

169169
ompi_osc_pt2pt_accumulate_lock (module);
170170

@@ -188,7 +188,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
188188
int ret;
189189

190190
/* if we are in active target mode wait until all post messages arrive */
191-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
191+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
192192

193193
ompi_osc_pt2pt_accumulate_lock (module);
194194

@@ -338,7 +338,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
338338

339339
if (is_long_msg) {
340340
/* wait for eager sends to be active before starting a long put */
341-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
341+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
342342
}
343343

344344
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -497,7 +497,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
497497

498498
if (is_long_msg) {
499499
/* wait for synchronization before posting a long message */
500-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
500+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
501501
}
502502

503503
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
@@ -804,7 +804,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
804804

805805
if (!release_req) {
806806
/* wait for epoch to begin before starting rget operation */
807-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
807+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
808808
}
809809

810810
header = (ompi_osc_pt2pt_header_get_t*) ptr;
@@ -970,7 +970,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
970970

971971
if (!release_req) {
972972
/* wait for epoch to begin before starting operation */
973-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
973+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
974974
}
975975

976976
/* optimize the self case. TODO: optimize the local case */

ompi/mca/osc/pt2pt/osc_pt2pt_frag.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,37 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
153153
return ret;
154154
}
155155

156+
int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target)
157+
{
158+
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
159+
ompi_osc_pt2pt_frag_t *frag;
160+
int ret = OMPI_SUCCESS;
161+
162+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
163+
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
164+
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
165+
166+
/* walk through the pending list and send */
167+
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
168+
ret = frag_send(module, frag);
169+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
170+
break;
171+
}
172+
}
173+
174+
/* XXX -- TODO -- better error handling */
175+
if (OMPI_SUCCESS != ret) {
176+
return ret;
177+
}
178+
179+
/* flush the active frag */
180+
ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
181+
182+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
183+
"osc pt2pt: frag flush target %d finished", target));
184+
185+
return ret;
186+
}
156187

157188
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
158189
{

ompi/mca/osc/pt2pt/osc_pt2pt_frag.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ struct ompi_osc_pt2pt_frag_t {
4141
typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
4242
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
4343

44-
extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45-
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46-
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
44+
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45+
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46+
int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target);
47+
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
4748

4849
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
4950
ompi_osc_pt2pt_frag_t* buffer)

ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
122122

123123
int ret;
124124

125+
OPAL_THREAD_LOCK(&peer->lock);
126+
if (ompi_osc_pt2pt_peer_locked (peer)) {
127+
OPAL_THREAD_UNLOCK(&peer->lock);
128+
return OMPI_SUCCESS;
129+
}
130+
125131
(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
126132

127133
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
@@ -137,16 +143,23 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
137143
lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
138144
OSC_PT2PT_HTON(&lock_req, module, target);
139145

140-
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
146+
do {
147+
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
148+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
149+
break;
150+
}
151+
152+
/* make sure the request gets sent, so we can start eager sending... */
153+
ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target);
154+
} while (0);
155+
141156
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
142-
return ret;
157+
OPAL_THREAD_ADD32(&lock->sync_expected, -1);
143158
}
144159

145-
/* make sure the request gets sent, so we can start eager sending... */
146-
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
147-
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
148-
ompi_osc_pt2pt_peer_set_locked (peer, true);
149-
}
160+
ompi_osc_pt2pt_peer_set_locked (peer, true);
161+
162+
OPAL_THREAD_UNLOCK(&peer->lock);
150163

151164
return ret;
152165
}
@@ -316,6 +329,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
316329
if (OPAL_UNLIKELY(NULL == lock)) {
317330
return OMPI_ERR_OUT_OF_RESOURCE;
318331
}
332+
333+
lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
319334
} else {
320335
lock = &module->all_sync;
321336
}

0 commit comments

Comments
 (0)