Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ompi/mca/coll/portals4/coll_portals4.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ struct mca_coll_portals4_component_t {
opal_free_list_t requests; /* request free list for the i collectives */

ptl_ni_limits_t ni_limits;
ptl_size_t portals_max_msg_size;

int use_binomial_gather_algorithm;

Expand Down Expand Up @@ -314,7 +315,7 @@ is_reduce_optimizable(struct ompi_datatype_t *dtype, size_t length, struct ompi_
}

*ptl_dtype = ompi_coll_portals4_atomic_datatype[dtype->id];
if (*ptl_dtype == COLL_PORTALS4_NO_DTYPE){
if (*ptl_dtype == COLL_PORTALS4_NO_DTYPE) {
opal_output_verbose(50, ompi_coll_base_framework.framework_output,
"datatype %d not supported\n",
dtype->id);
Expand Down
4 changes: 2 additions & 2 deletions ompi/mca/coll/portals4/coll_portals4_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ allreduce_kary_tree_top(const void *sendbuf, void *recvbuf, int count,
ompi_coll_portals4_get_peer(comm, child[i]),
mca_coll_portals4_component.pt_idx,
match_bits_rtr, 0, NULL, 0)) != PTL_OK)
return opal_stderr("Put RTR failed", __FILE__, __LINE__, ret);
return opal_stderr("Put RTR failed %d", __FILE__, __LINE__, ret);
}
}
}
Expand Down Expand Up @@ -408,7 +408,7 @@ int ompi_coll_portals4_iallreduce_intra(const void* sendbuf, void* recvbuf, int
allreduce_kary_tree_top(sendbuf, recvbuf, count,
dtype, op, comm, request, portals4_module);

puts("iallreduce");
opal_output_verbose(10, ompi_coll_base_framework.framework_output, "iallreduce");
return (OMPI_SUCCESS);
}

Expand Down
26 changes: 24 additions & 2 deletions ompi/mca/coll/portals4/coll_portals4_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,31 @@ barrier_hypercube_top(struct ompi_communicator_t *comm,
}

if (is_sync) {
/* Send a put to self when we've received all our messages... */
ret = PtlCTWait(request->u.barrier.rtr_ct_h, num_msgs, &event);
/* Each process has a pending PtlTriggeredPut. To be sure this request will be triggered, we must
call PtlTriggeredCTInc twice. Otherwise, we could free the CT too early and the Put wouldn't be triggered */

ptl_ct_event_t ct_inc;

ct_inc.success = 1;
ct_inc.failure = 0;

if ((ret = PtlTriggeredCTInc(request->u.barrier.rtr_ct_h, ct_inc,
request->u.barrier.rtr_ct_h, num_msgs)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

if ((ret = PtlTriggeredCTInc(request->u.barrier.rtr_ct_h, ct_inc,
request->u.barrier.rtr_ct_h, num_msgs + 1)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

ret = PtlCTWait(request->u.barrier.rtr_ct_h, num_msgs + 2, &event);
if (PTL_OK != ret) {
opal_output_verbose(1, ompi_coll_base_framework.framework_output,
"%s:%d: PtlCTWait failed: %d\n",
__FILE__, __LINE__, ret);
return OMPI_ERROR;
}
}
else {
/* Send a put to self when we've received all our messages... */
Expand Down
148 changes: 117 additions & 31 deletions ompi/mca/coll/portals4/coll_portals4_bcast.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,12 +89,20 @@ static int prepare_bcast_data (struct ompi_communicator_t *comm,
}

/* Number of segments */
request->u.bcast.segment_nb = (request->u.bcast.tmpsize > COLL_PORTALS4_MAX_BW) ?
(((request->u.bcast.tmpsize + COLL_PORTALS4_MAX_BW -1) / COLL_PORTALS4_MAX_BW) < COLL_PORTALS4_MAX_SEGMENT ?
((request->u.bcast.tmpsize + COLL_PORTALS4_MAX_BW -1) / COLL_PORTALS4_MAX_BW) :
COLL_PORTALS4_MAX_SEGMENT) :
{
size_t max_msg_size = (COLL_PORTALS4_MAX_BW > mca_coll_portals4_component.ni_limits.max_msg_size) ?
mca_coll_portals4_component.ni_limits.max_msg_size :
COLL_PORTALS4_MAX_BW;

//TODO : Either make compatible Portals size limits and COLL_PORTALS4_MAX_SEGMENT or remove COLL_PORTALS4_MAX_SEGMENT
request->u.bcast.segment_nb = (request->u.bcast.tmpsize > max_msg_size) ?
(((request->u.bcast.tmpsize + max_msg_size -1) / max_msg_size) < COLL_PORTALS4_MAX_SEGMENT ?
((request->u.bcast.tmpsize + max_msg_size -1) / max_msg_size) : COLL_PORTALS4_MAX_SEGMENT) :
1;

OPAL_OUTPUT_VERBOSE((10, ompi_coll_base_framework.framework_output,
"seg_number=%d , seg_size_max=%lu", request->u.bcast.segment_nb, max_msg_size));
}
if (request->u.bcast.segment_nb > COLL_PORTALS4_BCAST_ALGO_THRESHOLD) {
request->u.bcast.algo = OMPI_COLL_PORTALS4_BCAST_PIPELINE_ALGO;
}
Expand Down Expand Up @@ -137,9 +145,9 @@ bcast_kary_tree_top(void *buff, int count,
mca_coll_portals4_module_t *portals4_module)
{
bool is_sync = request->is_sync;
int ret, seg;
unsigned int i;
int segment_nb = request->u.bcast.segment_nb;
int ret;
unsigned int i, seg, seg_size, nb_long;
unsigned int segment_nb = request->u.bcast.segment_nb;
unsigned int child_nb;
int size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
Expand Down Expand Up @@ -201,15 +209,22 @@ bcast_kary_tree_top(void *buff, int count,
COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
COLL_PORTALS4_BCAST, 0, internal_count);

/* The data will be cut in segment_nb segments.
* nb_long segments will have a size of (seg_size + 1)
* and (segment_nb - nb_long) segments will have a size of seg_size
*/
seg_size = request->u.bcast.tmpsize / segment_nb;
nb_long = request->u.bcast.tmpsize % segment_nb;
opal_output_verbose(10, ompi_coll_base_framework.framework_output, "seg_size=%d nb_long=%d segment_nb=%d", seg_size, nb_long, segment_nb);

if (rank != root) {
for (seg = 1, offset = 0, length = 0 ;
seg <= segment_nb ;
seg++, offset += length) {

/* Divide buffer into segments */
length = (seg < segment_nb) ?
(request->u.bcast.tmpsize + segment_nb - 1) / segment_nb :
request->u.bcast.tmpsize - ((request->u.bcast.tmpsize + segment_nb - 1) / segment_nb) * (segment_nb - 1);
if (seg <= nb_long) length = seg_size + 1;
else length = seg_size;

/*
** Prepare Data ME
Expand Down Expand Up @@ -352,13 +367,14 @@ bcast_kary_tree_top(void *buff, int count,
seg++, offset += length) {

/* Divide buffer into segments */
length = (seg < segment_nb) ?
(request->u.bcast.tmpsize + segment_nb - 1) / segment_nb :
request->u.bcast.tmpsize - ((request->u.bcast.tmpsize + segment_nb - 1) / segment_nb) * (segment_nb - 1);
if (seg <= nb_long) length = seg_size + 1;
else length = seg_size;
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"bcast with k-ary tree : segment of size %ld", length);

/* compute the triggering threshold to send data to the children */
trig_thr = (rank == root) ? (segment_nb) :
(segment_nb + seg);
trig_thr = segment_nb + seg - 1; /* To be sure the set of PtlTriggeredPut of DATA will be executed in order */
if (rank != root) trig_thr ++;

/*
** Send Data to children
Expand All @@ -381,6 +397,17 @@ bcast_kary_tree_top(void *buff, int count,
}
}

if (rank == root) {
trig_thr = segment_nb;
ct_inc.success = segment_nb;
ct_inc.failure = 0;

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}
}

ack_thr = child_nb;

if (is_sync) {
Expand Down Expand Up @@ -409,9 +436,28 @@ bcast_kary_tree_top(void *buff, int count,
*/

if (rank != root) {
ack_thr = segment_nb;
trig_thr = segment_nb;
if (is_sync) {
if ((ret = PtlCTWait(request->u.bcast.trig_ct_h, ack_thr, &ct)) != 0) {
/* Each leaf has a pending PtlTriggeredPut (to send the final ACK). We must call PtlTriggeredCTInc twice.
Otherwise, we could pass the PtlCTWait and then free the CT too early and the Put wouldn't be triggered.

This is necessary because portals4 does not insure the order in the triggered operations associated
with the same threshold. In the case where PtlCTWait is not called (else case), this is not necessary. */

ct_inc.success = 1;
ct_inc.failure = 0;

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr + 1)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

if ((ret = PtlCTWait(request->u.bcast.trig_ct_h, trig_thr + 2, &ct)) != 0) {
opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
}
}
Expand All @@ -421,7 +467,7 @@ bcast_kary_tree_top(void *buff, int count,
mca_coll_portals4_component.finish_pt_idx,
0, 0, NULL, (uintptr_t) request,
request->u.bcast.trig_ct_h,
ack_thr)) != 0) {
trig_thr)) != 0) {
return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
}

Expand All @@ -440,8 +486,9 @@ bcast_pipeline_top(void *buff, int count,
mca_coll_portals4_module_t *portals4_module)
{
bool is_sync = request->is_sync;
int ret, seg;
int segment_nb = request->u.bcast.segment_nb;
int ret;
unsigned int seg, seg_size, nb_long;
unsigned int segment_nb = request->u.bcast.segment_nb;
int size = ompi_comm_size(comm);
int rank = ompi_comm_rank(comm);
ptl_rank_t parent, child;
Expand Down Expand Up @@ -492,16 +539,22 @@ bcast_pipeline_top(void *buff, int count,

COLL_PORTALS4_SET_BITS(match_bits, ompi_comm_get_cid(comm), 0, 0,
COLL_PORTALS4_BCAST, 0, internal_count);
/* The data will be cut in segment_nb segments.
* nb_long segments will have a size of (seg_size + 1)
* and (segment_nb - nb_long) segments will have a size of seg_size
*/
seg_size = request->u.bcast.tmpsize / segment_nb;
nb_long = request->u.bcast.tmpsize % segment_nb;
opal_output_verbose(10, ompi_coll_base_framework.framework_output, "seg_size=%d nb_long=%d", seg_size, nb_long);

if (rank != root) {
for (seg = 1, offset = 0, length = 0 ;
seg <= segment_nb ;
seg++, offset += length) {

/* Divide buffer into segments */
length = (seg < segment_nb) ?
(request->u.bcast.tmpsize + segment_nb - 1) / segment_nb :
request->u.bcast.tmpsize - ((request->u.bcast.tmpsize + segment_nb - 1) / segment_nb) * (segment_nb - 1);
if (seg <= nb_long) length = seg_size + 1;
else length = seg_size;

/*
** Prepare Data ME
Expand Down Expand Up @@ -642,13 +695,14 @@ bcast_pipeline_top(void *buff, int count,
seg++, offset += length) {

/* Divide buffer into segments */
length = (seg < segment_nb) ?
(request->u.bcast.tmpsize + segment_nb - 1) / segment_nb :
request->u.bcast.tmpsize - ((request->u.bcast.tmpsize + segment_nb - 1) / segment_nb) * (segment_nb - 1);
if (seg <= nb_long) length = seg_size + 1;
else length = seg_size;
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"bcast with pipeline : segment of size %ld \n", length);

/* compute the triggering threshold to send data to the children */
trig_thr = (rank == root) ? (segment_nb) :
(segment_nb + seg);
trig_thr = segment_nb + seg - 1; /* To be sure the PtlTriggeredPut will be executed in order */
if (rank != root) trig_thr ++;

/*
** Send Data to children
Expand All @@ -668,6 +722,16 @@ bcast_pipeline_top(void *buff, int count,
}
}
}
if (rank == root) {
trig_thr = segment_nb;
ct_inc.success = segment_nb;
ct_inc.failure = 0;

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}
}

if (is_sync) {
if ((ret = PtlCTWait(request->u.bcast.ack_ct_h, 1, &ct)) != 0) {
Expand Down Expand Up @@ -696,8 +760,29 @@ bcast_pipeline_top(void *buff, int count,
*/

if (rank != root) {
trig_thr = segment_nb;

if (is_sync) {
if ((ret = PtlCTWait(request->u.bcast.trig_ct_h, segment_nb, &ct)) != 0) {
/* Each leaf has a pending PtlTriggeredPut (to send the final ACK). We must call PtlTriggeredCTInc twice.
Otherwise, we could pass the PtlCTWait and then free the CT too early and the Put wouldn't be triggered.

This is necessary because portals4 does not insure the order in the triggered operations associated
with the same threshold. In the case where PtlCTWait is not called (else case), this is not necessary. */

ct_inc.success = 1;
ct_inc.failure = 0;

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

if ((ret = PtlTriggeredCTInc(request->u.bcast.trig_ct_h, ct_inc,
request->u.bcast.trig_ct_h, trig_thr + 1)) != 0) {
return opal_stderr("PtlTriggeredCTInc failed", __FILE__, __LINE__, ret);
}

if ((ret = PtlCTWait(request->u.bcast.trig_ct_h, trig_thr + 2, &ct)) != 0) {
opal_stderr("PtlCTWait failed", __FILE__, __LINE__, ret);
}
}
Expand All @@ -707,7 +792,7 @@ bcast_pipeline_top(void *buff, int count,
mca_coll_portals4_component.finish_pt_idx,
0, 0, NULL, (uintptr_t) request,
request->u.bcast.trig_ct_h,
segment_nb)) != 0) {
trig_thr)) != 0) {
return opal_stderr("PtlTriggeredPut failed", __FILE__, __LINE__, ret);
}
}
Expand Down Expand Up @@ -831,7 +916,7 @@ ompi_coll_portals4_ibcast_intra(void *buff, int count,
return OMPI_ERROR;
}

puts("ibcast");
opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ibcast_intra");
return (OMPI_SUCCESS);
}

Expand Down Expand Up @@ -860,5 +945,6 @@ ompi_coll_portals4_ibcast_intra_fini(ompi_coll_portals4_request_t *request)
ompi_request_complete(&request->super, true);
OPAL_THREAD_UNLOCK(&ompi_request_lock);

opal_output_verbose(10, ompi_coll_base_framework.framework_output, "ibcast_intra_fini");
return (OMPI_SUCCESS);
}
16 changes: 16 additions & 0 deletions ompi/mca/coll/portals4/coll_portals4_component.c
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,16 @@ portals4_register(void)
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_portals4_component.use_binomial_gather_algorithm);

mca_coll_portals4_component.portals_max_msg_size = PTL_SIZE_MAX;
(void) mca_base_component_var_register(&mca_coll_portals4_component.super.collm_version,
"max_msg_size",
"Max size supported by portals4 (above that, a message is cut into messages less than that size)",
MCA_BASE_VAR_TYPE_UNSIGNED_LONG,
NULL, 0, 0,
OPAL_INFO_LVL_9,
MCA_BASE_VAR_SCOPE_READONLY,
&mca_coll_portals4_component.portals_max_msg_size);

return OMPI_SUCCESS;
}

Expand Down Expand Up @@ -369,7 +379,13 @@ portals4_init_query(bool enable_progress_threads,
__FILE__, __LINE__, ret);
return OMPI_ERROR;
}
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"ni_limits.max_atomic_size=%ld", mca_coll_portals4_component.ni_limits.max_atomic_size);

if (mca_coll_portals4_component.portals_max_msg_size < mca_coll_portals4_component.ni_limits.max_msg_size)
mca_coll_portals4_component.ni_limits.max_msg_size = mca_coll_portals4_component.portals_max_msg_size;
opal_output_verbose(10, ompi_coll_base_framework.framework_output,
"ni_limits.max_msg_size=%lu", mca_coll_portals4_component.ni_limits.max_msg_size);

ret = PtlGetId(mca_coll_portals4_component.ni_h, &mca_coll_portals4_component.id);
if (PTL_OK != ret) {
Expand Down
Loading