Skip to content

Commit 3a7516f

Browse files
committed
support for mpi_comm_attach_buffer and friends
Add support for functions added as part of MPI 4.1 standard concerning buffer management for buffered send mode: MPI_Comm_attach_buffer MPI_Comm_detach_buffer MPI_Session_attach_buffer MPI_Session_detach_buffer MPI_Comm_buffer_flush MPI_Comm_buffer_iflush MPI_Session_buffer_flush MPI_Session_buffer_iflush Full support for non-blocking flush is deferred to a subsequent PR to avoid reviewer overload. Related to #12074 Signed-off-by: Howard Pritchard <[email protected]>
1 parent c859bfe commit 3a7516f

23 files changed

+1157
-132
lines changed

ompi/communicator/communicator.h

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
* Copyright (c) 2015 Research Organization for Information Science
2323
* and Technology (RIST). All rights reserved.
2424
* Copyright (c) 2016-2017 IBM Corporation. All rights reserved.
25-
* Copyright (c) 2018-2024 Triad National Security, LLC. All rights
25+
* Copyright (c) 2018-2025 Triad National Security, LLC. All rights
2626
* reserved.
2727
* Copyright (c) 2023 Advanced Micro Devices, Inc. All rights reserved.
2828
* Copyright (c) 2024 NVIDIA Corporation. All rights reserved.
@@ -345,6 +345,9 @@ struct ompi_communicator_t {
345345
/* instance that this comm belongs to */
346346
ompi_instance_t* instance;
347347

348+
/* pointer to buffer object used for buffered sends */
349+
void *bsend_buffer;
350+
348351
#if OPAL_ENABLE_FT_MPI
349352
/** agreement caching info for topology and previous returned decisions */
350353
opal_object_t *agreement_specific;
@@ -775,6 +778,18 @@ static inline bool ompi_comm_iface_create_check(ompi_communicator_t *comm, int *
775778
return ompi_comm_iface_coll_check(comm, err);
776779
}
777780

781+
static inline void *ompi_comm_bsend_buffer_get(ompi_communicator_t *comm)
782+
{
783+
assert(NULL != comm);
784+
return comm->bsend_buffer;
785+
}
786+
787+
static inline int ompi_comm_bsend_buffer_set(ompi_communicator_t *comm, void *buffer)
788+
{
789+
comm->bsend_buffer = buffer;
790+
return OMPI_SUCCESS;
791+
}
792+
778793
/*
779794
* Communicator creation support collectives
780795
* - Agreement style allreduce

ompi/include/mpi.h.in

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -564,6 +564,7 @@ typedef MPI_Win_errhandler_function MPI_Win_errhandler_fn
564564
#define MPI_WEIGHTS_EMPTY ((int *) 3) /* empty weights */
565565
#define MPI_BOTTOM ((void *) 0) /* base reference address */
566566
#define MPI_IN_PLACE ((void *) 1) /* in place buffer */
567+
#define MPI_BUFFER_AUTOMATIC ((void *) 4) /* MPI_BUFFER_AUTOMATIC for buffer attach funcs */
567568
#define MPI_BSEND_OVERHEAD 128 /* size of bsend header + ptr */
568569
#define MPI_MAX_INFO_KEY OPAL_MAX_INFO_KEY /* max info key length */
569570
#define MPI_MAX_INFO_VAL OPAL_MAX_INFO_VAL /* max info value length */
@@ -1574,6 +1575,8 @@ OMPI_DECLSPEC int MPI_Buffer_attach(void *buffer, int size);
15741575
OMPI_DECLSPEC int MPI_Buffer_attach_c(void *buffer, MPI_Count size);
15751576
OMPI_DECLSPEC int MPI_Buffer_detach(void *buffer, int *size);
15761577
OMPI_DECLSPEC int MPI_Buffer_detach_c(void *buffer, MPI_Count *size);
1578+
OMPI_DECLSPEC int MPI_Buffer_flush(void);
1579+
OMPI_DECLSPEC int MPI_Buffer_iflush(MPI_Request *request);
15771580
OMPI_DECLSPEC int MPI_Cancel(MPI_Request *request);
15781581
OMPI_DECLSPEC int MPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
15791582
OMPI_DECLSPEC int MPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
@@ -1637,11 +1640,17 @@ OMPI_DECLSPEC int MPI_Dist_graph_neighbors_count(MPI_Comm comm,
16371640
int *inneighbors,
16381641
int *outneighbors,
16391642
int *weighted);
1643+
OMPI_DECLSPEC int MPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
1644+
OMPI_DECLSPEC int MPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
1645+
OMPI_DECLSPEC int MPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
1646+
OMPI_DECLSPEC int MPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
1647+
OMPI_DECLSPEC int MPI_Comm_flush_buffer(MPI_Comm comm);
16401648
OMPI_DECLSPEC int MPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
16411649
OMPI_DECLSPEC int MPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
16421650
OMPI_DECLSPEC int MPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
16431651
OMPI_DECLSPEC int MPI_Comm_get_parent(MPI_Comm *parent);
16441652
OMPI_DECLSPEC int MPI_Comm_group(MPI_Comm comm, MPI_Group *group);
1653+
OMPI_DECLSPEC int MPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
16451654
OMPI_DECLSPEC int MPI_Comm_join(int fd, MPI_Comm *intercomm);
16461655
OMPI_DECLSPEC int MPI_Comm_rank(MPI_Comm comm, int *rank);
16471656
OMPI_DECLSPEC int MPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
@@ -2331,15 +2340,21 @@ OMPI_DECLSPEC int MPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Datat
23312340
int dest, int sendtag, int source, int recvtag,
23322341
MPI_Comm comm, MPI_Status *status);
23332342
OMPI_DECLSPEC MPI_Fint MPI_Session_c2f (const MPI_Session session);
2343+
OMPI_DECLSPEC int MPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
2344+
OMPI_DECLSPEC int MPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
23342345
OMPI_DECLSPEC int MPI_Session_call_errhandler(MPI_Session session, int errorcode);
23352346
OMPI_DECLSPEC int MPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
23362347
MPI_Errhandler *errhandler);
2348+
OMPI_DECLSPEC int MPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
2349+
OMPI_DECLSPEC int MPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
23372350
OMPI_DECLSPEC int MPI_Session_finalize (MPI_Session *session);
2351+
OMPI_DECLSPEC int MPI_Session_flush_buffer(MPI_Session session);
23382352
OMPI_DECLSPEC int MPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
23392353
OMPI_DECLSPEC int MPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
23402354
OMPI_DECLSPEC int MPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
23412355
OMPI_DECLSPEC int MPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
23422356
OMPI_DECLSPEC int MPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
2357+
OMPI_DECLSPEC int MPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
23432358
OMPI_DECLSPEC int MPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
23442359
MPI_Session *session);
23452360
OMPI_DECLSPEC MPI_Session MPI_Session_f2c (MPI_Fint session);
@@ -2733,6 +2748,8 @@ OMPI_DECLSPEC int PMPI_Buffer_attach(void *buffer, int size);
27332748
OMPI_DECLSPEC int PMPI_Buffer_attach_c(void *buffer, MPI_Count size);
27342749
OMPI_DECLSPEC int PMPI_Buffer_detach(void *buffer, int *size);
27352750
OMPI_DECLSPEC int PMPI_Buffer_detach_c(void *buffer, MPI_Count *size);
2751+
OMPI_DECLSPEC int PMPI_Buffer_flush(void);
2752+
OMPI_DECLSPEC int PMPI_Buffer_iflush(MPI_Request *request);
27362753
OMPI_DECLSPEC int PMPI_Cancel(MPI_Request *request);
27372754
OMPI_DECLSPEC int PMPI_Cart_coords(MPI_Comm comm, int rank, int maxdims, int coords[]);
27382755
OMPI_DECLSPEC int PMPI_Cart_create(MPI_Comm old_comm, int ndims, const int dims[],
@@ -2796,11 +2813,17 @@ OMPI_DECLSPEC int PMPI_Dist_graph_neighbors_count(MPI_Comm comm,
27962813
int *inneighbors,
27972814
int *outneighbors,
27982815
int *weighted);
2816+
OMPI_DECLSPEC int PMPI_Comm_attach_buffer(MPI_Comm comm, void *buffer, int size);
2817+
OMPI_DECLSPEC int PMPI_Comm_attach_buffer_c(MPI_Comm comm, void *buffer, MPI_Count size);
2818+
OMPI_DECLSPEC int PMPI_Comm_detach_buffer(MPI_Comm comm, void *buffer_addr, int *size);
2819+
OMPI_DECLSPEC int PMPI_Comm_detach_buffer_c(MPI_Comm comm, void *buffer_addr, MPI_Count *size);
2820+
OMPI_DECLSPEC int PMPI_Comm_flush_buffer(MPI_Comm comm);
27992821
OMPI_DECLSPEC int PMPI_Comm_get_errhandler(MPI_Comm comm, MPI_Errhandler *erhandler);
28002822
OMPI_DECLSPEC int PMPI_Comm_get_info(MPI_Comm comm, MPI_Info *info_used);
28012823
OMPI_DECLSPEC int PMPI_Comm_get_name(MPI_Comm comm, char *comm_name, int *resultlen);
28022824
OMPI_DECLSPEC int PMPI_Comm_get_parent(MPI_Comm *parent);
28032825
OMPI_DECLSPEC int PMPI_Comm_group(MPI_Comm comm, MPI_Group *group);
2826+
OMPI_DECLSPEC int PMPI_Comm_iflush_buffer(MPI_Comm comm, MPI_Request *request);
28042827
OMPI_DECLSPEC int PMPI_Comm_join(int fd, MPI_Comm *intercomm);
28052828
OMPI_DECLSPEC int PMPI_Comm_rank(MPI_Comm comm, int *rank);
28062829
OMPI_DECLSPEC int PMPI_Comm_remote_group(MPI_Comm comm, MPI_Group *group);
@@ -3490,15 +3513,21 @@ OMPI_DECLSPEC int PMPI_Sendrecv_replace_c(void * buf, MPI_Count count, MPI_Data
34903513
int dest, int sendtag, int source, int recvtag,
34913514
MPI_Comm comm, MPI_Status *status);
34923515
OMPI_DECLSPEC MPI_Fint PMPI_Session_c2f (const MPI_Session session);
3516+
OMPI_DECLSPEC int PMPI_Session_attach_buffer(MPI_Session session, void *buffer, int size);
3517+
OMPI_DECLSPEC int PMPI_Session_attach_buffer_c(MPI_Session session, void *buffer, MPI_Count size);
34933518
OMPI_DECLSPEC int PMPI_Session_call_errhandler(MPI_Session session, int errorcode);
34943519
OMPI_DECLSPEC int PMPI_Session_create_errhandler (MPI_Session_errhandler_function *session_errhandler_fn,
34953520
MPI_Errhandler *errhandler);
3521+
OMPI_DECLSPEC int PMPI_Session_detach_buffer(MPI_Session session, void *buffer_addr, int *size);
3522+
OMPI_DECLSPEC int PMPI_Session_detach_buffer_c(MPI_Session session, void *buffer_addr, MPI_Count *size);
34963523
OMPI_DECLSPEC int PMPI_Session_finalize (MPI_Session *session);
3524+
OMPI_DECLSPEC int PMPI_Session_flush_buffer(MPI_Session session);
34973525
OMPI_DECLSPEC int PMPI_Session_get_errhandler(MPI_Session session, MPI_Errhandler *erhandler);
34983526
OMPI_DECLSPEC int PMPI_Session_get_info (MPI_Session session, MPI_Info *info_used);
34993527
OMPI_DECLSPEC int PMPI_Session_get_num_psets (MPI_Session session, MPI_Info info, int *npset_names);
35003528
OMPI_DECLSPEC int PMPI_Session_get_nth_pset (MPI_Session session, MPI_Info info, int n, int *len, char *pset_name);
35013529
OMPI_DECLSPEC int PMPI_Session_get_pset_info (MPI_Session session, const char *pset_name, MPI_Info *info_used);
3530+
OMPI_DECLSPEC int PMPI_Session_iflush_buffer(MPI_Session session, MPI_Request *request);
35023531
OMPI_DECLSPEC int PMPI_Session_init (MPI_Info info, MPI_Errhandler errhandler,
35033532
MPI_Session *session);
35043533
OMPI_DECLSPEC MPI_Session PMPI_Session_f2c (MPI_Fint session);

ompi/instance/instance.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -894,7 +894,7 @@ static int ompi_mpi_instance_finalize_common (void)
894894
/* As finalize is the last legal MPI call, we are allowed to force the release
895895
* of the user buffer used for bsend, before going anywhere further.
896896
*/
897-
(void) mca_pml_base_bsend_detach (NULL, NULL);
897+
(void) mca_pml_base_bsend_detach(BASE_BSEND_BUF, NULL, NULL, NULL);
898898

899899
/* Shut down any bindings-specific issues: C++, F77, F90 */
900900

ompi/instance/instance.h

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
22
/*
3-
* Copyright (c) 2018 Triad National Security, LLC. All rights reserved.
3+
* Copyright (c) 2018-2025 Triad National Security, LLC. All rights reserved.
44
* $COPYRIGHT$
55
*
66
* Additional copyrights may follow
@@ -41,6 +41,10 @@ struct ompi_instance_t {
4141

4242
ompi_errhandler_t *error_handler;
4343
ompi_errhandler_type_t errhandler_type;
44+
45+
/* pointer to buffer object used for buffered sends */
46+
void *bsend_buffer;
47+
4448
};
4549

4650
typedef struct ompi_instance_t ompi_instance_t;
@@ -164,4 +168,16 @@ static inline int ompi_instance_invalid (const ompi_instance_t* instance)
164168
return false;
165169
}
166170

171+
static inline void *ompi_instance_bsend_buffer_get(ompi_instance_t *instance)
172+
{
173+
assert(NULL != instance);
174+
return instance->bsend_buffer;
175+
}
176+
177+
static inline int ompi_instance_bsend_buffer_set(ompi_instance_t *instance, void *buffer)
178+
{
179+
instance->bsend_buffer = buffer;
180+
return OMPI_SUCCESS;
181+
}
182+
167183
#endif /* !defined(OMPI_INSTANCE_H) */

0 commit comments

Comments
 (0)