Skip to content

Commit 5b6e1be

Browse files
committed
Fix the merge algorithm in the individual sharedfp component, which could
lead to file inconsistency in case of identical timestamps. Also fixes a potential buffer size problem.
1 parent 5e4f1c7 commit 5b6e1be

File tree

2 files changed

+45
-13
lines changed

2 files changed

+45
-13
lines changed

ompi/mca/sharedfp/individual/sharedfp_individual.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -139,9 +139,9 @@ mca_sharedfp_individual_header_record* mca_sharedfp_individual_insert_headnode(v
139139
int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh);
140140
int mca_sharedfp_individual_get_timestamps_and_reclengths(double **buff, long **rec_length, MPI_Offset **offbuff,struct mca_sharedfp_base_data_t *sh);
141141
int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totalnodes,int size);
142-
int mca_sharedfp_individual_sort_timestamps(double **ts,MPI_Offset **off, int totalnodes);
142+
int mca_sharedfp_individual_sort_timestamps(double **ts,MPI_Offset **off, int **ranks, int totalnodes);
143143
MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,int totalnodes,struct mca_sharedfp_base_data_t *sh);
144-
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int totalnodes);
144+
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes);
145145
/*int mca_sharedfp_individual_cleanup(double *ts, int* rnk, MPI_Offset *off);*/
146146

147147
int mca_sharedfp_individual_insert_metadata(int functype,long recordlength,struct mca_sharedfp_base_data_t *sh );

ompi/mca/sharedfp/individual/sharedfp_individual_collaborate_data.c

Lines changed: 43 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
3838
MPI_Comm comm;
3939
int rank, size;
4040
int nodesoneachprocess = 0;
41-
int idx = 0,i = 0;
41+
int idx=0,i=0,j=0, l=0;
42+
int *ranks = NULL;
4243
double *timestampbuff = NULL;
4344
OMPI_MPI_OFFSET_TYPE *offsetbuff = NULL;
4445
int *countbuff = NULL;
@@ -48,6 +49,7 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
4849
OMPI_MPI_OFFSET_TYPE *local_off = NULL;
4950
int totalnodes = 0;
5051
ompi_status_public_t status;
52+
int recordlength=0;
5153

5254
comm = sh->comm;
5355

@@ -99,7 +101,7 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
99101
}
100102
}
101103

102-
if ( nodesoneachprocess == 0) {
104+
if ( 0 == nodesoneachprocess ) {
103105
ind_ts[0] = 0;
104106
ind_recordlength[0] = 0;
105107
local_off[0] = 0;
@@ -118,6 +120,17 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
118120
goto exit;
119121
}
120122

123+
ranks = (int *) malloc ( totalnodes * sizeof(int));
124+
if ( NULL == ranks ) {
125+
ret = OMPI_ERR_OUT_OF_RESOURCE;
126+
goto exit;
127+
}
128+
for ( l=0, i=0; i<size; i++ ) {
129+
for ( j=0; j<countbuff[i]; j++ ) {
130+
ranks[l++]=i;
131+
}
132+
}
133+
121134
ret = mca_sharedfp_individual_create_buff ( &timestampbuff, &offsetbuff, totalnodes, size);
122135
if ( OMPI_SUCCESS != ret ) {
123136
goto exit;
@@ -137,31 +150,41 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
137150
goto exit;
138151
}
139152

140-
ret = mca_sharedfp_individual_sort_timestamps(&timestampbuff, &offsetbuff,totalnodes);
153+
ret = mca_sharedfp_individual_sort_timestamps(&timestampbuff, &offsetbuff, &ranks, totalnodes);
141154
if ( OMPI_SUCCESS != ret ) {
142155
goto exit;
143156
}
144157

145158
sh->global_offset = mca_sharedfp_individual_assign_globaloffset ( &offsetbuff, totalnodes, sh);
146159

147-
buff = (char * ) malloc( ind_recordlength[0] * 1.2 );
160+
recordlength = ind_recordlength[0] * 1.2;
161+
buff = (char * ) malloc( recordlength );
148162
if ( NULL == buff ) {
149163
ret = OMPI_ERR_OUT_OF_RESOURCE;
150164
goto exit;
151165
}
152166

153167
for (i = 0; i < nodesoneachprocess ; i++) {
168+
if ( ind_recordlength[i] > recordlength ) {
169+
recordlength = ind_recordlength[i] * 1.2;
170+
buff = (char *) realloc ( buff, recordlength );
171+
if ( NULL == buff ) {
172+
ret = OMPI_ERR_OUT_OF_RESOURCE;
173+
goto exit;
174+
}
175+
}
176+
154177
/*Read from the local data file*/
155178
ompio_io_ompio_file_read_at ( headnode->datafilehandle,
156179
local_off[i], buff, ind_recordlength[i],
157180
MPI_BYTE, &status);
158181

159-
idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff,totalnodes);
182+
idx = mca_sharedfp_individual_getoffset(ind_ts[i],timestampbuff, ranks, rank, totalnodes);
160183

161184
if ( mca_sharedfp_individual_verbose ) {
162185
opal_output(ompi_sharedfp_base_framework.framework_output,
163-
"sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file \n",
164-
rank,ind_recordlength[i]);
186+
"sharedfp_individual_collaborate_data: Process %d writing %ld bytes to main file at position"
187+
"%lld (%d)\n", rank, ind_recordlength[i], offsetbuff[idx], idx);
165188
}
166189

167190
/*Write into main data file*/
@@ -196,6 +219,9 @@ int mca_sharedfp_individual_collaborate_data(struct mca_sharedfp_base_data_t *sh
196219
if ( NULL != buff ) {
197220
free ( buff );
198221
}
222+
if ( NULL != ranks ) {
223+
free ( ranks );
224+
}
199225

200226
return ret;
201227
}
@@ -323,15 +349,15 @@ int mca_sharedfp_individual_create_buff(double **ts,MPI_Offset **off,int totaln
323349
}
324350

325351
/*Sort the timestamp buffer*/
326-
int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int totalnodes)
352+
int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int **ranks, int totalnodes)
327353
{
328354

329355
int i = 0;
330356
int j = 0;
331357
int flag = 1;
332358
double tempts = 0.0;
333359
OMPI_MPI_OFFSET_TYPE tempoffset = 0;
334-
360+
int temprank = 0;
335361

336362
for (i= 1; (i <= totalnodes)&&(flag) ; i++) {
337363
flag = 0;
@@ -347,6 +373,11 @@ int mca_sharedfp_individual_sort_timestamps(double **ts, MPI_Offset **off, int
347373
*(*off + j) = *(*off + j + 1);
348374
*(*off + j + 1) = tempoffset;
349375

376+
/*swap ranks*/
377+
temprank = *(*ranks + j);
378+
*(*ranks + j) = *(*ranks + j + 1);
379+
*(*ranks + j + 1) = temprank;
380+
350381
flag = 1;
351382
}
352383
}
@@ -380,13 +411,14 @@ MPI_Offset mca_sharedfp_individual_assign_globaloffset(MPI_Offset **offsetbuff,
380411
}
381412

382413

383-
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int totalnodes)
414+
int mca_sharedfp_individual_getoffset(double timestamp, double *ts, int *ranks, int myrank, int totalnodes)
384415
{
385416
int i = 0;
386417
int notfound = 1;
387418

419+
388420
while (notfound) {
389-
if (ts[i] == timestamp)
421+
if (ts[i] == timestamp && ranks[i] == myrank )
390422
break;
391423

392424
i++;

0 commit comments

Comments
 (0)