Commit c99d637

TL: debug the full collective approach
1 parent f186407 commit c99d637

File tree: 1 file changed (+36 -29 lines)

1 file changed

+36
-29
lines changed

pySDC/helpers/fieldsIO.py

Lines changed: 36 additions & 29 deletions
@@ -492,7 +492,7 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
     # MPI-parallel implementation
     # -------------------------------------------------------------------------
     comm: MPI.Intracomm = None
-    _num_collective_IO = None
+    _nCollectiveIO = None

     @classmethod
     def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
@@ -513,22 +513,20 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
         cls.iLoc = iLoc
         cls.nLoc = nLoc
         cls.mpiFile = None
-        cls._num_collective_IO = None
+        cls._nCollectiveIO = None

     @property
-    def num_collective_IO(self):
+    def nCollectiveIO(self):
         """
-        Number of collective IO operations.
-        If the distribution is unbalanced, some tasks read/write more data than others, implying that some accesses
-        cannot be collective, but need to be of the slower individual kind.
+        Number of collective IO operations over all processes, when reading or writing a field.

         Returns:
         --------
         int: Number of collective IO accesses
         """
-        if self._num_collective_IO is None:
-            self._num_collective_IO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MIN)
-        return self._num_collective_IO
+        if self._nCollectiveIO is None:
+            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
+        return self._nCollectiveIO

     @property
     def MPI_ON(self):
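Note on the hunk above: the property now caches the result of an allreduce with MPI.MAX, so with an unbalanced decomposition every rank must issue as many collective accesses as the busiest rank, instead of only the guaranteed minimum as before. A minimal sketch of that computation, with made-up local sizes purely for illustration (nVar, nLoc values are not taken from the commit):

# Hypothetical illustration of the nCollectiveIO computation (not part of pySDC).
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
nVar = 2
# Made-up unbalanced decomposition: rank 0 owns 3 rows, every other rank owns 2.
nLoc = (3, 16) if comm.Get_rank() == 0 else (2, 16)

# One access per contiguous chunk -> nVar * prod(nLoc[:-1]) accesses on this rank.
nLocalIO = nVar * np.prod(nLoc[:-1])

# MPI.MAX: every rank must take part in as many collective calls as the busiest rank.
nCollectiveIO = comm.allreduce(nLocalIO, op=MPI.MAX)
print(f"rank {comm.Get_rank()}: {nLocalIO} local accesses, {nCollectiveIO} collective calls")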
@@ -556,7 +554,7 @@ def MPI_WRITE(self, data):
         """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
         self.mpiFile.Write(data)

-    def MPI_WRITE_AT(self, offset, data: np.ndarray, collective=True):
+    def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
         """
         Write data in the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -567,15 +565,10 @@ def MPI_WRITE_AT(self, offset, data: np.ndarray, collective=True):
             Offset to write at, relative to the beginning of the file, in bytes.
         data : np.ndarray
             Data to be written in the binary file.
-        collective : bool
-            Use `MPI.Write_at_all` if true and `MPI.Write_at` if false
         """
-        if collective:
-            self.mpiFile.Write_at_all(offset, data)
-        else:
-            self.mpiFile.Write_at(offset, data)
+        self.mpiFile.Write_at_all(offset, data)

-    def MPI_READ_AT(self, offset, data, collective=True):
+    def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         """
         Read data from the binary file in MPI mode, with a given offset
         **relative to the beginning of the file**.
@@ -586,13 +579,8 @@ def MPI_READ_AT(self, offset, data, collective=True):
             Offset to read at, relative to the beginning of the file, in bytes.
         data : np.ndarray
             Array on which to read the data from the binary file.
-        collective : bool
-            Use `MPI.Read_at_all` if true and `MPI.Read_at` if false
         """
-        if collective:
-            self.mpiFile.Read_at_all(offset, data)
-        else:
-            self.mpiFile.Read_at(offset, data)
+        self.mpiFile.Read_at_all(offset, data)

     def MPI_FILE_CLOSE(self):
         """Close the binary file in MPI mode"""
@@ -645,15 +633,22 @@ def addField(self, time, field):

         offset0 = self.fileSize
         self.MPI_FILE_OPEN(mode="a")
+        nWrites = 0
+        nCollectiveIO = self.nCollectiveIO
+
         if self.MPI_ROOT:
             self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
         offset0 += self.tSize

-        _num_writes = 0
         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT(offset, field[(iVar, *iBeg)], collective=_num_writes < self.num_collective_IO)
-            _num_writes += 1
+            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
+            nWrites += 1
+
+        for _ in range(nCollectiveIO - nWrites):
+            # Additional collective write to catch up with other processes
+            self.MPI_WRITE_AT_ALL(offset0, field[:0])
+
         self.MPI_FILE_CLOSE()

     def iPos(self, iVar, iX):
@@ -696,11 +691,18 @@ def readField(self, idx):
         field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

         self.MPI_FILE_OPEN(mode="r")
-        _num_reads = 0
+        nReads = 0
+        nCollectiveIO = self.nCollectiveIO
+
         for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
             offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT(offset, field[(iVar, *iBeg)], collective=_num_reads < self.num_collective_IO)
-            _num_reads += 1
+            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
+            nReads += 1
+
+        for _ in range(nCollectiveIO - nReads):
+            # Additional collective read to catch up with other processes
+            self.MPI_READ_AT_ALL(offset0, field[:0])
+
         self.MPI_FILE_CLOSE()

         return t, field
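Note on the two hunks above: Write_at_all/Read_at_all are collective MPI-IO calls, so every rank in the communicator must issue the same number of them or the program hangs; ranks that own less data therefore "catch up" with zero-length accesses, as in the two loops added above. A minimal standalone sketch of that pattern, assuming a hypothetical file name "demo.dat" and made-up per-rank chunk counts (none of these names come from the commit):

# Hypothetical sketch of the collective catch-up pattern (file name, counts and offsets are made up).
import numpy as np
from mpi4py import MPI

comm = MPI.COMM_WORLD
rank = comm.Get_rank()

# Made-up unbalanced workload: rank 0 writes 3 chunks, every other rank writes 2.
chunks = [np.full(4, rank, dtype=np.float64) for _ in range(3 if rank == 0 else 2)]
nCollectiveIO = comm.allreduce(len(chunks), op=MPI.MAX)

fh = MPI.File.Open(comm, "demo.dat", MPI.MODE_WRONLY | MPI.MODE_CREATE)
itemSize = np.dtype(np.float64).itemsize
for i, chunk in enumerate(chunks):
    # Each rank writes its chunks at a rank- and chunk-dependent byte offset (illustrative layout).
    offset = (rank * 3 + i) * chunk.size * itemSize
    fh.Write_at_all(offset, chunk)

for _ in range(nCollectiveIO - len(chunks)):
    # Zero-length collective write so that all ranks issue the same number of calls.
    fh.Write_at_all(0, chunks[0][:0])

fh.Close()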
@@ -754,6 +756,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
 def compareFields_MPI(fileName, u0, nSteps):
     from pySDC.helpers.fieldsIO import FieldsIO

+    comm = MPI.COMM_WORLD
+    MPI_RANK = comm.Get_rank()
+    if MPI_RANK == 0:
+        print("Comparing fields with MPI")
+
     f2 = FieldsIO.fromFile(fileName)

     times = np.arange(nSteps) / nSteps
