@@ -698,6 +698,7 @@ exit :
698698 }
699699 free (aggr_data );
700700 }
701+ free (local_iov_array );
701702 free (displs );
702703 free (decoded_iov );
703704 free (broken_counts );
@@ -768,6 +769,8 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
768769 MPI_Aint * memory_displacements = NULL ;
769770 int * temp_disp_index = NULL ;
770771 MPI_Aint global_count = 0 ;
772+ int * blocklength_proc = NULL ;
773+ ptrdiff_t * displs_proc = NULL ;
771774
772775 data -> num_io_entries = 0 ;
773776 data -> bytes_sent = 0 ;
@@ -1131,85 +1134,82 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
11311134 }/* end if (aggregator == rank ) */
11321135
11331136 if (bytes_sent ) {
1134- size_t remaining = bytes_sent ;
1135- int block_index = -1 ;
1136- int blocklength_size = INIT_LEN ;
1137+ size_t remaining = bytes_sent ;
1138+ int block_index = -1 ;
1139+ int blocklength_size = INIT_LEN ;
11371140
1138- ptrdiff_t send_mem_address = NULL ;
1139- ompi_datatype_t * newType = MPI_DATATYPE_NULL ;
1140- int * blocklength_proc = ( int * ) calloc (blocklength_size , sizeof (int ));
1141- ptrdiff_t * displs_proc = ( ptrdiff_t * ) calloc (blocklength_size , sizeof (ptrdiff_t ));
1141+ ptrdiff_t send_mem_address = NULL ;
1142+ ompi_datatype_t * newType = MPI_DATATYPE_NULL ;
1143+ blocklength_proc = ( int * ) calloc (blocklength_size , sizeof (int ));
1144+ displs_proc = ( ptrdiff_t * ) calloc (blocklength_size , sizeof (ptrdiff_t ));
11421145
1143- if (NULL == blocklength_proc || NULL == displs_proc ) {
1144- opal_output (1 , "OUT OF MEMORY\n" );
1145- ret = OMPI_ERR_OUT_OF_RESOURCE ;
1146- goto exit ;
1147- }
1146+ if (NULL == blocklength_proc || NULL == displs_proc ) {
1147+ opal_output (1 , "OUT OF MEMORY\n" );
1148+ ret = OMPI_ERR_OUT_OF_RESOURCE ;
1149+ goto exit ;
1150+ }
11481151
1149- while (remaining ) {
1150- block_index ++ ;
1152+ while (remaining ) {
1153+ block_index ++ ;
11511154
1152- if (0 == block_index ) {
1153- send_mem_address = (ptrdiff_t ) (data -> decoded_iov [data -> iov_index ].iov_base ) +
1154- data -> current_position ;
1155- }
1156- else {
1157- // Reallocate more memory if blocklength_size is not enough
1158- if (0 == block_index % INIT_LEN ) {
1159- blocklength_size += INIT_LEN ;
1160- blocklength_proc = (int * ) realloc (blocklength_proc , blocklength_size * sizeof (int ));
1161- displs_proc = (ptrdiff_t * ) realloc (displs_proc , blocklength_size * sizeof (ptrdiff_t ));
1162- }
1163- displs_proc [block_index ] = (ptrdiff_t ) (data -> decoded_iov [data -> iov_index ].iov_base ) +
1164- data -> current_position - send_mem_address ;
1165- }
1155+ if (0 == block_index ) {
1156+ send_mem_address = (ptrdiff_t ) (data -> decoded_iov [data -> iov_index ].iov_base ) +
1157+ data -> current_position ;
1158+ }
1159+ else {
1160+ // Reallocate more memory if blocklength_size is not enough
1161+ if (0 == block_index % INIT_LEN ) {
1162+ blocklength_size += INIT_LEN ;
1163+ blocklength_proc = (int * ) realloc (blocklength_proc , blocklength_size * sizeof (int ));
1164+ displs_proc = (ptrdiff_t * ) realloc (displs_proc , blocklength_size * sizeof (ptrdiff_t ));
1165+ }
1166+ displs_proc [block_index ] = (ptrdiff_t ) (data -> decoded_iov [data -> iov_index ].iov_base ) +
1167+ data -> current_position - send_mem_address ;
1168+ }
11661169
1167- if (remaining >=
1168- (data -> decoded_iov [data -> iov_index ].iov_len - data -> current_position )) {
1170+ if (remaining >=
1171+ (data -> decoded_iov [data -> iov_index ].iov_len - data -> current_position )) {
11691172
1170- blocklength_proc [block_index ] = data -> decoded_iov [data -> iov_index ].iov_len -
1171- data -> current_position ;
1172- remaining = remaining -
1173- (data -> decoded_iov [data -> iov_index ].iov_len - data -> current_position );
1174- data -> iov_index = data -> iov_index + 1 ;
1175- data -> current_position = 0 ;
1176- }
1177- else {
1178- blocklength_proc [block_index ] = remaining ;
1179- data -> current_position += remaining ;
1180- remaining = 0 ;
1173+ blocklength_proc [block_index ] = data -> decoded_iov [data -> iov_index ].iov_len -
1174+ data -> current_position ;
1175+ remaining = remaining -
1176+ (data -> decoded_iov [data -> iov_index ].iov_len - data -> current_position );
1177+ data -> iov_index = data -> iov_index + 1 ;
1178+ data -> current_position = 0 ;
1179+ }
1180+ else {
1181+ blocklength_proc [block_index ] = remaining ;
1182+ data -> current_position += remaining ;
1183+ remaining = 0 ;
1184+ }
11811185 }
1182- }
1183-
1184- data -> total_bytes_written += bytes_sent ;
1185- data -> bytes_sent = bytes_sent ;
1186-
1187- if ( 0 <= block_index ) {
1188- ompi_datatype_create_hindexed (block_index + 1 ,
1189- blocklength_proc ,
1190- displs_proc ,
1191- MPI_BYTE ,
1192- & newType );
1193- ompi_datatype_commit (& newType );
1194-
1195- ret = MCA_PML_CALL (isend ((char * )send_mem_address ,
1196- 1 ,
1197- newType ,
1198- aggregator ,
1199- FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG + index ,
1200- MCA_PML_BASE_SEND_STANDARD ,
1201- data -> comm ,
1202- & reqs [data -> procs_per_group ]));
1203- if (OMPI_SUCCESS != ret ){
1204- goto exit ;
1186+
1187+ data -> total_bytes_written += bytes_sent ;
1188+ data -> bytes_sent = bytes_sent ;
1189+
1190+ if ( 0 <= block_index ) {
1191+ ompi_datatype_create_hindexed (block_index + 1 ,
1192+ blocklength_proc ,
1193+ displs_proc ,
1194+ MPI_BYTE ,
1195+ & newType );
1196+ ompi_datatype_commit (& newType );
1197+
1198+ ret = MCA_PML_CALL (isend ((char * )send_mem_address ,
1199+ 1 ,
1200+ newType ,
1201+ aggregator ,
1202+ FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG + index ,
1203+ MCA_PML_BASE_SEND_STANDARD ,
1204+ data -> comm ,
1205+ & reqs [data -> procs_per_group ]));
1206+ if ( MPI_DATATYPE_NULL != newType ) {
1207+ ompi_datatype_destroy (& newType );
1208+ }
1209+ if (OMPI_SUCCESS != ret ){
1210+ goto exit ;
1211+ }
12051212 }
1206- }
1207- if ( MPI_DATATYPE_NULL != newType ) {
1208- ompi_datatype_destroy (& newType );
1209- }
1210-
1211- free (blocklength_proc );
1212- free (displs_proc );
12131213 }
12141214
12151215
@@ -1288,6 +1288,8 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
12881288 free (sorted_file_offsets );
12891289 free (file_offsets_for_agg );
12901290 free (memory_displacements );
1291+ free (blocklength_proc );
1292+ free (displs_proc );
12911293
12921294 return OMPI_SUCCESS ;
12931295}
0 commit comments