Skip to content

Commit d6a57b8

Browse files
author
Alessandro Fanfarillo
committed
Partial patch for locks
1 parent 17699e5 commit d6a57b8

File tree

1 file changed

+56
-54
lines changed

1 file changed

+56
-54
lines changed

src/mpi/mpi_caf.c

Lines changed: 56 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -107,11 +107,12 @@ char err_buffer[MPI_MAX_ERROR_STRING];
107107
MPI_Comm CAF_COMM_WORLD;
108108

109109
/* Failed Images */
110-
/* MPI_Comm *communicators; */
110+
MPI_Comm lock_comm;
111+
MPI_Request lock_req;
111112
int used_comm = -1, n_failed_imgs=0, error_called=0;
112113
int *ranks_gc,*ranks_gf; //to be returned by failed images
113114
MPI_Errhandler errh,errh_w;
114-
int completed = 0;
115+
int completed = 0,tmp_lock;
115116

116117
static void verbose_win_errhandler(MPI_Win* win, int* err, ...) {
117118
/* printf("in win err handler\n"); */
@@ -251,11 +252,12 @@ caf_runtime_error (const char *message, ...)
251252
/* inline */ void locking_atomic_op(MPI_Win win, int *value, int newval,
252253
int compare, int image_index, int index)
253254
{
255+
int ret;
254256
# ifdef CAF_MPI_LOCK_UNLOCK
255257
MPI_Win_lock (MPI_LOCK_EXCLUSIVE, image_index-1, 0, win);
256258
# endif // CAF_MPI_LOCK_UNLOCK
257-
MPI_Compare_and_swap (&newval,&compare,value, MPI_INT,image_index-1,
258-
index*sizeof(int), win);
259+
ret = MPI_Compare_and_swap (&newval,&compare,value, MPI_INT,image_index-1,
260+
index*sizeof(int), win);
259261
# ifdef CAF_MPI_LOCK_UNLOCK
260262
MPI_Win_unlock (image_index-1, win);
261263
# else // CAF_MPI_LOCK_UNLOCK
@@ -268,11 +270,29 @@ void mutex_lock(MPI_Win win, int image_index, int index, int *stat,
268270
{
269271
const char msg[] = "Already locked";
270272
#if MPI_VERSION >= 3
271-
int value=1, compare = 0, newval = caf_this_image, i = 1,zero=0;
273+
int value=1, compare = 0, newval = caf_this_image, i = 1,zero=0,ret=0;
274+
int flag, it = 0, check_failure = 100;
272275

273276
if(stat != NULL)
274277
*stat = 0;
275278

279+
MPI_Test(&lock_req,&flag,MPI_STATUS_IGNORE);
280+
281+
/* if(error_called == 1) */
282+
/* { */
283+
/* /\* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); *\/ */
284+
/* communicator_shrink(&CAF_COMM_WORLD); */
285+
/* error_called = 0; */
286+
/* } */
287+
288+
if(error_called == 1)
289+
{
290+
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
291+
communicator_shrink(&lock_comm);
292+
communicator_shrink(&CAF_COMM_WORLD);
293+
error_called = 0;
294+
}
295+
276296
locking_atomic_op(win, &value, newval, compare, image_index, index);
277297

278298
if(value == caf_this_image && image_index == caf_this_image)
@@ -289,11 +309,26 @@ void mutex_lock(MPI_Win win, int image_index, int index, int *stat,
289309

290310
while(value != 0)
291311
{
312+
it++;
313+
314+
if(it == check_failure)
315+
{
316+
it = 1;
317+
MPI_Test(&lock_req,&flag,MPI_STATUS_IGNORE);
318+
}
319+
320+
if(error_called == 1)
321+
{
322+
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
323+
communicator_shrink(&lock_comm);
324+
communicator_shrink(&CAF_COMM_WORLD);
325+
error_called = 0;
326+
}
327+
292328
locking_atomic_op(win, &value, newval, compare, image_index, index);
293-
printf("n_failed_images: %d\n",n_failed_imgs);
329+
294330
for(i=0;i<n_failed_imgs;i++)
295-
{
296-
printf("value: %d\n",value);
331+
{
297332
if(ranks_gc[i] == value)
298333
{
299334
# ifdef CAF_MPI_LOCK_UNLOCK
@@ -308,8 +343,6 @@ void mutex_lock(MPI_Win win, int image_index, int index, int *stat,
308343
break;
309344
}
310345
}
311-
// usleep(caf_this_image*i);
312-
//i++;
313346
}
314347

315348
return;
@@ -441,18 +474,14 @@ PREFIX (init) (int *argc, char ***argv)
441474

442475
stat_tok = malloc (sizeof(MPI_Win));
443476

444-
/* communicators = (MPI_Comm *)calloc(caf_num_images,sizeof(MPI_Comm)); */
445-
446477
MPI_Comm_create_errhandler(verbose_comm_errhandler, &errh);
447478
MPI_Comm_set_errhandler(CAF_COMM_WORLD, errh);
479+
480+
MPI_Comm_dup(CAF_COMM_WORLD, &lock_comm);
481+
MPI_Comm_set_errhandler(lock_comm, errh);
482+
MPI_Irecv(&tmp_lock,1,MPI_INT,MPI_ANY_SOURCE,MPI_ANY_TAG,lock_comm,&lock_req);
448483

449484
MPI_Win_create_errhandler(verbose_win_errhandler, &errh_w);
450-
451-
/* for(i=0;i<caf_num_images;i++) */
452-
/* { */
453-
/* MPI_Comm_dup(CAF_COMM_WORLD,&communicators[i]); */
454-
/* MPI_Comm_set_errhandler(communicators[i], errh); */
455-
/* } */
456485

457486
ranks_gf = (int*)malloc(caf_num_images * sizeof(int));
458487
ranks_gc = (int*)malloc(caf_num_images * sizeof(int));
@@ -544,7 +573,7 @@ PREFIX (num_images)(int distance __attribute__ ((unused)),
544573
return caf_num_images;
545574
}
546575

547-
static int communicator_shrink()
576+
int communicator_shrink(MPI_Comm *comm)
548577
{
549578
int ns,srank,crank,rc,flag,i,drank,nc,nd;
550579
MPI_Comm shrunk, *newcomm;
@@ -553,40 +582,11 @@ static int communicator_shrink()
553582
redo:
554583
newcomm = (MPI_Comm *)calloc(1,sizeof(MPI_Comm));
555584

556-
MPIX_Comm_shrink(CAF_COMM_WORLD, &shrunk);
585+
MPIX_Comm_shrink(*comm, &shrunk);
557586
MPI_Comm_set_errhandler( shrunk, errh );
558587
MPI_Comm_size(shrunk, &ns); MPI_Comm_rank(shrunk, &srank);
559588

560-
MPI_Comm_rank(CAF_COMM_WORLD, &crank);
561-
562-
/* if(MPI_COMM_NULL != CAF_COMM_WORLD) { /\* I was not a spare before... *\/ */
563-
/* /\* not enough processes to continue, aborting. *\/ */
564-
/* MPI_Comm_size(CAF_COMM_WORLD, &nc); */
565-
/* if( nc > ns ) MPI_Abort(CAF_COMM_WORLD, MPI_ERR_PROC_FAILED); */
566-
567-
/* /\* remembering the former rank: we will reassign the same */
568-
/* * ranks in the new world. *\/ */
569-
/* MPI_Comm_rank(CAF_COMM_WORLD, &crank); */
570-
571-
/* /\* the rank 0 in the shrinked comm is going to determine the */
572-
/* * ranks at which the spares need to be inserted. *\/ */
573-
/* if(0 == srank) { */
574-
/* /\* getting the group of dead processes: */
575-
/* * those in comm, but not in shrinked are the deads *\/ */
576-
/* MPI_Comm_group(CAF_COMM_WORLD, &cgrp); MPI_Comm_group(shrunk, &sgrp); */
577-
/* MPI_Group_difference(cgrp, sgrp, &dgrp); MPI_Group_size(dgrp, &nd); */
578-
/* /\* Computing the rank assignment for the newly inserted spares *\/ */
579-
/* for(i=0; i<ns-(nc-nd); i++) { */
580-
/* if( i < nd ) MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank); */
581-
/* else drank=-1; /\* still a spare *\/ */
582-
/* /\* sending their new assignment to all spares *\/ */
583-
/* MPI_Send(&drank, 1, MPI_INT, i+nc-nd, 1, shrunk); */
584-
/* } */
585-
/* MPI_Group_free(&cgrp); MPI_Group_free(&sgrp); MPI_Group_free(&dgrp); */
586-
/* } */
587-
/* else { /\* I was a spare, waiting for my new assignment *\/ */
588-
/* MPI_Recv(&crank, 1, MPI_INT, 0, 1, shrunk, MPI_STATUS_IGNORE); */
589-
/* } */
589+
MPI_Comm_rank(*comm, &crank);
590590

591591
/* Split does the magic: removing spare processes and reordering ranks
592592
* so that all surviving processes remain at their former place */
@@ -598,10 +598,10 @@ static int communicator_shrink()
598598
flag = MPIX_Comm_agree(shrunk, &flag);
599599
MPI_Comm_free(&shrunk);
600600
if( MPI_SUCCESS != flag ) {
601-
if( MPI_SUCCESS == rc ) MPI_Comm_free(newcomm);
601+
if( MPI_SUCCESS == rc ) MPI_Comm_free(*newcomm);
602602
goto redo;
603603
}
604-
CAF_COMM_WORLD = *newcomm;
604+
*comm = *newcomm;
605605
return MPI_SUCCESS;
606606
}
607607

@@ -650,7 +650,8 @@ void *
650650
if(error_called == 1)
651651
{
652652
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
653-
communicator_shrink();
653+
communicator_shrink(&CAF_COMM_WORLD);
654+
communicator_shrink(&lock_comm);
654655
error_called = 0;
655656
}
656657

@@ -818,7 +819,8 @@ PREFIX (sync_all) (int *stat, char *errmsg, int errmsg_len)
818819
if(error_called == 1)
819820
{
820821
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
821-
communicator_shrink();
822+
communicator_shrink(&CAF_COMM_WORLD);
823+
communicator_shrink(&lock_comm);
822824
error_called = 0;
823825
}
824826

0 commit comments

Comments
 (0)