Skip to content

Commit 0195d15

Browse files
committed
osc/pt2pt: flush pending fragments on lock ack
This commit addresses an issue that can occur in cases where a lot of fragments are outstanding. Signed-off-by: Nathan Hjelm <[email protected]>
1 parent 79540fe commit 0195d15

File tree

3 files changed

+54
-7
lines changed

3 files changed

+54
-7
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt_frag.c

Lines changed: 31 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -117,16 +117,12 @@ static int ompi_osc_pt2pt_flush_active_frag (ompi_osc_pt2pt_module_t *module, om
117117
return ret;
118118
}
119119

120-
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
120+
int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target)
121121
{
122122
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
123123
ompi_osc_pt2pt_frag_t *frag;
124124
int ret = OMPI_SUCCESS;
125125

126-
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
127-
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
128-
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
129-
130126
/* walk through the pending list and send */
131127
OPAL_THREAD_LOCK(&peer->lock);
132128
while (NULL != (frag = ((ompi_osc_pt2pt_frag_t *) opal_list_remove_first (&peer->queued_frags)))) {
@@ -137,11 +133,40 @@ int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int targe
137133
}
138134
OPAL_THREAD_UNLOCK(&peer->lock);
139135

140-
/* XXX -- TODO -- better error handling */
136+
return ret;
137+
}
138+
139+
/**
 * Flush the queued fragments of every peer in the module's communicator.
 *
 * Calls ompi_osc_pt2pt_frag_flush_pending() once for each rank from 0 up to
 * (but not including) ompi_comm_size (module->comm), stopping at the first
 * rank whose flush does not return OMPI_SUCCESS.
 *
 * @param[in] module  osc pt2pt module whose peers are flushed
 *
 * @returns OMPI_SUCCESS if every per-rank flush succeeded (or no rank was
 *          visited), otherwise the first error code returned by
 *          ompi_osc_pt2pt_frag_flush_pending()
 */
int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module)
{
    /* start from success so the value returned after the loop is always
     * defined, even if the loop body never executes */
    int ret = OMPI_SUCCESS;

    for (int i = 0 ; i < ompi_comm_size (module->comm) ; ++i) {
        ret = ompi_osc_pt2pt_frag_flush_pending (module, i);
        if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
            /* give up on the first failure and report it to the caller */
            return ret;
        }
    }

    return ret;
}
152+
153+
int ompi_osc_pt2pt_frag_flush_target (ompi_osc_pt2pt_module_t *module, int target)
154+
{
155+
ompi_osc_pt2pt_peer_t *peer = ompi_osc_pt2pt_peer_lookup (module, target);
156+
ompi_osc_pt2pt_frag_t *frag;
157+
int ret = OMPI_SUCCESS;
158+
159+
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
160+
"osc pt2pt: frag flush to target target %d. queue fragments: %lu",
161+
target, (unsigned long) opal_list_get_size (&peer->queued_frags)));
162+
163+
ret = ompi_osc_pt2pt_frag_flush_pending (module, target);
141164
if (OMPI_SUCCESS != ret) {
165+
/* XXX -- TODO -- better error handling */
142166
return ret;
143167
}
144168

169+
145170
/* flush the active frag */
146171
ret = ompi_osc_pt2pt_flush_active_frag (module, peer);
147172

ompi/mca/osc/pt2pt/osc_pt2pt_frag.h

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_frag_t);
4444
int ompi_osc_pt2pt_frag_start(ompi_osc_pt2pt_module_t *module, ompi_osc_pt2pt_frag_t *buffer);
4545
int ompi_osc_pt2pt_frag_flush_target(ompi_osc_pt2pt_module_t *module, int target);
4646
int ompi_osc_pt2pt_frag_flush_all(ompi_osc_pt2pt_module_t *module);
47+
int ompi_osc_pt2pt_frag_flush_pending (ompi_osc_pt2pt_module_t *module, int target);
48+
int ompi_osc_pt2pt_frag_flush_pending_all (ompi_osc_pt2pt_module_t *module);
4749

4850
static inline int ompi_osc_pt2pt_frag_finish (ompi_osc_pt2pt_module_t *module,
4951
ompi_osc_pt2pt_frag_t* buffer)
@@ -107,7 +109,7 @@ static inline ompi_osc_pt2pt_frag_t *ompi_osc_pt2pt_frag_alloc_non_buffered (omp
107109
* soon as it is sent. this allows request-based rma fragments to be completed
108110
* so MPI_Test/MPI_Wait/etc will work as expected.
109111
*/
110-
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
112+
static inline int _ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
111113
size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
112114
char **ptr, bool long_send, bool buffered)
113115
{
@@ -164,4 +166,23 @@ static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, in
164166
return OMPI_SUCCESS;
165167
}
166168

169+
/**
 * Allocate fragment space, retrying while the allocator reports that it is
 * out of resources.
 *
 * Thin retry wrapper around _ompi_osc_pt2pt_frag_alloc(); all arguments are
 * forwarded to it unchanged. Whenever the worker returns
 * OMPI_ERR_OUT_OF_RESOURCE this function flushes the pending fragments of all
 * peers, calls opal_progress(), and tries again. Any other return code
 * (success or a different error) ends the loop.
 *
 * @param[in]  module       osc pt2pt module
 * @param[in]  target       target rank, forwarded to the worker
 * @param[in]  request_len  requested length, forwarded to the worker
 * @param[out] buffer       forwarded to the worker
 * @param[out] ptr          forwarded to the worker
 * @param[in]  long_send    forwarded to the worker
 * @param[in]  buffered     forwarded to the worker
 *
 * @returns the first return code from _ompi_osc_pt2pt_frag_alloc() that is
 *          not OMPI_ERR_OUT_OF_RESOURCE
 */
static inline int ompi_osc_pt2pt_frag_alloc (ompi_osc_pt2pt_module_t *module, int target,
                                             size_t request_len, ompi_osc_pt2pt_frag_t **buffer,
                                             char **ptr, bool long_send, bool buffered)
{
    int ret;

    do {
        /* call the underscore-prefixed worker; calling this wrapper's own
         * name here would recurse without bound */
        ret = _ompi_osc_pt2pt_frag_alloc (module, target, request_len, buffer, ptr, long_send, buffered);
        if (OPAL_LIKELY(OMPI_SUCCESS == ret || OMPI_ERR_OUT_OF_RESOURCE != ret)) {
            break;
        }

        /* out of resources: push out queued fragments and drive progress
         * before retrying. the flush result is deliberately ignored; the
         * next allocation attempt decides whether to keep looping.
         * NOTE(review): a persistently failing flush would keep this loop
         * spinning -- confirm that is acceptable for callers. */
        (void) ompi_osc_pt2pt_frag_flush_pending_all (module);
        opal_progress ();
    } while (1);

    return ret;
}
187+
167188
#endif

ompi/mca/osc/pt2pt/osc_pt2pt_passive_target.c

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -819,6 +819,7 @@ void ompi_osc_pt2pt_process_lock_ack (ompi_osc_pt2pt_module_t *module,
819819
assert (NULL != lock);
820820

821821
ompi_osc_pt2pt_peer_set_eager_active (peer, true);
822+
ompi_osc_pt2pt_frag_flush_pending (module, peer->rank);
822823

823824
ompi_osc_pt2pt_sync_expected (lock);
824825
}

0 commit comments

Comments
 (0)