Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 0958bc4

Browse files
authored
Merge pull request #1321 from hjelmn/v2.x_osc_pt2pt_fixes_2s
v2.x osc/pt2pt fixes
2 parents 0ead34e + efe274b commit 0958bc4

File tree

11 files changed

+292
-105
lines changed

11 files changed

+292
-105
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 88 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
* reserved.
1313
* Copyright (c) 2010 Cisco Systems, Inc. All rights reserved.
1414
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
15-
* Copyright (c) 2015 Research Organization for Information Science
15+
* Copyright (c) 2015-2016 Research Organization for Information Science
1616
* and Technology (RIST). All rights reserved.
1717
* Copyright (c) 2016 FUJITSU LIMITED. All rights reserved.
1818
* $COPYRIGHT$
@@ -47,6 +47,7 @@
4747
BEGIN_C_DECLS
4848

4949
struct ompi_osc_pt2pt_frag_t;
50+
struct ompi_osc_pt2pt_receive_t;
5051

5152
struct ompi_osc_pt2pt_component_t {
5253
/** Extend the basic osc component interface */
@@ -61,6 +62,9 @@ struct ompi_osc_pt2pt_component_t {
6162
/** module count */
6263
int module_count;
6364

65+
/** number of buffers per window */
66+
int receive_count;
67+
6468
/** free list of ompi_osc_pt2pt_frag_t structures */
6569
opal_free_list_t frags;
6670

@@ -76,11 +80,26 @@ struct ompi_osc_pt2pt_component_t {
7680
/** List of operations that need to be processed */
7781
opal_list_t pending_operations;
7882

83+
/** List of receives to be processed */
84+
opal_list_t pending_receives;
85+
86+
/** Lock for pending_receives */
87+
opal_mutex_t pending_receives_lock;
88+
7989
/** Is the progress function enabled? */
8090
bool progress_enable;
8191
};
8292
typedef struct ompi_osc_pt2pt_component_t ompi_osc_pt2pt_component_t;
8393

94+
enum {
95+
/** peer has sent an unexpected post message (no matching start) */
96+
OMPI_OSC_PT2PT_PEER_FLAG_UNEX = 1,
97+
/** eager sends are active on this peer */
98+
OMPI_OSC_PT2PT_PEER_FLAG_EAGER = 2,
99+
/** peer has been locked (on-demand locking for lock_all) */
100+
OMPI_OSC_PT2PT_PEER_FLAG_LOCK = 4,
101+
};
102+
84103

85104
struct ompi_osc_pt2pt_peer_t {
86105
/** make this an opal object */
@@ -101,13 +120,54 @@ struct ompi_osc_pt2pt_peer_t {
101120
/** number of fragments incomming (negative - expected, positive - unsynchronized) */
102121
int32_t passive_incoming_frag_count;
103122

104-
/** unexpected post message arrived */
105-
bool unexpected_post;
123+
/** peer flags */
124+
int32_t flags;
106125
};
107126
typedef struct ompi_osc_pt2pt_peer_t ompi_osc_pt2pt_peer_t;
108127

109128
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
110129

130+
static inline bool ompi_osc_pt2pt_peer_locked (ompi_osc_pt2pt_peer_t *peer)
131+
{
132+
return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_LOCK);
133+
}
134+
135+
static inline bool ompi_osc_pt2pt_peer_unex (ompi_osc_pt2pt_peer_t *peer)
136+
{
137+
return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_UNEX);
138+
}
139+
140+
static inline bool ompi_osc_pt2pt_peer_eager_active (ompi_osc_pt2pt_peer_t *peer)
141+
{
142+
return !!(peer->flags & OMPI_OSC_PT2PT_PEER_FLAG_EAGER);
143+
}
144+
145+
static inline void ompi_osc_pt2pt_peer_set_flag (ompi_osc_pt2pt_peer_t *peer, int32_t flag, bool value)
146+
{
147+
if (value) {
148+
peer->flags |= flag;
149+
} else {
150+
peer->flags &= ~flag;
151+
}
152+
}
153+
154+
static inline void ompi_osc_pt2pt_peer_set_locked (ompi_osc_pt2pt_peer_t *peer, bool value)
155+
{
156+
ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_LOCK, value);
157+
}
158+
159+
static inline void ompi_osc_pt2pt_peer_set_unex (ompi_osc_pt2pt_peer_t *peer, bool value)
160+
{
161+
ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_UNEX, value);
162+
}
163+
164+
static inline void ompi_osc_pt2pt_peer_set_eager_active (ompi_osc_pt2pt_peer_t *peer, bool value)
165+
{
166+
ompi_osc_pt2pt_peer_set_flag (peer, OMPI_OSC_PT2PT_PEER_FLAG_EAGER, value);
167+
}
168+
169+
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_peer_t);
170+
111171
/** Module structure. Exactly one of these is associated with each
112172
PT2PT window */
113173
struct ompi_osc_pt2pt_module_t {
@@ -192,8 +252,11 @@ struct ompi_osc_pt2pt_module_t {
192252
/** origin side list of locks currently outstanding */
193253
opal_hash_table_t outstanding_locks;
194254

195-
unsigned char *incoming_buffer;
196-
ompi_request_t *frag_request;
255+
/** receive fragments */
256+
struct ompi_osc_pt2pt_receive_t *recv_frags;
257+
258+
/** number of receive fragments */
259+
unsigned int recv_frag_count;
197260

198261
/* enforce accumulate semantics */
199262
opal_atomic_lock_t accumulate_lock;
@@ -243,6 +306,15 @@ struct ompi_osc_pt2pt_pending_t {
243306
typedef struct ompi_osc_pt2pt_pending_t ompi_osc_pt2pt_pending_t;
244307
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_pending_t);
245308

309+
struct ompi_osc_pt2pt_receive_t {
310+
opal_list_item_t super;
311+
ompi_osc_pt2pt_module_t *module;
312+
ompi_request_t *pml_request;
313+
void *buffer;
314+
};
315+
typedef struct ompi_osc_pt2pt_receive_t ompi_osc_pt2pt_receive_t;
316+
OBJ_CLASS_DECLARATION(ompi_osc_pt2pt_receive_t);
317+
246318
#define GET_MODULE(win) ((ompi_osc_pt2pt_module_t*) win->w_osc_module)
247319

248320
extern bool ompi_osc_pt2pt_no_locks;
@@ -409,6 +481,8 @@ int ompi_osc_pt2pt_component_irecv(ompi_osc_pt2pt_module_t *module,
409481
int tag,
410482
struct ompi_communicator_t *comm);
411483

484+
int ompi_osc_pt2pt_lock_remote (ompi_osc_pt2pt_module_t *module, int target, ompi_osc_pt2pt_sync_t *lock);
485+
412486
/**
413487
* ompi_osc_pt2pt_progress_pending_acc:
414488
*
@@ -823,6 +897,12 @@ static inline void ompi_osc_pt2pt_module_lock_remove (struct ompi_osc_pt2pt_modu
823897
static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc_pt2pt_module_t *module, int target,
824898
struct ompi_osc_pt2pt_peer_t **peer)
825899
{
900+
ompi_osc_pt2pt_peer_t *tmp;
901+
902+
if (NULL == peer) {
903+
peer = &tmp;
904+
}
905+
826906
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
827907
"osc/pt2pt: looking for synchronization object for target %d", target));
828908

@@ -840,8 +920,9 @@ static inline ompi_osc_pt2pt_sync_t *ompi_osc_pt2pt_module_sync_lookup (ompi_osc
840920

841921
/* fence epoch is now active */
842922
module->all_sync.epoch_active = true;
843-
if (peer) {
844-
*peer = ompi_osc_pt2pt_peer_lookup (module, target);
923+
*peer = ompi_osc_pt2pt_peer_lookup (module, target);
924+
if (OMPI_OSC_PT2PT_SYNC_TYPE_LOCK == module->all_sync.type && !ompi_osc_pt2pt_peer_locked (*peer)) {
925+
(void) ompi_osc_pt2pt_lock_remote (module, target, &module->all_sync);
845926
}
846927

847928
return &module->all_sync;

ompi/mca/osc/pt2pt/osc_pt2pt_active_target.c

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -261,13 +261,13 @@ int ompi_osc_pt2pt_start (ompi_group_t *group, int assert, ompi_win_t *win)
261261
for (int i = 0 ; i < sync->num_peers ; ++i) {
262262
ompi_osc_pt2pt_peer_t *peer = sync->peer_list.peers[i];
263263

264-
if (peer->unexpected_post) {
264+
if (ompi_osc_pt2pt_peer_unex (peer)) {
265265
/* the peer already sent a post message for this pscw access epoch */
266266
OPAL_OUTPUT_VERBOSE((50, ompi_osc_base_framework.framework_output,
267267
"found unexpected post from %d",
268268
peer->rank));
269269
OPAL_THREAD_ADD32 (&sync->sync_expected, -1);
270-
peer->unexpected_post = false;
270+
ompi_osc_pt2pt_peer_set_unex (peer, false);
271271
}
272272
}
273273
OPAL_THREAD_UNLOCK(&sync->lock);
@@ -600,7 +600,7 @@ void osc_pt2pt_incoming_post (ompi_osc_pt2pt_module_t *module, int source)
600600
"received unexpected post message from %d for future PSCW synchronization",
601601
source));
602602

603-
peer->unexpected_post = true;
603+
ompi_osc_pt2pt_peer_set_unex (peer, true);
604604
OPAL_THREAD_UNLOCK(&sync->lock);
605605
} else {
606606
OPAL_THREAD_UNLOCK(&sync->lock);

ompi/mca/osc/pt2pt/osc_pt2pt_component.c

Lines changed: 71 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
* Copyright (c) 2006-2008 University of Houston. All rights reserved.
1515
* Copyright (c) 2010 Oracle and/or its affiliates. All rights reserved.
1616
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
17-
* Copyright (c) 2015 Research Organization for Information Science
17+
* Copyright (c) 2015-2016 Research Organization for Information Science
1818
* and Technology (RIST). All rights reserved.
1919
* $COPYRIGHT$
2020
*
@@ -142,44 +142,62 @@ static int component_register (void)
142142
NULL, 0, 0, OPAL_INFO_LVL_9, MCA_BASE_VAR_SCOPE_READONLY,
143143
&mca_osc_pt2pt_component.buffer_size);
144144

145+
mca_osc_pt2pt_component.receive_count = 4;
146+
(void) mca_base_component_var_register (&mca_osc_pt2pt_component.super.osc_version, "receive_count",
147+
"Number of receives to post for each window for incoming fragments "
148+
"(default: 4)", MCA_BASE_VAR_TYPE_UNSIGNED_INT, NULL, 0, 0, OPAL_INFO_LVL_4,
149+
MCA_BASE_VAR_SCOPE_READONLY, &mca_osc_pt2pt_component.receive_count);
150+
145151
return OMPI_SUCCESS;
146152
}
147153

148154
static int component_progress (void)
149155
{
150-
int count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
156+
int pending_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_operations);
157+
int recv_count = opal_list_get_size (&mca_osc_pt2pt_component.pending_receives);
151158
ompi_osc_pt2pt_pending_t *pending, *next;
152159

153-
if (0 == count) {
154-
return 0;
160+
if (recv_count) {
161+
for (int i = 0 ; i < recv_count ; ++i) {
162+
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_receives_lock);
163+
ompi_osc_pt2pt_receive_t *recv = (ompi_osc_pt2pt_receive_t *) opal_list_remove_first (&mca_osc_pt2pt_component.pending_receives);
164+
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_receives_lock);
165+
if (NULL == recv) {
166+
break;
167+
}
168+
169+
(void) ompi_osc_pt2pt_process_receive (recv);
170+
}
155171
}
156172

157173
/* process one incoming request */
158-
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
159-
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
160-
int ret;
161-
162-
switch (pending->header.base.type) {
163-
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
164-
ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
165-
&pending->header.flush);
166-
break;
167-
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
168-
ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
169-
&pending->header.unlock);
170-
break;
171-
default:
172-
/* shouldn't happen */
173-
assert (0);
174-
abort ();
175-
}
176-
177-
if (OMPI_SUCCESS == ret) {
178-
opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
179-
OBJ_RELEASE(pending);
180-
}
174+
if (pending_count) {
175+
OPAL_THREAD_LOCK(&mca_osc_pt2pt_component.pending_operations_lock);
176+
OPAL_LIST_FOREACH_SAFE(pending, next, &mca_osc_pt2pt_component.pending_operations, ompi_osc_pt2pt_pending_t) {
177+
int ret;
178+
179+
switch (pending->header.base.type) {
180+
case OMPI_OSC_PT2PT_HDR_TYPE_FLUSH_REQ:
181+
ret = ompi_osc_pt2pt_process_flush (pending->module, pending->source,
182+
&pending->header.flush);
183+
break;
184+
case OMPI_OSC_PT2PT_HDR_TYPE_UNLOCK_REQ:
185+
ret = ompi_osc_pt2pt_process_unlock (pending->module, pending->source,
186+
&pending->header.unlock);
187+
break;
188+
default:
189+
/* shouldn't happen */
190+
assert (0);
191+
abort ();
192+
}
193+
194+
if (OMPI_SUCCESS == ret) {
195+
opal_list_remove_item (&mca_osc_pt2pt_component.pending_operations, &pending->super);
196+
OBJ_RELEASE(pending);
197+
}
198+
}
199+
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
181200
}
182-
OPAL_THREAD_UNLOCK(&mca_osc_pt2pt_component.pending_operations_lock);
183201

184202
return 1;
185203
}
@@ -193,6 +211,8 @@ component_init(bool enable_progress_threads,
193211
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.lock, opal_mutex_t);
194212
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations, opal_list_t);
195213
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_operations_lock, opal_mutex_t);
214+
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives, opal_list_t);
215+
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.pending_receives_lock, opal_mutex_t);
196216

197217
OBJ_CONSTRUCT(&mca_osc_pt2pt_component.modules,
198218
opal_hash_table_t);
@@ -253,6 +273,8 @@ component_finalize(void)
253273
OBJ_DESTRUCT(&mca_osc_pt2pt_component.requests);
254274
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations);
255275
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_operations_lock);
276+
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives);
277+
OBJ_DESTRUCT(&mca_osc_pt2pt_component.pending_receives_lock);
256278

257279
return OMPI_SUCCESS;
258280
}
@@ -385,11 +407,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
385407
/* sync memory - make sure all initialization completed */
386408
opal_atomic_mb();
387409

388-
module->incoming_buffer = malloc (mca_osc_pt2pt_component.buffer_size + sizeof (ompi_osc_pt2pt_frag_header_t));
389-
if (OPAL_UNLIKELY(NULL == module->incoming_buffer)) {
390-
goto cleanup;
391-
}
392-
393410
ret = ompi_osc_pt2pt_frag_start_receive (module);
394411
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
395412
goto cleanup;
@@ -449,13 +466,33 @@ ompi_osc_pt2pt_get_info(struct ompi_win_t *win, struct ompi_info_t **info_used)
449466

450467
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_pending_t, opal_list_item_t, NULL, NULL);
451468

469+
static void ompi_osc_pt2pt_receive_construct (ompi_osc_pt2pt_receive_t *recv)
470+
{
471+
recv->buffer = NULL;
472+
recv->pml_request = NULL;
473+
}
474+
475+
static void ompi_osc_pt2pt_receive_destruct (ompi_osc_pt2pt_receive_t *recv)
476+
{
477+
free (recv->buffer);
478+
if (recv->pml_request && MPI_REQUEST_NULL != recv->pml_request) {
479+
recv->pml_request->req_complete_cb = NULL;
480+
ompi_request_cancel (recv->pml_request);
481+
ompi_request_free (&recv->pml_request);
482+
}
483+
}
484+
485+
OBJ_CLASS_INSTANCE(ompi_osc_pt2pt_receive_t, opal_list_item_t,
486+
ompi_osc_pt2pt_receive_construct,
487+
ompi_osc_pt2pt_receive_destruct);
488+
452489
static void ompi_osc_pt2pt_peer_construct (ompi_osc_pt2pt_peer_t *peer)
453490
{
454491
OBJ_CONSTRUCT(&peer->queued_frags, opal_list_t);
455492
OBJ_CONSTRUCT(&peer->lock, opal_mutex_t);
456493
peer->active_frag = NULL;
457494
peer->passive_incoming_frag_count = 0;
458-
peer->unexpected_post = false;
495+
peer->flags = 0;
459496
}
460497

461498
static void ompi_osc_pt2pt_peer_destruct (ompi_osc_pt2pt_peer_t *peer)

0 commit comments

Comments
 (0)