Skip to content

Commit c082068

Browse files
authored
Merge pull request #2006 from hjelmn/osc_pt2pt_fix
osc/pt2pt: fix several bugs
2 parents 17a210f + 70f8a6e commit c082068

File tree

5 files changed

+68
-20
lines changed

5 files changed

+68
-20
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -936,13 +936,14 @@ static inline bool ompi_osc_pt2pt_access_epoch_active (ompi_osc_pt2pt_module_t *
936936
static inline bool ompi_osc_pt2pt_peer_sends_active (ompi_osc_pt2pt_module_t *module, int rank)
937937
{
938938
ompi_osc_pt2pt_sync_t *sync;
939+
ompi_osc_pt2pt_peer_t *peer;
939940

940-
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, NULL);
941+
sync = ompi_osc_pt2pt_module_sync_lookup (module, rank, &peer);
941942
if (!sync) {
942943
return false;
943944
}
944945

945-
return sync->eager_send_active;
946+
return sync->eager_send_active || ompi_osc_pt2pt_peer_eager_active (peer);
946947
}
947948

948949
END_C_DECLS

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ static inline int ompi_osc_pt2pt_put_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
116116
int ret;
117117

118118
/* if we are in active target mode wait until all post messages arrive */
119-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
119+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
120120

121121
ret = ompi_datatype_sndrcv ((void *)source, source_count, source_datatype,
122122
target, target_count, target_datatype);
@@ -140,7 +140,7 @@ static inline int ompi_osc_pt2pt_get_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, vo
140140
int ret;
141141

142142
/* if we are in active target mode wait until all post messages arrive */
143-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
143+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
144144

145145
ret = ompi_datatype_sndrcv (source, source_count, source_datatype,
146146
target, target_count, target_datatype);
@@ -162,7 +162,7 @@ static inline int ompi_osc_pt2pt_cas_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
162162
((unsigned long) target_disp * module->disp_unit);
163163

164164
/* if we are in active target mode wait until all post messages arrive */
165-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
165+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
166166

167167
ompi_osc_pt2pt_accumulate_lock (module);
168168

@@ -186,7 +186,7 @@ static inline int ompi_osc_pt2pt_acc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, co
186186
int ret;
187187

188188
/* if we are in active target mode wait until all post messages arrive */
189-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
189+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
190190

191191
ompi_osc_pt2pt_accumulate_lock (module);
192192

@@ -336,7 +336,7 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
336336

337337
if (is_long_msg) {
338338
/* wait for eager sends to be active before starting a long put */
339-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
339+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
340340
}
341341

342342
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
@@ -495,7 +495,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
495495

496496
if (is_long_msg) {
497497
/* wait for synchronization before posting a long message */
498-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
498+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
499499
}
500500

501501
header = (ompi_osc_pt2pt_header_acc_t*) ptr;
@@ -802,7 +802,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
802802

803803
if (!release_req) {
804804
/* wait for epoch to begin before starting rget operation */
805-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
805+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
806806
}
807807

808808
header = (ompi_osc_pt2pt_header_get_t*) ptr;
@@ -968,7 +968,7 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
968968

969969
if (!release_req) {
970970
/* wait for epoch to begin before starting operation */
971-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
971+
ompi_osc_pt2pt_sync_wait_expected (pt2pt_sync);
972972
}
973973

974974
/* optimize the self case. TODO: optimize the local case */

ompi/mca/osc/pt2pt/osc_pt2pt_frag.c

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,37 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
151151
return ret;
152152
}
153153

154+
int ompi_osc_pt2pt_frag_flush_target_locked (ompi_osc_pt2pt_module_t *module, int target)
155+
{
156+
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
157+
ompi_osc_pt2pt_frag_t *frag;
158+
int ret = OMPI_SUCCESS;
159+
160+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
161+
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
162+
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
163+
164+
/* walk through the pending list and send */
165+
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
166+
ret = frag_send(module, frag);
167+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
168+
break;
169+
}
170+
}
171+
172+
/* XXX -- TODO -- better error handling */
173+
if (OMPI_SUCCESS != ret) {
174+
return ret;
175+
}
176+
177+
/* flush the active frag */
178+
ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
179+
180+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
181+
"osc pt2pt: frag flush target %d finished", target));
182+
183+
return ret;
184+
}
154185

155186
int ompi_osc_pt2pt_frag_flush_all (ompi_osc_pt2pt_module_t *module)
156187
{

ompi/mca/osc/pt2pt/osc_pt2pt_frag.h

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,10 @@ struct ompi_osc_pt2pt_frag_t {
4141
typedef struct ompi_osc_pt2pt_frag_t ompi_osc_pt2pt_frag_t;
4242
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
4343

44-
extern int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45-
extern int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46-
extern int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
44+
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
45+
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
46+
int ompi_osc_pt2pt_frag_flush_target_locked(ompi_osc_pt2pt_module_t *module, int target);
47+
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
4748

4849
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
4950
ompi_osc_pt2pt_frag_t* buffer)

ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c

Lines changed: 22 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
122122

123123
int ret;
124124

125+
OPAL_THREAD_LOCK(&peer->lock);
126+
if (ompi_osc_pt2pt_peer_locked (peer)) {
127+
OPAL_THREAD_UNLOCK(&peer->lock);
128+
return OMPI_SUCCESS;
129+
}
130+
125131
(void) OPAL_THREAD_ADD32(&lock->sync_expected, 1);
126132

127133
assert (lock->type == OMPI_OSC_PT2PT_SYNC_TYPE_LOCK);
@@ -137,16 +143,23 @@ int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, omp
137143
lock_req.lock_ptr = (uint64_t) (uintptr_t) lock;
138144
OSC_PT2PT_HTON(&lock_req, module, target);
139145

140-
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
146+
do {
147+
ret = ompi_osc_pt2pt_control_send (module, target, &lock_req, sizeof (lock_req));
148+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
149+
break;
150+
}
151+
152+
/* make sure the request gets sent, so we can start eager sending... */
153+
ret = ompi_osc_pt2pt_frag_flush_target_locked (module, target);
154+
} while (0);
155+
141156
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
142-
return ret;
157+
OPAL_THREAD_ADD32(&lock->sync_expected, -1);
143158
}
144159

145-
/* make sure the request gets sent, so we can start eager sending... */
146-
ret = ompi_osc_pt2pt_frag_flush_target (module, target);
147-
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
148-
ompi_osc_pt2pt_peer_set_locked (peer, true);
149-
}
160+
ompi_osc_pt2pt_peer_set_locked (peer, true);
161+
162+
OPAL_THREAD_UNLOCK(&peer->lock);
150163

151164
return ret;
152165
}
@@ -316,6 +329,8 @@ static int ompi_osc_pt2pt_lock_internal (int lock_type, int target, int assert,
316329
if (OPAL_UNLIKELY(NULL == lock)) {
317330
return OMPI_ERR_OUT_OF_RESOURCE;
318331
}
332+
333+
lock->peer_list.peer = ompi_osc_pt2pt_peer_lookup (module, target);
319334
} else {
320335
lock = &module->all_sync;
321336
}

0 commit comments

Comments
 (0)