Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions ompi/include/mpi.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -1700,11 +1700,17 @@ OMPI_DECLSPEC int MPI_Pack(const void *inbuf, int incount, MPI_Datatype datatyp
OMPI_DECLSPEC int MPI_Pack_size(int incount, MPI_Datatype datatype, MPI_Comm comm,
int *size);
OMPI_DECLSPEC int MPI_Parrived(MPI_Request request, MPI_Count partition, int *flag);
OMPI_DECLSPEC int MPIX_Pbuf_prepare(MPI_Request request);
OMPI_DECLSPEC int MPIX_Pbuf_prepareall(int count, MPI_Request* requests);
OMPI_DECLSPEC int MPI_Pcontrol(const int level, ...);
OMPI_DECLSPEC int MPI_Pready(int partitions, MPI_Request request);
OMPI_DECLSPEC int MPIX_Pready_fast(int partitions, MPI_Request request);
OMPI_DECLSPEC int MPI_Pready_range(int partition_low, int partition_high,
MPI_Request request);
OMPI_DECLSPEC int MPIX_Pready_range_fast(int partition_low, int partition_high,
MPI_Request request);
OMPI_DECLSPEC int MPI_Pready_list(int length, int partition_list[], MPI_Request request);
OMPI_DECLSPEC int MPIX_Pready_list_fast(int length, int partition_list[], MPI_Request request);
OMPI_DECLSPEC int MPI_Precv_init(void* buf, int partitions, MPI_Count count,
MPI_Datatype datatype, int source, int tag, MPI_Comm comm,
MPI_Request *request);
Expand Down Expand Up @@ -2321,16 +2327,22 @@ OMPI_DECLSPEC int PMPI_Isend(const void *buf, int count, MPI_Datatype datatype,
int tag, MPI_Comm comm, MPI_Request *request);
OMPI_DECLSPEC int PMPI_Issend(const void *buf, int count, MPI_Datatype datatype, int dest,
int tag, MPI_Comm comm, MPI_Request *request);
OMPI_DECLSPEC int PMPIX_Pbuf_prepare(MPI_Request request);
OMPI_DECLSPEC int PMPIX_Pbuf_prepareall(int count, MPI_Request* requests);
OMPI_DECLSPEC int PMPI_Precv_init(void* buf, int partitions, MPI_Count count,
MPI_Datatype datatype, int source, int tag, MPI_Comm comm,
MPI_Request *request);
OMPI_DECLSPEC int PMPI_Psend_init(const void* buf, int partitions, MPI_Count count,
MPI_Datatype datatype, int dest, int tag, MPI_Comm comm,
MPI_Request *request);
OMPI_DECLSPEC int PMPI_Pready(int partitions, MPI_Request request);
OMPI_DECLSPEC int PMPIX_Pready_fast(int partitions, MPI_Request request);
OMPI_DECLSPEC int PMPI_Pready_range(int partition_low, int partition_high,
MPI_Request request);
OMPI_DECLSPEC int PMPIX_Pready_range_fast(int partition_low, int partition_high,
MPI_Request request);
OMPI_DECLSPEC int PMPI_Pready_list(int length, int partition_list[], MPI_Request request);
OMPI_DECLSPEC int PMPIX_Pready_list_fast(int length, int partition_list[], MPI_Request request);
OMPI_DECLSPEC int PMPI_Parrived(MPI_Request request, MPI_Count partition, int *flag);
OMPI_DECLSPEC int PMPI_Is_thread_main(int *flag);
OMPI_DECLSPEC int PMPI_Lookup_name(const char *service_name, MPI_Info info, char *port_name);
Expand Down
27 changes: 22 additions & 5 deletions ompi/mca/part/part.h
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,20 @@ typedef int (*mca_part_base_module_parrived_fn_t)(
struct ompi_request_t* request
);

/**
* Block until reciving buffer is ready.
*
* @param request (IN/OUT) Request
* @return OMPI_SUCCESS or failure status.
*
*/
typedef int (*mca_part_base_module_pbuf_prepare_fn_t)(
int count,
struct ompi_request_t** request
);



/**
* PART instance.
*/
Expand All @@ -220,11 +234,14 @@ struct mca_part_base_module_1_0_1_t {
mca_part_base_module_progress_fn_t part_progress;

/* downcalls from MPI to PART */
mca_part_base_module_precv_init_fn_t part_precv_init;
mca_part_base_module_psend_init_fn_t part_psend_init;
mca_part_base_module_start_fn_t part_start;
mca_part_base_module_pready_fn_t part_pready;
mca_part_base_module_parrived_fn_t part_parrived;
mca_part_base_module_precv_init_fn_t part_precv_init;
mca_part_base_module_psend_init_fn_t part_psend_init;
mca_part_base_module_start_fn_t part_start;
mca_part_base_module_pready_fn_t part_pready;
mca_part_base_module_pready_fn_t part_pready_fast; /* pready_fast is a varient of pready with the same function signature */
mca_part_base_module_parrived_fn_t part_parrived;
mca_part_base_module_pbuf_prepare_fn_t part_pbuf_prepare;

/* diagnostics */

/* FT Event */
Expand Down
1 change: 1 addition & 0 deletions ompi/mca/part/persist/part_persist.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ ompi_part_persist_t ompi_part_persist = {
.part_start = mca_part_persist_start,
.part_pready = mca_part_persist_pready,
.part_parrived = mca_part_persist_parrived,
.part_pbuf_prepare = mca_part_persist_pbuf_prepare,
}
};

Expand Down
62 changes: 62 additions & 0 deletions ompi/mca/part/persist/part_persist.h
Original file line number Diff line number Diff line change
Expand Up @@ -486,6 +486,7 @@ mca_part_persist_start(size_t count, ompi_request_t** requests)

for(i = 0; i < _count && OMPI_SUCCESS == err; i++) {
mca_part_persist_request_t *req = (mca_part_persist_request_t *)(requests[i]);
req->synched = 0;
/* First use is a special case, to support lazy initialization */
if(false == req->first_send)
{
Expand Down Expand Up @@ -544,6 +545,29 @@ mca_part_persist_pready(size_t min_part,
return err;
}

__opal_attribute_always_inline__ static inline int
mca_part_persist_pready_fast(size_t min_part,
size_t max_part,
ompi_request_t* request)
{
int err = OMPI_SUCCESS;
size_t i;

mca_part_persist_request_t *req = (mca_part_persist_request_t *)(request);
if(true == req->initialized)
{
err = req->persist_reqs[min_part]->req_start(max_part-min_part+1, (&(req->persist_reqs[min_part])));
for(i = min_part; i <= max_part && OMPI_SUCCESS == err; i++) {
req->flags[i] = 0; /* Mark partion as ready for testing */
}
}
else
{
return OMPI_ERROR; /* This codepath is erronious for pready_fast */
}
return err;
}

__opal_attribute_always_inline__ static inline int
mca_part_persist_parrived(size_t min_part,
size_t max_part,
Expand Down Expand Up @@ -574,6 +598,44 @@ mca_part_persist_parrived(size_t min_part,
return err;
}

__opal_attribute_always_inline__ static inline int
mca_part_persist_pbuf_prepare(int count, ompi_request_t** requests)
{
int err = OMPI_SUCCESS;
int not_synched = 1; int test;
while(not_synched) {
not_synched = 0;
for(int i = 0; i < count; i++) {
mca_part_persist_request_t* req = (mca_part_persist_request_t*)requests[i];
if(true == req->initialized) {
if(MCA_PART_PERSIST_REQUEST_PSEND == req->req_type) {
if(0 == req->synched) {
err = MCA_PML_CALL(irecv(NULL, 0, MPI_BYTE, OMPI_ANY_SOURCE, req->my_recv_tag, ompi_part_persist.part_comm_setup, &req->setup_req[1]));
req->synched = 1;
} else if (1 == req->synched) {
ompi_request_test(&(req->setup_req[1]), &test, MPI_STATUS_IGNORE);
if(test) req->synched = 2;
}
} else {
if(0 == req->synched) {
err = MCA_PML_CALL(isend(NULL, 0, MPI_BYTE, req->world_peer, req->my_recv_tag, MCA_PML_BASE_SEND_STANDARD, ompi_part_persist.part_comm_setup, &req->setup_req[0]));
req->synched = 1;
} else if (1 == req->synched) {
ompi_request_test(&(req->setup_req[0]), &test, MPI_STATUS_IGNORE);
if(test) req->synched = 2;
}
}
if(2 != req->synched) {
not_synched = 1;
}
} else {
not_synched = 1;
}
}
mca_part_persist_progress(); /* Manually invoke the progress engine */
}
return err;
}

/**
* mca_part_persist_free marks an entry as free called and sets the request to
Expand Down
1 change: 1 addition & 0 deletions ompi/mca/part/persist/part_persist_request.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ struct mca_part_persist_request_t {

struct mca_part_persist_list_t* progress_elem; /**< pointer to progress list element for removal durring free. */

int32_t synched; /**< Internal flag for pbuf_prepare */
};
typedef struct mca_part_persist_request_t mca_part_persist_request_t;
OBJ_CLASS_DECLARATION(mca_part_persist_request_t);
Expand Down
5 changes: 5 additions & 0 deletions ompi/mpi/c/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -301,10 +301,15 @@ libmpi_c_mpi_la_SOURCES = \
pack.c \
pack_size.c \
parrived.c \
pbuf_prepare.c \
pbuf_prepareall.c \
pcontrol.c \
pready.c \
pready_fast.c \
pready_list.c \
pready_list_fast.c \
pready_range.c \
pready_range_fast.c \
precv_init.c \
probe.c \
psend_init.c \
Expand Down
64 changes: 64 additions & 0 deletions ompi/mpi/c/pbuf_prepare.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2018 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>

#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mca/part/part.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/runtime/ompi_spc.h"

#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPIX_Pbuf_prepare = PMPIX_Pbuf_prepare
#endif
#define MPIX_Pbuf_prepare PMPIX_Pbuf_prepare
#endif

static const char FUNC_NAME[] = "MPIX_Pbuf_prepare";


int MPIX_Pbuf_prepare(MPI_Request request)
{
int rc;

SPC_RECORD(OMPI_SPC_PBUF_PREPARE, 1);

if (MPI_PARAM_CHECK) {
rc = OMPI_SUCCESS;

OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (NULL == request || OMPI_REQUEST_PART != request->req_type) {
rc = MPI_ERR_REQUEST;
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

rc = mca_part.part_pbuf_prepare(1, &request);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
65 changes: 65 additions & 0 deletions ompi/mpi/c/pbuf_prepareall.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2018 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>

#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mca/part/part.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/runtime/ompi_spc.h"

#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPIX_Pbuf_prepareall = PMPIX_Pbuf_prepareall
#endif
#define MPIX_Pbuf_prepareall PMPIX_Pbuf_prepareall
#endif

static const char FUNC_NAME[] = "MPIX_Pbuf_prepareall";


int MPIX_Pbuf_prepareall(int count, MPI_Request* requests)
{
int rc;

SPC_RECORD(OMPI_SPC_PBUF_PREPARE, 1);

if (MPI_PARAM_CHECK) {
rc = OMPI_SUCCESS;

OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
// TODO: Param Check
// if (NULL == requests || OMPI_REQUEST_PART != request->req_type) {
// rc = MPI_ERR_REQUEST;
// }
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

rc = mca_part.part_pbuf_prepare(count, requests);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
64 changes: 64 additions & 0 deletions ompi/mpi/c/pready_fast.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2004-2007 The Trustees of Indiana University and Indiana
* University Research and Technology
* Corporation. All rights reserved.
* Copyright (c) 2004-2018 The University of Tennessee and The University
* of Tennessee Research Foundation. All rights
* reserved.
* Copyright (c) 2004-2008 High Performance Computing Center Stuttgart,
* University of Stuttgart. All rights reserved.
* Copyright (c) 2004-2005 The Regents of the University of California.
* All rights reserved.
* Copyright (c) 2006 Cisco Systems, Inc. All rights reserved.
* Copyright (c) 2013-2015 Los Alamos National Security, LLC. All rights
* reserved.
* Copyright (c) 2015 Research Organization for Information Science
* and Technology (RIST). All rights reserved.
* Copyright (c) 2020 Sandia National Laboratories. All rights reserved.
* $COPYRIGHT$
*
* Additional copyrights may follow
*
* $HEADER$
*/
#include "ompi_config.h"
#include <stdio.h>

#include "ompi/mpi/c/bindings.h"
#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/errhandler/errhandler.h"
#include "ompi/mca/part/part.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/runtime/ompi_spc.h"

#if OMPI_BUILD_MPI_PROFILING
#if OPAL_HAVE_WEAK_SYMBOLS
#pragma weak MPIX_Pready_fast = PMPIX_Pready_fast
#endif
#define MPIX_Pready_fast PMPIX_Pready_fast
#endif

static const char FUNC_NAME[] = "MPIX_Pready_fast";


int MPIX_Pready_fast(int partition, MPI_Request request)
{
int rc;

SPC_RECORD(OMPI_SPC_PREADY, 1);

if (MPI_PARAM_CHECK) {
rc = OMPI_SUCCESS;

OMPI_ERR_INIT_FINALIZE(FUNC_NAME);
if (NULL == request || OMPI_REQUEST_PART != request->req_type) {
rc = MPI_ERR_REQUEST;
}
OMPI_ERRHANDLER_CHECK(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}

rc = mca_part.part_pready_fast(partition, partition, request);
OMPI_ERRHANDLER_RETURN(rc, MPI_COMM_WORLD, rc, FUNC_NAME);
}
Loading