Skip to content

Commit 7589a25

Browse files
committed
osc/pt2pt: do not repost receive from request callback
This commit fixes an issue that can occur if a target gets overwhelmed with requests. This can cause osc/pt2pt to go into deep recursion with a stack like req_complete_cb -> ompi_osc_pt2pt_callback -> start -> req_complete_cb -> ... . At small scale this is fine as the recursion depth stays small but at larger scale we can quickly exhaust the stack processing frag requests. To fix the issue the request callback now simply puts the request on a list and returns. The osc/pt2pt progress function then handles the processing and reposting of the request. As part of this change osc/pt2pt can now post multiple fragment receive requests per window. This should help prevent a target from being overwhelmed. Signed-off-by: Nathan Hjelm <[email protected]>
1 parent 8d0baf1 commit 7589a25

File tree

8 files changed

+184
-66
lines changed

8 files changed

+184
-66
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
BEGIN_C_DECLS
4848

4949
struct ompi_osc_pt2pt_frag_t;
50+
struct ompi_osc_pt2pt_receive_t;
5051

5152
struct ompi_osc_pt2pt_component_t {
5253
/** Extend the basic osc component interface */
@@ -61,6 +62,9 @@ struct ompi_osc_pt2pt_component_t {
6162
/** module count */
6263
int module_count;
6364

65+
/** number of buffers per window */
66+
int receive_count;
67+
6468
/** free list of ompi_osc_pt2pt_frag_t structures */
6569
opal_free_list_t frags;
6670

@@ -76,6 +80,12 @@ struct ompi_osc_pt2pt_component_t {
7680
/** List of operations that need to be processed */
7781
opal_list_t pending_operations;
7882

83+
/** List of receives to be processed */
84+
opal_list_t pending_receives;
85+
86+
/** Lock for pending_receives */
87+
opal_mutex_t pending_receives_lock;
88+
7989
/** Is the progress function enabled? */
8090
bool progress_enable;
8191
};
@@ -192,8 +202,11 @@ struct ompi_osc_pt2pt_module_t {
192202
/** origin side list of locks currently outstanding */
193203
opal_hash_table_t outstanding_locks;
194204

195-
unsigned char *incoming_buffer;
196-
ompi_request_t *frag_request;
205+
/** receive fragments */
206+
struct ompi_osc_pt2pt_receive_t *recv_frags;
207+
208+
/** number of receive fragments */
209+
int recv_frag_count;
197210

198211
/* enforce accumulate semantics */
199212
opal_atomic_lock_t accumulate_lock;
@@ -243,6 +256,15 @@ struct ompi_osc_pt2pt_pending_t {
243256
typedef struct ompi_osc_pt2pt_pending_t ompi_osc_pt2pt_pending_t;
244257
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_pending_t);
245258

259+
struct ompi_osc_pt2pt_receive_t {
260+
opal_list_item_t super;
261+
ompi_osc_pt2pt_module_t *module;
262+
ompi_request_t *pml_request;
263+
void *buffer;
264+
};
265+
typedef struct ompi_osc_pt2pt_receive_t ompi_osc_pt2pt_receive_t;
266+
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_receive_t);
267+
246268
#define GET_MODULE(win) ((ompi_osc_pt2pt_module_t*) win->w_osc_module)
247269

248270
extern bool ompi_osc_pt2pt_no_locks;

ompi/mca/osc/pt2pt/osc_pt2pt_component.c

Lines changed: 69 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -142,44 +142,62 @@ static int component_register (void)
142142
NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
143143
&mca_osc_pt2pt_component.buffer_size);
144144

145+
mca_osc_pt2pt_component.receive_count = 4;
146+
(void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "receive_count",
147+
"Number of receives to post for each window for incoming fragments "
148+
"(default: 4)", MCA_BASE_VAR_TYPE_INT, NULL, 0, 0, OPAL_INFO_LVL_4,
149+
MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_pt2pt_component.receive_count);
150+
145151
return OMPI_SUCCESS;
146152
}
147153

148154
static int component_progress (void)
149155
{
150-
int count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
156+
int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
157+
int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
151158
ompi_osc_pt2pt_pending_t *pending, *next;
152159

153-
if (0 == count) {
154-
return 0;
160+
if (recv_count) {
161+
for (int i = 0 ; i < recv_count ; ++i) {
162+
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
163+
ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) opal_list_remove_first (&mca_osc_pt2pt_component.pending_receives);
164+
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
165+
if (NULL == recv) {
166+
break;
167+
}
168+
169+
(void) ompi_osc_pt2pt_process_receive (recv);
170+
}
155171
}
156172

157173
/* process one incoming request */
158-
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
159-
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
160-
int ret;
161-
162-
switch (pending->header.base.type) {
163-
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
164-
ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
165-
&pending->header.flush);
166-
break;
167-
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
168-
ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
169-
&pending->header.unlock);
170-
break;
171-
default:
172-
/* shouldn't happen */
173-
assert (0);
174-
abort ();
175-
}
176-
177-
if (OMPI_SUCCESS == ret) {
178-
opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
179-
OBJ_RELEASE(pending);
180-
}
174+
if (pending_count) {
175+
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
176+
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
177+
int ret;
178+
179+
switch (pending->header.base.type) {
180+
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
181+
ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
182+
&pending->header.flush);
183+
break;
184+
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
185+
ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
186+
&pending->header.unlock);
187+
break;
188+
default:
189+
/* shouldn't happen */
190+
assert (0);
191+
abort ();
192+
}
193+
194+
if (OMPI_SUCCESS == ret) {
195+
opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
196+
OBJ_RELEASE(pending);
197+
}
198+
}
199+
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
181200
}
182-
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
183201

184202
return 1;
185203
}
@@ -193,6 +211,8 @@ component_init(bool enable_progress_threads,
193211
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
194212
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
195213
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
214+
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives, opal_list_t);
215+
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives_lock, opal_mutex_t);
196216

197217
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
198218
opal_hash_table_t);
@@ -253,6 +273,8 @@ component_finalize(void)
253273
OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
254274
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
255275
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
276+
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives);
277+
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives_lock);
256278

257279
return OMPI_SUCCESS;
258280
}
@@ -385,11 +407,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
385407
/* sync memory - make sure all initialization completed */
386408
opal_atomic_mb();
387409

388-
module->incoming_buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
389-
if (OPAL_UNLIKELY(NULL == module->incoming_buffer)) {
390-
goto cleanup;
391-
}
392-
393410
ret = ompi_osc_pt2pt_frag_start_receive (module);
394411
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
395412
goto cleanup;
@@ -449,6 +466,26 @@ ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct ompi_info_t **info_used)
449466

450467
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
451468

469+
void ompi_osc_pt2pt_receive_construct (ompi_osc_pt2pt_receive_t *recv)
470+
{
471+
recv->buffer = NULL;
472+
recv->pml_request = NULL;
473+
}
474+
475+
void ompi_osc_pt2pt_receive_destruct (ompi_osc_pt2pt_receive_t *recv)
476+
{
477+
free (recv->buffer);
478+
if (recv->pml_request && MPI_REQUEST_NULL != recv->pml_request) {
479+
recv->pml_request->req_complete_cb = NULL;
480+
ompi_request_cancel (recv->pml_request);
481+
ompi_request_free (&recv->pml_request);
482+
}
483+
}
484+
485+
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_receive_t, opal_list_item_t,
486+
ompi_osc_pt2pt_receive_construct,
487+
ompi_osc_pt2pt_receive_destruct);
488+
452489
static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
453490
{
454491
OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);

ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c

Lines changed: 63 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1616,11 +1616,38 @@ static inline int process_frag (ompi_osc_pt2pt_module_t *module,
16161616
/* dispatch for callback on message completion */
16171617
static int ompi_osc_pt2pt_callback (ompi_request_t *request)
16181618
{
1619-
ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) request->req_complete_cb_data;
1620-
ompi_osc_pt2pt_header_t *base_header =
1621-
(ompi_osc_pt2pt_header_t *) module->incoming_buffer;
1622-
size_t incoming_length = request->req_status._ucount;
1623-
int source = request->req_status.MPI_SOURCE;
1619+
ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) request->req_complete_cb_data;
1620+
1621+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output, "received pt2pt fragment"));
1622+
1623+
/* to avoid deep recursion from complet -> start -> complete -> ... we simply put this
1624+
* request on a list and let it be processed by opal_progress(). */
1625+
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1626+
opal_list_append (&mca_osc_pt2pt_component.pending_receives, &recv->super);
1627+
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
1628+
1629+
return OMPI_SUCCESS;
1630+
}
1631+
1632+
static int ompi_osc_pt2pt_receive_repost (ompi_osc_pt2pt_receive_t *recv)
1633+
{
1634+
/* wait until the request has been marked as complete */
1635+
ompi_request_wait_completion (recv->pml_request);
1636+
1637+
/* ompi_request_complete clears the callback */
1638+
recv->pml_request->req_complete_cb = ompi_osc_pt2pt_callback;
1639+
recv->pml_request->req_complete_cb_data = (void *) recv;
1640+
1641+
return MCA_PML_CALL(start(1, &recv->pml_request));
1642+
}
1643+
1644+
int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv)
1645+
{
1646+
ompi_osc_pt2pt_module_t *module = (ompi_osc_pt2pt_module_t *) recv->module;
1647+
ompi_osc_pt2pt_header_t *base_header = (ompi_osc_pt2pt_header_t *) recv->buffer;
1648+
size_t incoming_length = recv->pml_request->req_status._ucount;
1649+
int source = recv->pml_request->req_status.MPI_SOURCE;
1650+
int rc;
16241651

16251652
assert(incoming_length >= sizeof(ompi_osc_pt2pt_header_base_t));
16261653
(void)incoming_length; // silence compiler warning
@@ -1661,23 +1688,45 @@ static int ompi_osc_pt2pt_callback (ompi_request_t *request)
16611688

16621689
osc_pt2pt_gc_clean (module);
16631690

1664-
ompi_osc_pt2pt_frag_start_receive (module);
1665-
1666-
/* put this request on the garbage colletion list */
1667-
osc_pt2pt_gc_add_request (module, request);
1691+
rc = ompi_osc_pt2pt_receive_repost (recv);
16681692

16691693
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
1670-
"finished posting receive request"));
1694+
"finished posting receive request. rc: %d", rc));
16711695

16721696
return OMPI_SUCCESS;
16731697
}
16741698

16751699
int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module)
16761700
{
1677-
module->frag_request = MPI_REQUEST_NULL;
1678-
return ompi_osc_pt2pt_irecv_w_cb (module->incoming_buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
1679-
MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->frag_request,
1680-
ompi_osc_pt2pt_callback, module);
1701+
int rc;
1702+
1703+
module->recv_frag_count = mca_osc_pt2pt_component.receive_count;
1704+
if (module->recv_frag_count < 0) {
1705+
module->recv_frag_count = 1;
1706+
}
1707+
1708+
module->recv_frags = malloc (sizeof (module->recv_frags[0]) * module->recv_frag_count);
1709+
if (NULL == module->recv_frags) {
1710+
return OMPI_ERR_OUT_OF_RESOURCE;
1711+
}
1712+
1713+
for (int i = 0 ; i < module->recv_frag_count ; ++i) {
1714+
OBJ_CONSTRUCT(module->recv_frags + i, ompi_osc_pt2pt_receive_t);
1715+
module->recv_frags[i].module = module;
1716+
module->recv_frags[i].buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
1717+
if (NULL == module->recv_frags[i].buffer) {
1718+
return OMPI_ERR_OUT_OF_RESOURCE;
1719+
}
1720+
1721+
rc = ompi_osc_pt2pt_irecv_w_cb (module->recv_frags[i].buffer, mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t),
1722+
MPI_BYTE, OMPI_ANY_SOURCE, OSC_PT2PT_FRAG_TAG, module->comm, &module->recv_frags[i].pml_request,
1723+
ompi_osc_pt2pt_callback, module->recv_frags + i);
1724+
if (OMPI_SUCCESS != rc) {
1725+
return rc;
1726+
}
1727+
}
1728+
1729+
return OMPI_SUCCESS;
16811730
}
16821731

16831732
int ompi_osc_pt2pt_component_irecv (ompi_osc_pt2pt_module_t *module, void *buf,

ompi/mca/osc/pt2pt/osc_pt2pt_data_move.h

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,4 +144,16 @@ void ompi_osc_pt2pt_process_flush_ack (ompi_osc_pt2pt_module_t *module, int sour
144144
*/
145145
int ompi_osc_pt2pt_frag_start_receive (ompi_osc_pt2pt_module_t *module);
146146

147+
/**
148+
* ompi_osc_pt2pt_process_receive:
149+
*
150+
* @short Report a receive request
151+
*
152+
* @param[in] recv - Receive structure
153+
*
154+
* @long This function reposts a receive request. This function should not be called from
155+
* a pml request callback as it can lead to deep recursion during heavy load.
156+
*/
157+
int ompi_osc_pt2pt_process_receive (ompi_osc_pt2pt_receive_t *recv);
158+
147159
#endif

ompi/mca/osc/pt2pt/osc_pt2pt_module.c

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,18 +93,20 @@ int ompi_osc_pt2pt_free(ompi_win_t *win)
9393
OBJ_DESTRUCT(&module->peer_hash);
9494
OBJ_DESTRUCT(&module->peer_lock);
9595

96-
if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
96+
if (NULL != module->recv_frags) {
97+
for (int i = 0 ; i < module->recv_frag_count ; ++i) {
98+
OBJ_DESTRUCT(module->recv_frags + i);
99+
}
97100

98-
if (NULL != module->frag_request && MPI_REQUEST_NULL != module->frag_request) {
99-
module->frag_request->req_complete_cb = NULL;
100-
ompi_request_cancel (module->frag_request);
101-
ompi_request_free (&module->frag_request);
101+
free (module->recv_frags);
102102
}
103103

104+
if (NULL != module->epoch_outgoing_frag_count) free(module->epoch_outgoing_frag_count);
105+
104106
if (NULL != module->comm) {
105107
ompi_comm_free(&module->comm);
106108
}
107-
if (NULL != module->incoming_buffer) free (module->incoming_buffer);
109+
108110
if (NULL != module->free_after) free(module->free_after);
109111

110112
free (module);

ompi/mca/pml/cm/pml_cm_start.c

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ mca_pml_cm_start(size_t count, ompi_request_t** requests)
4242
continue;
4343
}
4444

45-
if (OMPI_REQUEST_ACTIVE == pml_request->req_ompi.req_state) {
46-
return OMPI_ERR_REQUEST;
47-
}
48-
4945
/* start the request */
5046
switch (pml_request->req_pml_type) {
5147
case MCA_PML_CM_REQUEST_SEND_HEAVY:

ompi/mca/pml/ob1/pml_ob1_start.c

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,10 +49,6 @@ int mca_pml_ob1_start(size_t count, ompi_request_t** requests)
4949
opal_atomic_rmb();
5050
#endif
5151

52-
if (OMPI_REQUEST_ACTIVE == pml_request->req_ompi.req_state) {
53-
return OMPI_ERR_REQUEST;
54-
}
55-
5652
/* start the request */
5753
switch(pml_request->req_type) {
5854
case MCA_PML_REQUEST_SEND:

0 commit comments

Comments
 (0)