Skip to content

Commit 4092138

Browse files
authored
Merge pull request #4987 from raafatfeki/master
fcoll/dynamic_gen2: use hindexed constructor on the sender side
2 parents e79debc + 1006777 commit 4092138

File tree

1 file changed

+73
-71
lines changed

1 file changed

+73
-71
lines changed

ompi/mca/fcoll/dynamic_gen2/fcoll_dynamic_gen2_file_write_all.c

Lines changed: 73 additions & 71 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636

3737
#define DEBUG_ON 0
3838
#define FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG 123
39+
#define INIT_LEN 10
3940

4041
/*Used for loading file-offsets per aggregator*/
4142
typedef struct mca_io_ompio_local_io_array{
@@ -55,13 +56,11 @@ typedef struct mca_io_ompio_aggregator_data {
5556
int current_index, current_position;
5657
int bytes_to_write_in_cycle, bytes_remaining, procs_per_group;
5758
int *procs_in_group, iov_index;
58-
bool sendbuf_is_contiguous, prev_sendbuf_is_contiguous;
5959
int bytes_sent, prev_bytes_sent;
6060
struct iovec *decoded_iov;
6161
int bytes_to_write, prev_bytes_to_write;
6262
mca_io_ompio_io_array_t *io_array, *prev_io_array;
6363
int num_io_entries, prev_num_io_entries;
64-
char *send_buf, *prev_send_buf;
6564
} mca_io_ompio_aggregator_data;
6665

6766

@@ -76,9 +75,7 @@ typedef struct mca_io_ompio_aggregator_data {
7675
for (_i=0; _i<_num; _i++ ) { \
7776
_aggr[_i]->prev_io_array=_aggr[_i]->io_array; \
7877
_aggr[_i]->prev_num_io_entries=_aggr[_i]->num_io_entries; \
79-
_aggr[_i]->prev_send_buf=_aggr[_i]->send_buf; \
8078
_aggr[_i]->prev_bytes_sent=_aggr[_i]->bytes_sent; \
81-
_aggr[_i]->prev_sendbuf_is_contiguous=_aggr[_i]->sendbuf_is_contiguous; \
8279
_aggr[_i]->prev_bytes_to_write=_aggr[_i]->bytes_to_write; \
8380
_t=_aggr[_i]->prev_global_buf; \
8481
_aggr[_i]->prev_global_buf=_aggr[_i]->global_buf; \
@@ -229,8 +226,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
229226
aggr_data[i]->procs_in_group = fh->f_procs_in_group;
230227
aggr_data[i]->comm = fh->f_comm;
231228
aggr_data[i]->buf = (char *)buf; // should not be used in the new version.
232-
aggr_data[i]->sendbuf_is_contiguous = false; //safe assumption for right now
233-
aggr_data[i]->prev_sendbuf_is_contiguous = false; //safe assumption for right now
234229
}
235230

236231
/*********************************************************************
@@ -611,10 +606,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
611606
end_write_time = MPI_Wtime();
612607
write_time += end_write_time - start_write_time;
613608
#endif
614-
615-
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
616-
free (aggr_data[i]->prev_send_buf);
617-
}
618609
}
619610

620611
} /* end for (index = 0; index < cycles; index++) */
@@ -644,10 +635,6 @@ int mca_fcoll_dynamic_gen2_file_write_all (mca_io_ompio_file_t *fh,
644635
end_write_time = MPI_Wtime();
645636
write_time += end_write_time - start_write_time;
646637
#endif
647-
648-
if (!aggr_data[i]->prev_sendbuf_is_contiguous && aggr_data[i]->prev_bytes_sent) {
649-
free (aggr_data[i]->prev_send_buf);
650-
}
651638
}
652639
}
653640

@@ -785,7 +772,6 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
785772
data->num_io_entries = 0;
786773
data->bytes_sent = 0;
787774
data->io_array=NULL;
788-
data->send_buf=NULL;
789775
/**********************************************************************
790776
*** 7a. Getting ready for next cycle: initializing and freeing buffers
791777
**********************************************************************/
@@ -1143,73 +1129,89 @@ static int shuffle_init ( int index, int cycles, int aggregator, int rank, mca_i
11431129
}
11441130
} /* end if (entries_per_aggr > 0 ) */
11451131
}/* end if (aggregator == rank ) */
1146-
1147-
if ( data->sendbuf_is_contiguous ) {
1148-
data->send_buf = &((char*)data->buf)[data->total_bytes_written];
1149-
}
1150-
else if (bytes_sent) {
1151-
/* allocate a send buffer and copy the data that needs
1152-
to be sent into it in case the data is non-contigous
1153-
in memory */
1154-
ptrdiff_t mem_address;
1155-
size_t remaining = 0;
1156-
size_t temp_position = 0;
1157-
1158-
data->send_buf = malloc (bytes_sent);
1159-
if (NULL == data->send_buf) {
1160-
opal_output (1, "OUT OF MEMORY\n");
1161-
ret = OMPI_ERR_OUT_OF_RESOURCE;
1162-
goto exit;
1132+
1133+
if (bytes_sent) {
1134+
size_t remaining = bytes_sent;
1135+
int block_index = -1;
1136+
int blocklength_size = INIT_LEN;
1137+
1138+
ptrdiff_t send_mem_address = NULL;
1139+
ompi_datatype_t *newType = MPI_DATATYPE_NULL;
1140+
int* blocklength_proc = (int *) calloc (blocklength_size, sizeof (int));
1141+
ptrdiff_t* displs_proc = (ptrdiff_t *) calloc (blocklength_size, sizeof (ptrdiff_t));
1142+
1143+
if (NULL == blocklength_proc || NULL == displs_proc ) {
1144+
opal_output (1, "OUT OF MEMORY\n");
1145+
ret = OMPI_ERR_OUT_OF_RESOURCE;
1146+
goto exit;
1147+
}
1148+
1149+
while (remaining) {
1150+
block_index++;
1151+
1152+
if(0 == block_index) {
1153+
send_mem_address = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1154+
data->current_position;
11631155
}
1164-
1165-
remaining = bytes_sent;
1166-
1167-
while (remaining) {
1168-
mem_address = (ptrdiff_t)
1169-
(data->decoded_iov[data->iov_index].iov_base) + data->current_position;
1170-
1171-
if (remaining >=
1172-
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1173-
memcpy (data->send_buf+temp_position,
1174-
(IOVBASE_TYPE *)mem_address,
1175-
data->decoded_iov[data->iov_index].iov_len - data->current_position);
1176-
remaining = remaining -
1177-
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
1178-
temp_position = temp_position +
1179-
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
1180-
data->iov_index = data->iov_index + 1;
1181-
data->current_position = 0;
1182-
}
1183-
else {
1184-
memcpy (data->send_buf+temp_position,
1185-
(IOVBASE_TYPE *) mem_address,
1186-
remaining);
1187-
data->current_position += remaining;
1188-
remaining = 0;
1189-
}
1156+
else {
1157+
// Reallocate more memory if blocklength_size is not enough
1158+
if(0 == block_index % INIT_LEN) {
1159+
blocklength_size += INIT_LEN;
1160+
blocklength_proc = (int *) realloc(blocklength_proc, blocklength_size * sizeof(int));
1161+
displs_proc = (ptrdiff_t *) realloc(displs_proc, blocklength_size * sizeof(ptrdiff_t));
1162+
}
1163+
displs_proc[block_index] = (ptrdiff_t) (data->decoded_iov[data->iov_index].iov_base) +
1164+
data->current_position - send_mem_address;
11901165
}
1191-
}
1192-
data->total_bytes_written += bytes_sent;
1193-
data->bytes_sent = bytes_sent;
1194-
/* Gather the sendbuf from each process in appropritate locations in
1195-
aggregators*/
1196-
1197-
if (bytes_sent){
1198-
ret = MCA_PML_CALL(isend(data->send_buf,
1199-
bytes_sent,
1200-
MPI_BYTE,
1166+
1167+
if (remaining >=
1168+
(data->decoded_iov[data->iov_index].iov_len - data->current_position)) {
1169+
1170+
blocklength_proc[block_index] = data->decoded_iov[data->iov_index].iov_len -
1171+
data->current_position;
1172+
remaining = remaining -
1173+
(data->decoded_iov[data->iov_index].iov_len - data->current_position);
1174+
data->iov_index = data->iov_index + 1;
1175+
data->current_position = 0;
1176+
}
1177+
else {
1178+
blocklength_proc[block_index] = remaining;
1179+
data->current_position += remaining;
1180+
remaining = 0;
1181+
}
1182+
}
1183+
1184+
data->total_bytes_written += bytes_sent;
1185+
data->bytes_sent = bytes_sent;
1186+
1187+
if ( 0 <= block_index ) {
1188+
ompi_datatype_create_hindexed(block_index+1,
1189+
blocklength_proc,
1190+
displs_proc,
1191+
MPI_BYTE,
1192+
&newType);
1193+
ompi_datatype_commit(&newType);
1194+
1195+
ret = MCA_PML_CALL(isend((char *)send_mem_address,
1196+
1,
1197+
newType,
12011198
aggregator,
12021199
FCOLL_DYNAMIC_GEN2_SHUFFLE_TAG+index,
12031200
MCA_PML_BASE_SEND_STANDARD,
12041201
data->comm,
12051202
&reqs[data->procs_per_group]));
1206-
1207-
1208-
if ( OMPI_SUCCESS != ret ){
1203+
if (OMPI_SUCCESS != ret){
12091204
goto exit;
12101205
}
1206+
}
1207+
if ( MPI_DATATYPE_NULL != newType ) {
1208+
ompi_datatype_destroy(&newType);
1209+
}
12111210

1211+
free(blocklength_proc);
1212+
free(displs_proc);
12121213
}
1214+
12131215

12141216
#if DEBUG_ON
12151217
if (aggregator == rank){

0 commit comments

Comments
 (0)