1515 * Copyright (c) 2023 Jeffrey M. Squyres. All rights reserved.
1616 * Copyright (c) 2024 Triad National Security, LLC. All rights
1717 * reserved.
18+ * Copyright (c) 2024 Advanced Micro Devices, Inc. All rights reserved.
1819 * $COPYRIGHT$
1920 *
2021 * Additional copyrights may follow
3031#include "ompi/mca/fcoll/fcoll.h"
3132#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
3233#include "ompi/mca/common/ompio/common_ompio.h"
34+ #include "ompi/mca/common/ompio/common_ompio_buffer.h"
3335#include "ompi/mca/io/io.h"
3436#include "ompi/mca/common/ompio/common_ompio_request.h"
3537#include "math.h"
3638#include "ompi/mca/pml/pml.h"
39+ #include "opal/mca/accelerator/accelerator.h"
3740#include <unistd.h>
3841
3942#define DEBUG_ON 0
@@ -88,13 +91,12 @@ typedef struct mca_io_ompio_aggregator_data {
8891 _aggr[_i]->prev_recvtype=(ompi_datatype_t **)_t; } \
8992}
9093
91-
92-
9394static int shuffle_init ( int index , int cycles , int aggregator , int rank ,
9495 mca_io_ompio_aggregator_data * data ,
9596 ompi_request_t * * reqs );
9697static int write_init (ompio_file_t * fh , int aggregator , mca_io_ompio_aggregator_data * aggr_data ,
97- int write_chunksize , int write_synchType , ompi_request_t * * request );
98+ int write_chunksize , int write_synchType , ompi_request_t * * request ,
99+ bool is_accelerator_buffer );
98100int mca_fcoll_vulcan_break_file_view ( struct iovec * decoded_iov , int iov_count ,
99101 struct iovec * local_iov_array , int local_count ,
100102 struct iovec * * * broken_decoded_iovs , int * * broken_iov_counts ,
@@ -155,6 +157,8 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
155157
156158 ompi_count_array_t fview_count_desc ;
157159 ompi_disp_array_t displs_desc ;
160+ int is_gpu , is_managed ;
161+ bool use_accelerator_buffer = false;
158162
159163#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
160164 double write_time = 0.0 , start_write_time = 0.0 , end_write_time = 0.0 ;
@@ -180,6 +184,11 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
180184 goto exit ;
181185 }
182186
187+ mca_common_ompio_check_gpu_buf (fh , buf , & is_gpu , & is_managed );
188+ if (is_gpu && !is_managed &&
189+ fh -> f_get_mca_parameter_value ("use_accelerator_buffers" , strlen ("use_accelerator_buffers" ))) {
190+ use_accelerator_buffer = true;
191+ }
183192 /* since we want to overlap 2 iterations, define the bytes_per_cycle to be half of what
184193 the user requested */
185194 bytes_per_cycle = bytes_per_cycle /2 ;
@@ -529,13 +538,31 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
529538 goto exit ;
530539 }
531540
532-
533- aggr_data [i ]-> global_buf = (char * ) malloc (bytes_per_cycle );
534- aggr_data [i ]-> prev_global_buf = (char * ) malloc (bytes_per_cycle );
535- if (NULL == aggr_data [i ]-> global_buf || NULL == aggr_data [i ]-> prev_global_buf ){
536- opal_output (1 , "OUT OF MEMORY" );
537- ret = OMPI_ERR_OUT_OF_RESOURCE ;
538- goto exit ;
541+ if (use_accelerator_buffer ) {
542+ opal_output_verbose (10 , ompi_fcoll_base_framework .framework_output ,
543+ "Allocating GPU device buffer for aggregation\n" );
544+ ret = opal_accelerator .mem_alloc (MCA_ACCELERATOR_NO_DEVICE_ID , (void * * )& aggr_data [i ]-> global_buf ,
545+ bytes_per_cycle );
546+ if (OPAL_SUCCESS != ret ) {
547+ opal_output (1 , "Could not allocate accelerator memory" );
548+ ret = OMPI_ERR_OUT_OF_RESOURCE ;
549+ goto exit ;
550+ }
551+ ret = opal_accelerator .mem_alloc (MCA_ACCELERATOR_NO_DEVICE_ID , (void * * )& aggr_data [i ]-> prev_global_buf ,
552+ bytes_per_cycle );
553+ if (OPAL_SUCCESS != ret ) {
554+ opal_output (1 , "Could not allocate accelerator memory" );
555+ ret = OMPI_ERR_OUT_OF_RESOURCE ;
556+ goto exit ;
557+ }
558+ } else {
559+ aggr_data [i ]-> global_buf = (char * ) malloc (bytes_per_cycle );
560+ aggr_data [i ]-> prev_global_buf = (char * ) malloc (bytes_per_cycle );
561+ if (NULL == aggr_data [i ]-> global_buf || NULL == aggr_data [i ]-> prev_global_buf ){
562+ opal_output (1 , "OUT OF MEMORY" );
563+ ret = OMPI_ERR_OUT_OF_RESOURCE ;
564+ goto exit ;
565+ }
539566 }
540567
541568 aggr_data [i ]-> recvtype = (ompi_datatype_t * * ) malloc (fh -> f_procs_per_group *
@@ -605,7 +632,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
605632 start_write_time = MPI_Wtime ();
606633#endif
607634 ret = write_init (fh , fh -> f_aggr_list [aggr_index ], aggr_data [aggr_index ],
608- write_chunksize , write_synch_type , & req_iwrite );
635+ write_chunksize , write_synch_type , & req_iwrite , use_accelerator_buffer );
609636 if (OMPI_SUCCESS != ret ){
610637 goto exit ;
611638 }
@@ -645,7 +672,7 @@ int mca_fcoll_vulcan_file_write_all (struct ompio_file_t *fh,
645672 start_write_time = MPI_Wtime ();
646673#endif
647674 ret = write_init (fh , fh -> f_aggr_list [aggr_index ], aggr_data [aggr_index ],
648- write_chunksize , write_synch_type , & req_iwrite );
675+ write_chunksize , write_synch_type , & req_iwrite , use_accelerator_buffer );
649676 if (OMPI_SUCCESS != ret ){
650677 goto exit ;
651678 }
@@ -704,8 +731,13 @@ exit :
704731
705732 free (aggr_data [i ]-> disp_index );
706733 free (aggr_data [i ]-> max_disp_index );
707- free (aggr_data [i ]-> global_buf );
708- free (aggr_data [i ]-> prev_global_buf );
734+ if (use_accelerator_buffer ) {
735+ opal_accelerator .mem_release (MCA_ACCELERATOR_NO_DEVICE_ID , aggr_data [i ]-> global_buf );
736+ opal_accelerator .mem_release (MCA_ACCELERATOR_NO_DEVICE_ID , aggr_data [i ]-> prev_global_buf );
737+ } else {
738+ free (aggr_data [i ]-> global_buf );
739+ free (aggr_data [i ]-> prev_global_buf );
740+ }
709741 for (l = 0 ;l < aggr_data [i ]-> procs_per_group ;l ++ ){
710742 free (aggr_data [i ]-> blocklen_per_process [l ]);
711743 free (aggr_data [i ]-> displs_per_process [l ]);
@@ -749,7 +781,8 @@ static int write_init (ompio_file_t *fh,
749781 mca_io_ompio_aggregator_data * aggr_data ,
750782 int write_chunksize ,
751783 int write_synchType ,
752- ompi_request_t * * request )
784+ ompi_request_t * * request ,
785+ bool is_accelerator_buffer )
753786{
754787 int ret = OMPI_SUCCESS ;
755788 ssize_t ret_temp = 0 ;
@@ -770,11 +803,20 @@ static int write_init (ompio_file_t *fh,
770803 write_chunksize );
771804
772805 if (1 == write_synchType ) {
773- ret = fh -> f_fbtl -> fbtl_ipwritev (fh , (ompi_request_t * ) ompio_req );
774- if (0 > ret ) {
775- opal_output (1 , "vulcan_write_all: fbtl_ipwritev failed\n" );
776- ompio_req -> req_ompi .req_status .MPI_ERROR = ret ;
777- ompio_req -> req_ompi .req_status ._ucount = 0 ;
806+ if (is_accelerator_buffer ) {
807+ ret = mca_common_ompio_file_iwrite_pregen (fh , (ompi_request_t * ) ompio_req );
808+ if (0 > ret ) {
809+ opal_output (1 , "vulcan_write_all: mca_common_ompio_iwrite_pregen failed\n" );
810+ ompio_req -> req_ompi .req_status .MPI_ERROR = ret ;
811+ ompio_req -> req_ompi .req_status ._ucount = 0 ;
812+ }
813+ } else {
814+ ret = fh -> f_fbtl -> fbtl_ipwritev (fh , (ompi_request_t * ) ompio_req );
815+ if (0 > ret ) {
816+ opal_output (1 , "vulcan_write_all: fbtl_ipwritev failed\n" );
817+ ompio_req -> req_ompi .req_status .MPI_ERROR = ret ;
818+ ompio_req -> req_ompi .req_status ._ucount = 0 ;
819+ }
778820 }
779821 }
780822 else {
0 commit comments