Skip to content

Commit 0e8f267

Browse files
committed
osc/pt2pt: bug fixes
This commit fixes several bugs identified by @ggouaillardet and MTT:

- Fix SEGV in long send completion caused by a missing update to the request callback data.
- Add an MPI_Barrier to the fence short-cut. This fixes potential semantic issues where messages may be received before the fence is reached.
- Ensure fragments are flushed when using request-based RMA. This allows MPI_Test/MPI_Wait/etc. to work as expected.
- Restore the tag space back to 16 bits. It was intended that the space be expanded to 32 bits, but the required change to the fragment headers was not committed. The tag space may be expanded in a later commit.

Signed-off-by: Nathan Hjelm <[email protected]>
(cherry picked from commit open-mpi/ompi@5b9c82a)
Signed-off-by: Nathan Hjelm <[email protected]>
1 parent 2451473 commit 0e8f267

File tree

5 files changed

+100
-66
lines changed

5 files changed

+100
-66
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -631,8 +631,8 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
631631
opal_list_append (&mca_osc_pt2pt_component.pending_operations, &pending->super));
632632
}
633633

634-
#define OSC_PT2PT_FRAG_TAG 0x80000
635-
#define OSC_PT2PT_FRAG_MASK 0x7ffff
634+
#define OSC_PT2PT_FRAG_TAG 0x10000
635+
#define OSC_PT2PT_FRAG_MASK 0x0ffff
636636

637637
/**
638638
* get_tag:

ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -147,6 +147,7 @@ int ompi_osc_pt2pt_fence(int assert, ompi_win_t *win)
147147

148148
/* short-circuit the noprecede case */
149149
if (0 != (assert & MPI_MODE_NOPRECEDE)) {
150+
module->comm->c_coll.coll_barrier (module->comm, module->comm->c_coll.coll_barrier);
150151
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
151152
"osc pt2pt: fence end (short circuit)"));
152153
return ret;

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,9 @@ static int ompi_osc_pt2pt_req_comm_complete (ompi_request_t *request)
5858
"ompi_osc_pt2pt_req_comm_complete called tag = %d",
5959
request->req_status.MPI_TAG));
6060

61+
/* update the cbdata for ompi_osc_pt2pt_comm_complete */
62+
request->req_complete_cb_data = pt2pt_request->module;
63+
6164
if (0 == OPAL_THREAD_ADD32(&pt2pt_request->outstanding_requests, -1)) {
6265
ompi_osc_pt2pt_request_complete (pt2pt_request, request->req_status.MPI_ERROR);
6366
}
@@ -218,8 +221,8 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c
218221
((unsigned long) target_disp * module->disp_unit);
219222
int ret;
220223

221-
/* if we are in active target mode wait until all post messages arrive */
222-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
224+
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: starting local "
225+
"get accumulate"));
223226

224227
ompi_osc_pt2pt_accumulate_lock (module);
225228

@@ -250,6 +253,9 @@ static inline int ompi_osc_pt2pt_gacc_self (ompi_osc_pt2pt_sync_t *pt2pt_sync, c
250253

251254
ompi_osc_pt2pt_accumulate_unlock (module);
252255

256+
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output, "ompi_osc_pt2pt_gacc_self: local get "
257+
"accumulate complete"));
258+
253259
if (request) {
254260
/* NTH: is it ok to use an ompi error code here? */
255261
ompi_osc_pt2pt_request_complete (request, ret);
@@ -310,14 +316,14 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
310316
payload_len = origin_dt->super.size * origin_count;
311317
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len + payload_len;
312318

313-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
319+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
314320
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
315321
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + ddt_len;
316-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
322+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
317323
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
318324
/* allocate space for the header plus space to store ddt_len */
319325
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
320-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
326+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, false);
321327
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
322328
return OMPI_ERR_OUT_OF_RESOURCE;
323329
}
@@ -469,14 +475,14 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
469475
payload_len = origin_dt->super.size * origin_count;
470476

471477
frag_len = sizeof(*header) + ddt_len + payload_len;
472-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
478+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, true);
473479
if (OMPI_SUCCESS != ret) {
474480
frag_len = sizeof(*header) + ddt_len;
475-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
481+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
476482
if (OMPI_SUCCESS != ret) {
477483
/* allocate space for the header plus space to store ddt_len */
478484
frag_len = sizeof(*header) + 8;
479-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true);
485+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, true, !request);
480486
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
481487
return OMPI_ERR_OUT_OF_RESOURCE;
482488
}
@@ -488,7 +494,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
488494
tag = get_rtag (module);
489495
}
490496

491-
if (is_long_msg || is_long_datatype) {
497+
if (is_long_msg) {
492498
/* wait for synchronization before posting a long message */
493499
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
494500
}
@@ -631,7 +637,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
631637
}
632638

633639
frag_len = sizeof(ompi_osc_pt2pt_header_cswap_t) + ddt_len + payload_len;
634-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
640+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, false);
635641
if (OMPI_SUCCESS != ret) {
636642
return OMPI_ERR_OUT_OF_RESOURCE;
637643
}
@@ -663,9 +669,7 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
663669
return ret;
664670
}
665671

666-
ret = ompi_osc_pt2pt_frag_finish(module, frag);
667-
668-
return ret;
672+
return ompi_osc_pt2pt_frag_finish (module, frag);
669673
}
670674

671675

@@ -779,11 +783,11 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
779783
ddt_len = ompi_datatype_pack_description_length(target_dt);
780784

781785
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + ddt_len;
782-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
786+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
783787
if (OMPI_SUCCESS != ret) {
784788
/* allocate space for the header plus space to store ddt_len */
785789
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
786-
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false);
790+
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
787791
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
788792
return OMPI_ERR_OUT_OF_RESOURCE;
789793
}
@@ -961,6 +965,11 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
961965
return OMPI_SUCCESS;
962966
}
963967

968+
if (!release_req) {
969+
/* wait for epoch to begin before starting operation */
970+
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
971+
}
972+
964973
/* optimize the self case. TODO: optimize the local case */
965974
if (ompi_comm_rank (module->comm) == target_rank) {
966975
*request = &pt2pt_request->super;
@@ -987,14 +996,14 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
987996
}
988997

989998
frag_len = sizeof(*header) + ddt_len + payload_len;
990-
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false);
999+
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, false, release_req);
9911000
if (OMPI_SUCCESS != ret) {
9921001
frag_len = sizeof(*header) + ddt_len;
993-
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
1002+
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
9941003
if (OMPI_SUCCESS != ret) {
9951004
/* allocate space for the header plus space to store ddt_len */
9961005
frag_len = sizeof(*header) + 8;
997-
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true);
1006+
ret = ompi_osc_pt2pt_frag_alloc(module, target_rank, frag_len, &frag, &ptr, true, release_req);
9981007
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
9991008
return OMPI_ERR_OUT_OF_RESOURCE;
10001009
}
@@ -1014,11 +1023,6 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
10141023
/* increment the number of outgoing fragments */
10151024
ompi_osc_signal_outgoing (module, target_rank, pt2pt_request->outstanding_requests);
10161025

1017-
if (!release_req) {
1018-
/* wait for epoch to begin before starting operation */
1019-
ompi_osc_pt2pt_sync_wait (pt2pt_sync);
1020-
}
1021-
10221026
header = (ompi_osc_pt2pt_header_acc_t *) ptr;
10231027
header->base.flags = 0;
10241028
header->len = frag_len;

ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -213,7 +213,7 @@ int ompi_osc_pt2pt_control_send (ompi_osc_pt2pt_module_t *module, int target,
213213
char *ptr;
214214
int ret;
215215

216-
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false);
216+
ret = ompi_osc_pt2pt_frag_alloc(module, target, len, &frag, &ptr, false, true);
217217
if (OPAL_LIKELY(OMPI_SUCCESS == ret)) {
218218
memcpy (ptr, data, len);
219219

ompi/mca/osc/pt2pt/osc_pt2pt_frag.h

Lines changed: 69 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -57,16 +57,62 @@ static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
5757
return OMPI_SUCCESS;
5858
}
5959

60+
static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (ompi_osc_pt2pt_module_t *module,
61+
ompi_osc_pt2pt_peer_t *peer,
62+
size_t request_len)
63+
{
64+
ompi_osc_pt2pt_frag_t *curr;
65+
66+
/* to ensure ordering flush the buffer on the peer */
67+
curr = peer->active_frag;
68+
if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
69+
/* If there's something pending, the pending finish will
70+
start the buffer. Otherwise, we need to start it now. */
71+
int ret = ompi_osc_pt2pt_frag_finish (module, curr);
72+
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
73+
return NULL;
74+
}
75+
}
76+
77+
curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
78+
if (OPAL_UNLIKELY(NULL == curr)) {
79+
return NULL;
80+
}
81+
82+
curr->target = peer->rank;
83+
84+
curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
85+
curr->top = (char*) (curr->header + 1);
86+
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
87+
curr->module = module;
88+
curr->pending = 1;
89+
90+
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
91+
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
92+
if (module->passive_target_access_epoch) {
93+
curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
94+
}
95+
curr->header->source = ompi_comm_rank(module->comm);
96+
curr->header->num_ops = 1;
97+
98+
return curr;
99+
}
100+
60101
/*
61-
* Note: module lock must be held during this operation
102+
* Note: this function takes the module lock
103+
*
104+
* buffered sends will cache the fragment on the peer object associated with the
105+
* target. unbuffered-sends will cause the target fragment to be flushed and
106+
* will not be cached on the peer. this causes the fragment to be flushed as
107+
* soon as it is sent. this allows request-based rma fragments to be completed
108+
* so MPI_Test/MPI_Wait/etc will work as expected.
62109
*/
63110
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
64111
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
65-
char **ptr, bool long_send)
112+
char **ptr, bool long_send, bool buffered)
66113
{
67114
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
68115
ompi_osc_pt2pt_frag_t *curr;
69-
int ret;
70116

71117
/* osc pt2pt headers can have 64-bit values. these will need to be aligned
72118
* on an 8-byte boundary on some architectures so we up align the allocation
@@ -77,51 +123,34 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
77123
return OMPI_ERR_OUT_OF_RESOURCE;
78124
}
79125

126+
OPAL_OUTPUT_VERBOSE((MCA_BASE_VERBOSE_TRACE, ompi_osc_base_framework.framework_output,
127+
"attempting to allocate buffer for %lu bytes to target %d. long send: %d, "
128+
"buffered: %d", (unsigned long) request_len, target, long_send, buffered));
129+
80130
OPAL_THREAD_LOCK(&module->lock);
81-
curr = peer->active_frag;
82-
if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
83-
if (NULL != curr && opal_atomic_cmpset (&peer->active_frag, curr, NULL)) {
84-
/* If there's something pending, the pending finish will
85-
start the buffer. Otherwise, we need to start it now. */
86-
ret = ompi_osc_pt2pt_frag_finish (module, curr);
87-
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
131+
if (buffered) {
132+
curr = peer->active_frag;
133+
if (NULL == curr || curr->remain_len < request_len || (long_send && curr->pending_long_sends == 32)) {
134+
curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
135+
if (OPAL_UNLIKELY(NULL == curr)) {
88136
OPAL_THREAD_UNLOCK(&module->lock);
89-
return ret;
137+
return OMPI_ERR_OUT_OF_RESOURCE;
90138
}
91-
}
92-
93-
curr = (ompi_osc_pt2pt_frag_t *) opal_free_list_get (&mca_osc_pt2pt_component.frags);
94-
if (OPAL_UNLIKELY(NULL == curr)) {
95-
return OMPI_ERR_OUT_OF_RESOURCE;
96-
}
97-
98-
curr->target = target;
99139

100-
curr->header = (ompi_osc_pt2pt_frag_header_t*) curr->buffer;
101-
curr->top = (char*) (curr->header + 1);
102-
curr->remain_len = mca_osc_pt2pt_component.buffer_size;
103-
curr->module = module;
104-
curr->pending = 2;
105-
curr->pending_long_sends = long_send;
106-
107-
curr->header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_FRAG;
108-
curr->header->base.flags = OMPI_OSC_PT2PT_HDR_FLAG_VALID;
109-
if (module->passive_target_access_epoch) {
110-
curr->header->base.flags |= OMPI_OSC_PT2PT_HDR_FLAG_PASSIVE_TARGET;
140+
curr->pending_long_sends = long_send;
141+
peer->active_frag = curr;
142+
} else {
143+
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
144+
curr->pending_long_sends += long_send;
111145
}
112-
curr->header->source = ompi_comm_rank(module->comm);
113-
curr->header->num_ops = 1;
114146

115-
if (curr->remain_len < request_len) {
147+
OPAL_THREAD_ADD32(&curr->pending, 1);
148+
} else {
149+
curr = ompi_osc_pt2pt_frag_alloc_non_buffered (module, peer, request_len);
150+
if (OPAL_UNLIKELY(NULL == curr)) {
116151
OPAL_THREAD_UNLOCK(&module->lock);
117-
return OMPI_ERR_TEMP_OUT_OF_RESOURCE;
152+
return OMPI_ERR_OUT_OF_RESOURCE;
118153
}
119-
120-
peer->active_frag = curr;
121-
} else {
122-
OPAL_THREAD_ADD32(&curr->pending, 1);
123-
OPAL_THREAD_ADD32(&curr->header->num_ops, 1);
124-
curr->pending_long_sends += long_send;
125154
}
126155

127156
*ptr = curr->top;

0 commit comments

Comments
 (0)