Skip to content

Commit d6640d9

Browse files
author
Alessandro Fanfarillo
committed
Shrinking communicator
1 parent 8cdc90e commit d6640d9

File tree

1 file changed

+91
-15
lines changed

1 file changed

+91
-15
lines changed

src/mpi/mpi_caf.c

Lines changed: 91 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -107,10 +107,11 @@ char err_buffer[MPI_MAX_ERROR_STRING];
107107
MPI_Comm CAF_COMM_WORLD;
108108

109109
/* Failed Images */
110-
MPI_Comm *communicators;
111-
int used_comm = -1, n_failed_imgs=0;
110+
/* MPI_Comm *communicators; */
111+
int used_comm = -1, n_failed_imgs=0, error_called=0;
112112
int *ranks_gc,*ranks_gf; //to be returned by failed images
113113
MPI_Errhandler errh,errh_w;
114+
int completed = 0;
114115

115116
static void verbose_win_errhandler(MPI_Win* win, int* err, ...) {
116117
/* printf("in win err handler\n"); */
@@ -142,9 +143,9 @@ static void verbose_comm_errhandler(MPI_Comm* pcomm, int* err, ...){
142143
}
143144

144145
n_failed_imgs = nf;
145-
146-
used_comm++;
147-
CAF_COMM_WORLD = communicators[used_comm];
146+
error_called = 1;
147+
/* used_comm++; */
148+
/* CAF_COMM_WORLD = communicators[used_comm]; */
148149
}
149150

150151
/* For MPI interoperability, allow external initialization
@@ -422,18 +423,18 @@ PREFIX (init) (int *argc, char ***argv)
422423

423424
stat_tok = malloc (sizeof(MPI_Win));
424425

425-
communicators = (MPI_Comm *)calloc(caf_num_images,sizeof(MPI_Comm));
426+
/* communicators = (MPI_Comm *)calloc(caf_num_images,sizeof(MPI_Comm)); */
426427

427428
MPI_Comm_create_errhandler(verbose_comm_errhandler, &errh);
428429
MPI_Comm_set_errhandler(CAF_COMM_WORLD, errh);
429430

430431
MPI_Win_create_errhandler(verbose_win_errhandler, &errh_w);
431432

432-
for(i=0;i<caf_num_images;i++)
433-
{
434-
MPI_Comm_dup(CAF_COMM_WORLD,&communicators[i]);
435-
MPI_Comm_set_errhandler(communicators[i], errh);
436-
}
433+
/* for(i=0;i<caf_num_images;i++) */
434+
/* { */
435+
/* MPI_Comm_dup(CAF_COMM_WORLD,&communicators[i]); */
436+
/* MPI_Comm_set_errhandler(communicators[i], errh); */
437+
/* } */
437438

438439
ranks_gf = (int*)malloc(caf_num_images * sizeof(int));
439440
ranks_gc = (int*)malloc(caf_num_images * sizeof(int));
@@ -466,6 +467,7 @@ _gfortran_caf_finalize (void)
466467
PREFIX (finalize) (void)
467468
#endif
468469
{
470+
completed = 1;
469471
*img_status = STAT_STOPPED_IMAGE; /* GFC_STAT_STOPPED_IMAGE = 6000 */
470472
MPI_Win_sync(*stat_tok);
471473

@@ -524,6 +526,67 @@ PREFIX (num_images)(int distance __attribute__ ((unused)),
524526
return caf_num_images;
525527
}
526528

529+
static int communicator_shrink()
530+
{
531+
int ns,srank,crank,rc,flag,i,drank,nc,nd;
532+
MPI_Comm shrunk, *newcomm;
533+
MPI_Group cgrp, sgrp, dgrp;
534+
535+
redo:
536+
newcomm = (MPI_Comm *)calloc(1,sizeof(MPI_Comm));
537+
538+
MPIX_Comm_shrink(CAF_COMM_WORLD, &shrunk);
539+
MPI_Comm_set_errhandler( shrunk, errh );
540+
MPI_Comm_size(shrunk, &ns); MPI_Comm_rank(shrunk, &srank);
541+
542+
MPI_Comm_rank(CAF_COMM_WORLD, &crank);
543+
544+
/* if(MPI_COMM_NULL != CAF_COMM_WORLD) { /\* I was not a spare before... *\/ */
545+
/* /\* not enough processes to continue, aborting. *\/ */
546+
/* MPI_Comm_size(CAF_COMM_WORLD, &nc); */
547+
/* if( nc > ns ) MPI_Abort(CAF_COMM_WORLD, MPI_ERR_PROC_FAILED); */
548+
549+
/* /\* remembering the former rank: we will reassign the same */
550+
/* * ranks in the new world. *\/ */
551+
/* MPI_Comm_rank(CAF_COMM_WORLD, &crank); */
552+
553+
/* /\* the rank 0 in the shrinked comm is going to determine the */
554+
/* * ranks at which the spares need to be inserted. *\/ */
555+
/* if(0 == srank) { */
556+
/* /\* getting the group of dead processes: */
557+
/* * those in comm, but not in shrinked are the deads *\/ */
558+
/* MPI_Comm_group(CAF_COMM_WORLD, &cgrp); MPI_Comm_group(shrunk, &sgrp); */
559+
/* MPI_Group_difference(cgrp, sgrp, &dgrp); MPI_Group_size(dgrp, &nd); */
560+
/* /\* Computing the rank assignment for the newly inserted spares *\/ */
561+
/* for(i=0; i<ns-(nc-nd); i++) { */
562+
/* if( i < nd ) MPI_Group_translate_ranks(dgrp, 1, &i, cgrp, &drank); */
563+
/* else drank=-1; /\* still a spare *\/ */
564+
/* /\* sending their new assignment to all spares *\/ */
565+
/* MPI_Send(&drank, 1, MPI_INT, i+nc-nd, 1, shrunk); */
566+
/* } */
567+
/* MPI_Group_free(&cgrp); MPI_Group_free(&sgrp); MPI_Group_free(&dgrp); */
568+
/* } */
569+
/* } else { /\* I was a spare, waiting for my new assignment *\/ */
570+
/* MPI_Recv(&crank, 1, MPI_INT, 0, 1, shrunk, MPI_STATUS_IGNORE); */
571+
/* } */
572+
573+
/* Split does the magic: removing spare processes and reordering ranks
574+
* so that all surviving processes remain at their former place */
575+
rc = MPI_Comm_split(shrunk, crank<0?MPI_UNDEFINED:1, crank, newcomm);
576+
577+
/* Split or some of the communications above may have failed if
578+
* new failures have disrupted the process: we need to
579+
* make sure we succeeded at all ranks, or retry until it works. */
580+
flag = MPIX_Comm_agree(shrunk, &flag);
581+
MPI_Comm_free(&shrunk);
582+
if( MPI_SUCCESS != flag ) {
583+
if( MPI_SUCCESS == rc ) MPI_Comm_free(newcomm);
584+
goto redo;
585+
}
586+
CAF_COMM_WORLD = *newcomm;
587+
return MPI_SUCCESS;
588+
}
589+
527590

528591
#ifdef COMPILER_SUPPORTS_CAF_INTRINSICS
529592
void *
@@ -567,6 +630,13 @@ void *
567630
else
568631
actual_size = size;
569632

633+
if(error_called == 1)
634+
{
635+
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
636+
communicator_shrink();
637+
error_called = 0;
638+
}
639+
570640
#if MPI_VERSION >= 3
571641
MPI_Win_allocate(actual_size, 1, mpi_info_same_size, CAF_COMM_WORLD, &mem, *token);
572642
# ifndef CAF_MPI_LOCK_UNLOCK
@@ -728,6 +798,13 @@ PREFIX (sync_all) (int *stat, char *errmsg, int errmsg_len)
728798
{
729799
int ierr=0;
730800

801+
if(error_called == 1)
802+
{
803+
/* MPIX_Comm_agree( CAF_COMM_WORLD, &completed ); */
804+
communicator_shrink();
805+
error_called = 0;
806+
}
807+
731808
if (unlikely (caf_is_finalized))
732809
ierr = STAT_STOPPED_IMAGE;
733810
else
@@ -2626,14 +2703,13 @@ PREFIX (image_status) (int image)
26262703
return res;
26272704
}
26282705

2629-
void *
2706+
int *
26302707
PREFIX (failed_images) (int *num_failed_images, int team __attribute__ ((unused)),
26312708
int kind __attribute__ ((unused)))
26322709
{
2633-
void *mem;
2634-
printf("Failed images:%d\n",n_failed_imgs);
2710+
int *mem;
26352711
*num_failed_images = n_failed_imgs;
2636-
mem = calloc(n_failed_imgs,sizeof(int));
2712+
mem = (int *)calloc(n_failed_imgs,sizeof(int));
26372713
memcpy(mem,ranks_gc,n_failed_imgs*sizeof(int));
26382714
return mem;
26392715
}

0 commit comments

Comments
 (0)