Skip to content
This repository was archived by the owner on Sep 30, 2022. It is now read-only.

Commit 8b705e8

Browse files
committed
Merge pull request #1080 from kawashima-fj/pr/v2.x/parallel-rma-fix
v2.x: osc/pt2pt: Fix tag conflicts on parallel RMA communications
2 parents 1dbc692 + 5f6d90f commit 8b705e8

File tree

4 files changed

+84
-53
lines changed

4 files changed

+84
-53
lines changed

ompi/mca/osc/pt2pt/osc_pt2pt.h

Lines changed: 30 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
1515
* Copyright (c) 2015 Research Organization for Information Science
1616
* and Technology (RIST). All rights reserved.
17+
* Copyright (c) 2016 FUJITSU LIMITED. All rights reserved.
1718
* $COPYRIGHT$
1819
*
1920
* Additional copyrights may follow
@@ -150,7 +151,6 @@ struct ompi_osc_pt2pt_module_t {
150151

151152
/** cyclic counter for a unique tag for long messages. */
152153
uint32_t tag_counter;
153-
uint32_t rtag_counter;
154154

155155
/* Number of outgoing fragments that have completed since the
156156
beginning of time */
@@ -637,13 +637,15 @@ static inline void osc_pt2pt_add_pending (ompi_osc_pt2pt_pending_t *pending)
637637
/**
638638
* get_tag:
639639
*
640-
* @short Get a send/recv tag for large memory operations.
640+
* @short Get a send/recv base tag for large memory operations.
641641
*
642642
* @param[in] module - OSC PT2PT module
643643
*
644-
* @long This function aquires a 16-bit tag for use with large memory operations. The
644+
* @long This function acquires a 16-bit tag for use with large memory operations. The
645645
* tag will be odd or even depending on if this is in a passive target access
646-
* or not.
646+
* or not. An actual tag that will be passed to PML send/recv function is given
647+
* by tag_to_target or tag_to_origin function depending on the communication
648+
* direction.
647649
*/
648650
static inline int get_tag(ompi_osc_pt2pt_module_t *module)
649651
{
@@ -654,14 +656,32 @@ static inline int get_tag(ompi_osc_pt2pt_module_t *module)
654656
return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
655657
}
656658

657-
static inline int get_rtag(ompi_osc_pt2pt_module_t *module)
659+
/**
660+
* tag_to_target:
661+
*
662+
* @short Get a tag used for PML send/recv communication from an origin to a target.
663+
*
664+
* @param[in] tag - base tag given by get_tag function.
665+
*/
666+
static inline int tag_to_target(int tag)
658667
{
659-
/* the LSB of the tag is used be the receiver to determine if the
660-
message is a passive or active target (ie, where to mark
661-
completion). */
662-
int32_t tmp = OPAL_THREAD_ADD32((volatile int32_t *) &module->rtag_counter, 4);
663-
return (tmp & OSC_PT2PT_FRAG_MASK) | !!(module->passive_target_access_epoch);
668+
/* (returned_tag >> 1) & 0x1 == 0 */
669+
return tag + 0;
664670
}
671+
672+
/**
673+
* tag_to_origin:
674+
*
675+
* @short Get a tag used for PML send/recv communication from a target to an origin.
676+
*
677+
* @param[in] tag - base tag given by get_tag function.
678+
*/
679+
static inline int tag_to_origin(int tag)
680+
{
681+
/* (returned_tag >> 1) & 0x1 == 1 */
682+
return tag + 2;
683+
}
684+
665685
/**
666686
* ompi_osc_pt2pt_accumulate_lock:
667687
*

ompi/mca/osc/pt2pt/osc_pt2pt_comm.c

Lines changed: 30 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
1515
* Copyright (c) 2015 Research Organization for Information Science
1616
* and Technology (RIST). All rights reserved.
17+
* Copyright (c) 2016 FUJITSU LIMITED. All rights reserved.
1718
* $COPYRIGHT$
1819
*
1920
* Additional copyrights may follow
@@ -363,9 +364,9 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
363364

364365
OBJ_RETAIN(target_dt);
365366

366-
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
367-
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
368-
target_dt);
367+
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
368+
target, tag_to_target(tag), module->comm,
369+
ompi_osc_pt2pt_dt_send_complete, target_dt);
369370
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
370371
break;
371372
}
@@ -394,8 +395,8 @@ static inline int ompi_osc_pt2pt_put_w_req (const void *origin_addr, int origin_
394395
header->tag = tag;
395396
osc_pt2pt_hton(header, proc);
396397

397-
ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt, target, tag,
398-
request);
398+
ret = ompi_osc_pt2pt_data_isend (module,origin_addr, origin_count, origin_dt,
399+
target, tag_to_target(tag), request);
399400
}
400401
} while (0);
401402

@@ -491,7 +492,7 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
491492
}
492493

493494
is_long_msg = true;
494-
tag = get_rtag (module);
495+
tag = get_tag (module);
495496
}
496497

497498
if (is_long_msg) {
@@ -519,9 +520,9 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
519520

520521
OBJ_RETAIN(target_dt);
521522

522-
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
523-
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
524-
target_dt);
523+
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
524+
target, tag_to_target(tag), module->comm,
525+
ompi_osc_pt2pt_dt_send_complete, target_dt);
525526
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
526527
break;
527528
}
@@ -553,8 +554,8 @@ ompi_osc_pt2pt_accumulate_w_req (const void *origin_addr, int origin_count,
553554
OPAL_OUTPUT_VERBOSE((25, ompi_osc_base_framework.framework_output,
554555
"acc: starting long accumulate with tag %d", tag));
555556

556-
ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt, target, tag,
557-
request);
557+
ret = ompi_osc_pt2pt_data_isend (module, origin_addr, origin_count, origin_dt,
558+
target, tag_to_target(tag), request);
558559
}
559560
} while (0);
560561

@@ -663,7 +664,8 @@ int ompi_osc_pt2pt_compare_and_swap (const void *origin_addr, const void *compar
663664
osc_pt2pt_copy_for_send (ptr, dt->super.size, compare_addr, proc, 1, dt);
664665

665666
request->outstanding_requests = 1;
666-
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt, target, tag, module->comm,
667+
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, 1, dt,
668+
target, tag_to_origin(tag), module->comm,
667669
NULL, ompi_osc_pt2pt_req_comm_complete, request);
668670
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
669671
return ret;
@@ -786,7 +788,7 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
786788
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
787789
if (OMPI_SUCCESS != ret) {
788790
/* allocate space for the header plus space to store ddt_len */
789-
frag_len = sizeof(ompi_osc_pt2pt_header_put_t) + 8;
791+
frag_len = sizeof(ompi_osc_pt2pt_header_get_t) + 8;
790792
ret = ompi_osc_pt2pt_frag_alloc(module, target, frag_len, &frag, &ptr, false, release_req);
791793
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
792794
return OMPI_ERR_OUT_OF_RESOURCE;
@@ -827,9 +829,9 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
827829

828830
OBJ_RETAIN(target_dt);
829831

830-
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target,
831-
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
832-
target_dt);
832+
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
833+
target, tag_to_target(tag), module->comm,
834+
ompi_osc_pt2pt_dt_send_complete, target_dt);
833835
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
834836
break;
835837
}
@@ -843,8 +845,9 @@ static inline int ompi_osc_pt2pt_rget_internal (void *origin_addr, int origin_co
843845

844846
/* TODO -- store the request somewhere so we can cancel it on error */
845847
pt2pt_request->outstanding_requests = 1;
846-
ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt, target, tag,
847-
module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
848+
ret = ompi_osc_pt2pt_irecv_w_cb (origin_addr, origin_count, origin_dt,
849+
target, tag_to_origin(tag), module->comm,
850+
NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
848851
} while (0);
849852

850853
if (OMPI_SUCCESS == ret) {
@@ -1045,9 +1048,9 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
10451048

10461049
OBJ_RETAIN(target_datatype);
10471050

1048-
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE, target_rank,
1049-
tag, module->comm, ompi_osc_pt2pt_dt_send_complete,
1050-
target_datatype);
1051+
ret = ompi_osc_pt2pt_isend_w_cb ((void *) packed_ddt, ddt_len, MPI_BYTE,
1052+
target_rank, tag_to_target(tag), module->comm,
1053+
ompi_osc_pt2pt_dt_send_complete, target_datatype);
10511054
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
10521055
break;
10531056
}
@@ -1059,8 +1062,9 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
10591062
ptr += ddt_len;
10601063
}
10611064

1062-
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype, target_rank, tag,
1063-
module->comm, NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
1065+
ret = ompi_osc_pt2pt_irecv_w_cb (result_addr, result_count, result_datatype,
1066+
target_rank, tag_to_origin(tag), module->comm,
1067+
NULL, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
10641068
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
10651069
break;
10661070
}
@@ -1077,8 +1081,9 @@ int ompi_osc_pt2pt_rget_accumulate_internal (const void *origin_addr, int origin
10771081
header->base.type = OMPI_OSC_PT2PT_HDR_TYPE_GET_ACC_LONG;
10781082
osc_pt2pt_hton(header, proc);
10791083

1080-
ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_datatype, target_rank,
1081-
tag, module->comm, ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
1084+
ret = ompi_osc_pt2pt_isend_w_cb (origin_addr, origin_count, origin_datatype,
1085+
target_rank, tag_to_target(tag), module->comm,
1086+
ompi_osc_pt2pt_req_comm_complete, pt2pt_request);
10821087
}
10831088
} while (0);
10841089

ompi/mca/osc/pt2pt/osc_pt2pt_component.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,6 @@ component_select(struct ompi_win_t *win, void **base, size_t size, int disp_unit
290290
/* fill in the function pointer part */
291291
memcpy(module, &ompi_osc_pt2pt_module_template,
292292
sizeof(ompi_osc_base_module_t));
293-
module->rtag_counter = 2;
294293

295294
/* initialize the objects, so that always free in cleanup */
296295
OBJ_CONSTRUCT(&module->lock, opal_mutex_t);

ompi/mca/osc/pt2pt/osc_pt2pt_data_move.c

Lines changed: 24 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
* Copyright (c) 2012-2013 Sandia National Laboratories. All rights reserved.
1515
* Copyright (c) 2014-2015 Research Organization for Information Science
1616
* and Technology (RIST). All rights reserved.
17+
* Copyright (c) 2016 FUJITSU LIMITED. All rights reserved.
1718
* $COPYRIGHT$
1819
*
1920
* Additional copyrights may follow
@@ -398,7 +399,7 @@ static inline int process_put_long(ompi_osc_pt2pt_module_t* module, int source,
398399
ret = ompi_osc_pt2pt_component_irecv (module, target,
399400
put_header->count,
400401
datatype, source,
401-
put_header->tag,
402+
tag_to_target(put_header->tag),
402403
module->comm);
403404
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
404405
OPAL_OUTPUT_VERBOSE((1, ompi_osc_base_framework.framework_output,
@@ -534,7 +535,8 @@ static inline int process_get (ompi_osc_pt2pt_module_t* module, int target,
534535
}
535536

536537
/* send get data */
537-
ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype, target, get_header->tag);
538+
ret = osc_pt2pt_get_post_send (module, source, get_header->count, datatype,
539+
target, tag_to_origin(get_header->tag));
538540

539541
OBJ_RELEASE(datatype);
540542

@@ -847,9 +849,9 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s
847849

848850
do {
849851
if (op == &ompi_mpi_op_replace.op) {
850-
ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype, source,
851-
acc_header->tag, module->comm, NULL,
852-
replace_cb, module);
852+
ret = ompi_osc_pt2pt_irecv_w_cb (target, acc_header->count, datatype,
853+
source, tag_to_target(acc_header->tag), module->comm,
854+
NULL, replace_cb, module);
853855
break;
854856
}
855857

@@ -876,8 +878,9 @@ static int ompi_osc_pt2pt_acc_long_start (ompi_osc_pt2pt_module_t *module, int s
876878
break;
877879
}
878880

879-
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype, source,
880-
acc_header->tag, module->comm, NULL, accumulate_cb, acc_data);
881+
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, primitive_count, primitive_datatype,
882+
source, tag_to_target(acc_header->tag), module->comm,
883+
NULL, accumulate_cb, acc_data);
881884
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
882885
OBJ_RELEASE(acc_data);
883886
}
@@ -925,8 +928,9 @@ static int ompi_osc_pt2pt_gacc_start (ompi_osc_pt2pt_module_t *module, int sourc
925928
break;
926929
}
927930

928-
ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype, source, acc_header->tag,
929-
module->comm, accumulate_cb, acc_data);
931+
ret = ompi_osc_pt2pt_isend_w_cb (target, acc_header->count, datatype,
932+
source, tag_to_origin(acc_header->tag), module->comm,
933+
accumulate_cb, acc_data);
930934
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
931935
OBJ_RELEASE(acc_data);
932936
}
@@ -994,15 +998,17 @@ static int ompi_osc_gacc_long_start (ompi_osc_pt2pt_module_t *module, int source
994998
break;
995999
}
9961000

997-
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype, source, acc_header->tag,
998-
module->comm, &recv_request, accumulate_cb, acc_data);
1001+
ret = ompi_osc_pt2pt_irecv_w_cb (buffer, acc_header->count, datatype,
1002+
source, tag_to_target(acc_header->tag), module->comm,
1003+
&recv_request, accumulate_cb, acc_data);
9991004
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
10001005
OBJ_RELEASE(acc_data);
10011006
break;
10021007
}
10031008

1004-
ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype, source, acc_header->tag,
1005-
module->comm, accumulate_cb, acc_data);
1009+
ret = ompi_osc_pt2pt_isend_w_cb (target, primitive_count, primitive_datatype,
1010+
source, tag_to_origin(acc_header->tag), module->comm,
1011+
accumulate_cb, acc_data);
10061012
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
10071013
/* cancel the receive and free the accumulate data */
10081014
ompi_request_cancel (recv_request);
@@ -1054,8 +1060,8 @@ static int ompi_osc_pt2pt_cswap_start (ompi_osc_pt2pt_module_t *module, int sour
10541060

10551061
do {
10561062
/* no reason to do a non-blocking send here */
1057-
ret = MCA_PML_CALL(send(target, 1, datatype, source, cswap_header->tag, MCA_PML_BASE_SEND_STANDARD,
1058-
module->comm));
1063+
ret = MCA_PML_CALL(send(target, 1, datatype, source, tag_to_origin(cswap_header->tag),
1064+
MCA_PML_BASE_SEND_STANDARD, module->comm));
10591065
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
10601066
break;
10611067
}
@@ -1503,8 +1509,9 @@ static int process_large_datatype_request (ompi_osc_pt2pt_module_t *module, int
15031509
memcpy (ddt_buffer->header, header, header_len);
15041510

15051511
ret = ompi_osc_pt2pt_irecv_w_cb ((void *)((uintptr_t) ddt_buffer->header + header_len),
1506-
ddt_len, MPI_BYTE, source, tag, module->comm, NULL,
1507-
process_large_datatype_request_cb, ddt_buffer);
1512+
ddt_len, MPI_BYTE,
1513+
source, tag_to_target(tag), module->comm,
1514+
NULL, process_large_datatype_request_cb, ddt_buffer);
15081515
if (OPAL_UNLIKELY(OMPI_SUCCESS != ret)) {
15091516
OBJ_RELEASE(ddt_buffer);
15101517
return ret;

0 commit comments

Comments
 (0)