Skip to content

Commit 0c8c7e5

Browse files
authored
Merge pull request #3682 from hjelmn/comm_assertions
ompi: add support for new communicator info assertions
2 parents 70107b3 + db2204f commit 0c8c7e5

File tree

7 files changed

+138
-19
lines changed

7 files changed

+138
-19
lines changed

ompi/communicator/comm_init.c

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -444,3 +444,40 @@ static void ompi_comm_destruct(ompi_communicator_t* comm)
444444
comm->c_f_to_c_index, NULL);
445445
}
446446
}
447+
448+
#define OMPI_COMM_SET_INFO_FN(name, flag) \
449+
static char *ompi_comm_set_ ## name (opal_infosubscriber_t *obj, char *key, char *value) \
450+
{ \
451+
ompi_communicator_t *comm = (ompi_communicator_t *) obj; \
452+
\
453+
if (opal_str_to_bool(value)) { \
454+
comm->c_assertions |= flag; \
455+
} else { \
456+
comm->c_assertions &= ~flag; \
457+
} \
458+
\
459+
return OMPI_COMM_CHECK_ASSERT(comm, flag) ? "true" : "false"; \
460+
}
461+
462+
OMPI_COMM_SET_INFO_FN(no_any_source, OMPI_COMM_ASSERT_NO_ANY_SOURCE)
463+
OMPI_COMM_SET_INFO_FN(no_any_tag, OMPI_COMM_ASSERT_NO_ANY_TAG)
464+
OMPI_COMM_SET_INFO_FN(allow_overtake, OMPI_COMM_ASSERT_ALLOW_OVERTAKE)
465+
OMPI_COMM_SET_INFO_FN(exact_length, OMPI_COMM_ASSERT_EXACT_LENGTH)
466+
467+
/**
 * Subscribe a communicator to the info key of a single assertion.
 *
 * Selects the info key and the ompi_comm_set_* callback that belong to
 * assert_flag and registers them on the communicator through
 * opal_infosubscribe_subscribe(), passing "false" as the value argument
 * (presumably the initial/default value -- confirm against the subscriber
 * API).
 *
 * @param comm         communicator whose info subscriber (comm->super) is used
 * @param assert_flag  exactly one of the OMPI_COMM_ASSERT_* flags; any other
 *                     value results in no subscription
 */
void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag)
{
    switch (assert_flag) {
    case OMPI_COMM_ASSERT_NO_ANY_SOURCE:
        opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_source", "false", ompi_comm_set_no_any_source);
        break;
    case OMPI_COMM_ASSERT_NO_ANY_TAG:
        opal_infosubscribe_subscribe (&comm->super, "mpi_assert_no_any_tag", "false", ompi_comm_set_no_any_tag);
        break;
    case OMPI_COMM_ASSERT_ALLOW_OVERTAKE:
        opal_infosubscribe_subscribe (&comm->super, "mpi_assert_allow_overtaking", "false", ompi_comm_set_allow_overtake);
        break;
    case OMPI_COMM_ASSERT_EXACT_LENGTH:
        opal_infosubscribe_subscribe (&comm->super, "mpi_assert_exact_length", "false", ompi_comm_set_exact_length);
        break;
    default:
        /* Not a single known assertion flag: nothing to subscribe to. */
        break;
    }
}

ompi/communicator/communicator.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,17 @@ OMPI_DECLSPEC OBJ_CLASS_DECLARATION(ompi_communicator_t);
9090
#define OMPI_COMM_BARRIER_TAG -31079
9191
#define OMPI_COMM_ALLREDUCE_TAG -31080
9292

93+
/* Bit flags stored in the c_assertions field of a communicator. */
#define OMPI_COMM_ASSERT_NO_ANY_TAG     0x00000001
#define OMPI_COMM_ASSERT_NO_ANY_SOURCE  0x00000002
#define OMPI_COMM_ASSERT_EXACT_LENGTH   0x00000004
#define OMPI_COMM_ASSERT_ALLOW_OVERTAKE 0x00000008

/*
 * Evaluate to 1 if any bit of `flag` is set in (comm)->c_assertions and to 0
 * otherwise.  Both `flag` and the whole expansion are parenthesized so the
 * macro is safe with expression arguments (e.g. A | B) and inside larger
 * expressions.
 */
#define OMPI_COMM_CHECK_ASSERT(comm, flag) (!!((comm)->c_assertions & (flag)))

/* Convenience wrappers, one per assertion flag. */
#define OMPI_COMM_CHECK_ASSERT_NO_ANY_TAG(comm)     OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_TAG)
#define OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE(comm)  OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE)
#define OMPI_COMM_CHECK_ASSERT_EXACT_LENGTH(comm)   OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_EXACT_LENGTH)
#define OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm) OMPI_COMM_CHECK_ASSERT(comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE)
103+
93104
/**
94105
* Modes required for acquiring the new comm-id.
95106
* The first (INTER/INTRA) indicates whether the
@@ -126,6 +137,7 @@ struct ompi_communicator_t {
126137
int c_my_rank;
127138
uint32_t c_flags; /* flags, e.g. intercomm,
128139
topology, etc. */
140+
uint32_t c_assertions; /* info assertions */
129141

130142
int c_id_available; /* the currently available Cid for allocation
131143
to a child*/
@@ -697,6 +709,8 @@ extern int ompi_comm_num_dyncomm;
697709
OMPI_DECLSPEC int ompi_comm_cid_init ( void );
698710

699711

712+
/**
 * Subscribe a communicator to the info key associated with one assertion.
 *
 * @param comm         communicator to subscribe
 * @param assert_flag  one of the OMPI_COMM_ASSERT_* flags
 */
void ompi_comm_assert_subscribe (ompi_communicator_t *comm, int32_t assert_flag);
713+
700714
END_C_DECLS
701715

702716
#endif /* OMPI_COMMUNICATOR_H */

ompi/mca/pml/ob1/pml_ob1.c

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,9 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
206206
return OMPI_ERR_OUT_OF_RESOURCE;
207207
}
208208

209+
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_NO_ANY_SOURCE);
210+
ompi_comm_assert_subscribe (comm, OMPI_COMM_ASSERT_ALLOW_OVERTAKE);
211+
209212
mca_pml_ob1_comm_init_size(pml_comm, comm->c_remote_group->grp_proc_count);
210213
comm->c_pml_comm = pml_comm;
211214

@@ -222,6 +225,12 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
222225
* non_existing_communicator_pending list. */
223226
opal_list_remove_item (&mca_pml_ob1.non_existing_communicator_pending,
224227
(opal_list_item_t *) frag);
228+
if (OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
229+
opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag );
230+
PERUSE_TRACE_MSG_EVENT(PERUSE_COMM_MSG_INSERT_IN_UNEX_Q, comm,
231+
hdr->hdr_src, hdr->hdr_tag, PERUSE_RECV);
232+
continue;
233+
}
225234

226235
add_fragment_to_unexpected:
227236

@@ -242,7 +251,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
242251
*/
243252
pml_proc = mca_pml_ob1_peer_lookup(comm, hdr->hdr_src);
244253

245-
if( ((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
254+
if (((uint16_t)hdr->hdr_seq) == ((uint16_t)pml_proc->expected_sequence) ) {
246255
/* We're now expecting the next sequence number. */
247256
pml_proc->expected_sequence++;
248257
opal_list_append( &pml_proc->unexpected_frags, (opal_list_item_t*)frag );
@@ -254,9 +263,7 @@ int mca_pml_ob1_add_comm(ompi_communicator_t* comm)
254263
* situation as the cant_match is only checked when a new fragment is received from
255264
* the network.
256265
*/
257-
for(frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_first(&pml_proc->frags_cant_match);
258-
frag != (mca_pml_ob1_recv_frag_t *)opal_list_get_end(&pml_proc->frags_cant_match);
259-
frag = (mca_pml_ob1_recv_frag_t *)opal_list_get_next(frag)) {
266+
OPAL_LIST_FOREACH(frag, &pml_proc->frags_cant_match, mca_pml_ob1_recv_frag_t) {
260267
hdr = &frag->hdr.hdr_match;
261268
/* If the message has the next expected seq from that proc... */
262269
if(hdr->hdr_seq != pml_proc->expected_sequence)

ompi/mca/pml/ob1/pml_ob1_isend.c

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,16 @@ int mca_pml_ob1_isend(const void *buf,
143143
mca_pml_ob1_send_request_t *sendreq = NULL;
144144
ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
145145
mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
146-
int16_t seqn;
146+
int16_t seqn = 0;
147147
int rc;
148148

149149
if (OPAL_UNLIKELY(NULL == endpoint)) {
150150
return OMPI_ERR_UNREACH;
151151
}
152152

153-
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
153+
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
154+
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
155+
}
154156

155157
if (MCA_PML_BASE_SEND_SYNCHRONOUS != sendmode) {
156158
rc = mca_pml_ob1_send_inline (buf, count, datatype, dst, tag, seqn, dst_proc,
@@ -196,7 +198,7 @@ int mca_pml_ob1_send(const void *buf,
196198
ompi_proc_t *dst_proc = ob1_proc->ompi_proc;
197199
mca_bml_base_endpoint_t* endpoint = mca_bml_base_get_endpoint (dst_proc);
198200
mca_pml_ob1_send_request_t *sendreq = NULL;
199-
int16_t seqn;
201+
int16_t seqn = 0;
200202
int rc;
201203

202204
if (OPAL_UNLIKELY(NULL == endpoint)) {
@@ -217,7 +219,9 @@ int mca_pml_ob1_send(const void *buf,
217219
return OMPI_SUCCESS;
218220
}
219221

220-
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
222+
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm)) {
223+
seqn = (uint16_t) OPAL_THREAD_ADD32(&ob1_proc->send_sequence, 1);
224+
}
221225

222226
/**
223227
* The immediate send will not have a request, so they are

ompi/mca/pml/ob1/pml_ob1_recvfrag.c

Lines changed: 38 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -163,18 +163,20 @@ void mca_pml_ob1_recv_frag_callback_match(mca_btl_base_module_t* btl,
163163
*/
164164
OB1_MATCHING_LOCK(&comm->matching_lock);
165165

166-
/* get sequence number of next message that can be processed */
167-
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
168-
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
169-
goto slow_path;
170-
}
166+
if (!OMPI_COMM_CHECK_ASSERT_ALLOW_OVERTAKE(comm_ptr)) {
167+
/* get sequence number of next message that can be processed */
168+
if(OPAL_UNLIKELY((((uint16_t) hdr->hdr_seq) != ((uint16_t) proc->expected_sequence)) ||
169+
(opal_list_get_size(&proc->frags_cant_match) > 0 ))) {
170+
goto slow_path;
171+
}
171172

172-
/* This is the sequence number we were expecting, so we can try
173-
* matching it to already posted receives.
174-
*/
173+
/* This is the sequence number we were expecting, so we can try
174+
* matching it to already posted receives.
175+
*/
175176

176-
/* We're now expecting the next sequence number. */
177-
proc->expected_sequence++;
177+
/* We're now expecting the next sequence number. */
178+
proc->expected_sequence++;
179+
}
178180

179181
/* We generate the SEARCH_POSTED_QUEUE only when the message is
180182
* received in the correct sequence. Otherwise, we delay the event
@@ -506,6 +508,27 @@ static mca_pml_ob1_recv_request_t *match_incomming(
506508
return NULL;
507509
}
508510

511+
static mca_pml_ob1_recv_request_t *match_incomming_no_any_source (
512+
mca_pml_ob1_match_hdr_t *hdr, mca_pml_ob1_comm_t *comm,
513+
mca_pml_ob1_comm_proc_t *proc)
514+
{
515+
mca_pml_ob1_recv_request_t *recv_req;
516+
int tag = hdr->hdr_tag;
517+
518+
OPAL_LIST_FOREACH(recv_req, &proc->specific_receives, mca_pml_ob1_recv_request_t) {
519+
int req_tag = recv_req->req_recv.req_base.req_tag;
520+
521+
if (req_tag == tag || (req_tag == OMPI_ANY_TAG && tag >= 0)) {
522+
opal_list_remove_item (&proc->specific_receives, (opal_list_item_t *) recv_req);
523+
PERUSE_TRACE_COMM_EVENT(PERUSE_COMM_REQ_REMOVE_FROM_POSTED_Q,
524+
&(recv_req->req_recv.req_base), PERUSE_RECV);
525+
return recv_req;
526+
}
527+
}
528+
529+
return NULL;
530+
}
531+
509532
static mca_pml_ob1_recv_request_t*
510533
match_one(mca_btl_base_module_t *btl,
511534
mca_pml_ob1_match_hdr_t *hdr, mca_btl_base_segment_t* segments,
@@ -517,7 +540,11 @@ match_one(mca_btl_base_module_t *btl,
517540
mca_pml_ob1_comm_t *comm = (mca_pml_ob1_comm_t *)comm_ptr->c_pml_comm;
518541

519542
do {
520-
match = match_incomming(hdr, comm, proc);
543+
if (!OMPI_COMM_CHECK_ASSERT_NO_ANY_SOURCE (comm_ptr)) {
544+
match = match_incomming(hdr, comm, proc);
545+
} else {
546+
match = match_incomming_no_any_source (hdr, comm, proc);
547+
}
521548

522549
/* if match found, process data */
523550
if(OPAL_LIKELY(NULL != match)) {

ompi/mpi/man/man3/MPI_Comm_dup_with_info.3in

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ MPI_Comm_dup_with_info acts exactly like MPI_Comm_dup except that the
6060
info hints associated with the communicator \fIcomm\fP are not duplicated in \fInewcomm\fP. The
6161
hints provided by the argument \fIinfo\fP are associated with the output communicator \fInewcomm\fP
6262
instead.
63+
.sp
64+
See
65+
.BR MPI_Comm_set_info (3)
66+
for the list of recognized info keys.
6367

6468
.SH NOTES
6569
This operation is used to provide a parallel
@@ -82,3 +86,4 @@ called. By default, this error handler aborts the MPI job, except for I/O functi
8286
.SH SEE ALSO
8387
MPI_Comm_dup
8488
MPI_Comm_idup
89+
MPI_Comm_set_info

ompi/mpi/man/man3/MPI_Comm_set_info.3in

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,31 @@ requires to be the same on all processes must appear with the same
5858
value in each process's
5959
.I info
6060
object.
61+
.sp
62+
The following info key assertions may be accepted by Open MPI:
63+
.sp
64+
\fImpi_assert_no_any_tag\fP (boolean): If set to true, then the
65+
implementation may assume that the process will not use the
66+
MPI_ANY_TAG wildcard on the given
67+
communicator.
68+
.sp
69+
\fImpi_assert_no_any_source\fP (boolean): If set to true, then
70+
the implementation may assume that the process will not use the
71+
MPI_ANY_SOURCE wildcard on the given communicator.
72+
.sp
73+
\fImpi_assert_exact_length\fP (boolean): If set to true, then the
74+
implementation may assume that the lengths of messages received by the
75+
process are equal to the lengths of the corresponding receive buffers,
76+
for point-to-point communication operations on the given communicator.
77+
.sp
78+
\fImpi_assert_allow_overtaking\fP (boolean): If set to true, then the
79+
implementation may assume that point-to-point communications on the
80+
given communicator do not rely on the non-overtaking rule specified in
81+
MPI-3.1 Section 3.5. In other words, the application asserts that send
82+
operations are not required to be matched at the receiver in the order
83+
in which the send operations were performed by the sender, and receive
84+
operations are not required to be matched in the order in which they
85+
were performed by the receiver.
6186
.
6287
.SH ERRORS
6388
Almost all MPI routines return an error value; C routines as the value

0 commit comments

Comments
 (0)