Warning
-------
To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
- Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not.
-
- > ⚠️ Also : this module can only be imported with **Python 3.11 or higher** !
+ Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
"""
import os
import numpy as np
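For reference, a minimal sketch of the workflow this warning describes, using the class methods visible in this diff (`setupMPI`, `setHeader`, `initialize`, `addField`) and the `pySDC.helpers.fieldsIO` import path from the test helpers further down; the grid sizes, the file name, and the even split along the first axis are assumptions:

    >>> import numpy as np
    >>> from mpi4py import MPI
    >>> from pySDC.helpers.fieldsIO import Rectilinear
    >>> comm = MPI.COMM_WORLD
    >>> nX, nY = 64, 32                                    # global grid sizes (made up)
    >>> xGlob = np.linspace(0, 1, num=nX, endpoint=False)  # global coordinates
    >>> yGlob = np.linspace(0, 1, num=nY, endpoint=False)
    >>> nLocX = nX // comm.Get_size()                      # assumes nX is divisible by the number of ranks
    >>> Rectilinear.setupMPI(comm, iLoc=[comm.Get_rank() * nLocX, 0], nLoc=[nLocX, nY])
    >>> f = Rectilinear(np.float64, "fields.pysdc")
    >>> f.setHeader(nVar=2, coords=[xGlob, yGlob])         # global coordinates, even in parallel
    >>> f.initialize()
    >>> uLoc = np.zeros((2, nLocX, nY))                    # this rank's local block, shape (nVar, *nLoc)
    >>> f.addField(time=0.0, field=uLoc)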
@@ -207,7 +205,7 @@ def initialize(self):
        if not self.ALLOW_OVERWRITE:
            assert not os.path.isfile(
                self.fileName
-            ), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
+            ), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"

        with open(self.fileName, "w+b") as f:
            self.hBase.tofile(f)
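As the new assertion message states, overwriting an existing file is an explicit opt-in; a minimal sketch (the file name is made up, import path as in the test helpers below):

    >>> import numpy as np
    >>> from pySDC.helpers.fieldsIO import FieldsIO, Rectilinear
    >>> FieldsIO.ALLOW_OVERWRITE = True               # class-level switch named in the assertion above
    >>> f = Rectilinear(np.float64, "outputs.pysdc")  # an existing file will now be overwritten on initialize()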
@@ -480,7 +478,7 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):

        Example
        -------
-        >>> # Suppose the FieldsIO object is already writen into outputs.pysdc
+        >>> # Suppose the FieldsIO object is already written into outputs.pysdc
        >>> import os
        >>> from pySDC.utils.fieldsIO import Rectilinear
        >>> os.makedirs("vtrFiles")  # to store all VTR files into a subfolder
@@ -499,12 +497,13 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
    # MPI-parallel implementation
    # -------------------------------------------------------------------------
    comm: MPI.Intracomm = None
+    _nCollectiveIO = None

    @classmethod
    def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
        """
        Setup the MPI mode for the files IO, considering a decomposition
-        of the 1D grid into contiuous subintervals.
+        of the 1D grid into contiguous subintervals.

        Parameters
        ----------
@@ -519,6 +518,20 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
        cls.iLoc = iLoc
        cls.nLoc = nLoc
        cls.mpiFile: MPI.File = None
+        cls._nCollectiveIO = None
+
+    @property
+    def nCollectiveIO(self):
+        """
+        Number of collective IO operations over all processes, when reading or writing a field.
+
+        Returns:
+        --------
+        int: Number of collective IO accesses
+        """
+        if self._nCollectiveIO is None:
+            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
+        return self._nCollectiveIO

    @property
    def MPI_ON(self):
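For intuition on the `MPI.MAX` reduction in `nCollectiveIO`: each rank performs `nVar * prod(nLoc[:-1])` contiguous accesses per field, and ranks may own different numbers of rows. A small numerical illustration (the local shapes are made up):

    >>> import numpy as np
    >>> nVar = 2
    >>> nLoc0, nLoc1 = [3, 8], [2, 8]        # hypothetical local shapes on two ranks (last axis is contiguous)
    >>> nVar * int(np.prod(nLoc0[:-1]))      # rank 0 issues 6 collective accesses per field
    6
    >>> nVar * int(np.prod(nLoc1[:-1]))      # rank 1 issues only 4
    4

The allreduce with `op=MPI.MAX` then makes `nCollectiveIO` equal to 6 on both ranks, which is what the catch-up loops in `addField` and `readField` below rely on.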
@@ -546,7 +559,7 @@ def MPI_WRITE(self, data):
        """Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
        self.mpiFile.Write(data)

-    def MPI_WRITE_AT(self, offset, data: np.ndarray):
+    def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
        """
        Write data in the binary file in MPI mode, with a given offset
        **relative to the beginning of the file**.
@@ -560,7 +573,7 @@ def MPI_WRITE_AT(self, offset, data: np.ndarray):
        """
        self.mpiFile.Write_at_all(offset, data)

-    def MPI_READ_AT(self, offset, data):
+    def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
        """
        Read data from the binary file in MPI mode, with a given offset
        **relative to the beginning of the file**.
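The renamed methods now match the collective mpi4py routines they wrap, `Write_at_all` / `Read_at_all`: these are collective calls, meaning every rank of the communicator must enter them. A standalone sketch of the underlying write call, outside of this class (file name and data are made up):

    >>> import numpy as np
    >>> from mpi4py import MPI
    >>> comm = MPI.COMM_WORLD
    >>> data = np.full(4, comm.Get_rank(), dtype=np.float64)
    >>> fh = MPI.File.Open(comm, "demo.bin", MPI.MODE_CREATE | MPI.MODE_WRONLY)
    >>> fh.Write_at_all(comm.Get_rank() * data.nbytes, data)  # collective: all ranks must call it
    >>> fh.Close()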
@@ -625,13 +638,22 @@ def addField(self, time, field):

        offset0 = self.fileSize
        self.MPI_FILE_OPEN(mode="a")
+        nWrites = 0
+        nCollectiveIO = self.nCollectiveIO
+
        if self.MPI_ROOT:
            self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
        offset0 += self.tSize

        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT(offset, field[iVar, *iBeg])
+            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
+            nWrites += 1
+
+        for _ in range(nCollectiveIO - nWrites):
+            # Additional collective write to catch up with other processes
+            self.MPI_WRITE_AT_ALL(offset0, field[:0])
+
        self.MPI_FILE_CLOSE()

    def iPos(self, iVar, iX):
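The catch-up loop is what prevents a deadlock: `Write_at_all` only returns once every rank has entered it, so a rank owning fewer local slices than `nCollectiveIO` still participates in the remaining calls, passing the zero-size buffer `field[:0]` at a harmless offset. A runnable toy version of the same pattern, with made-up sizes and file layout:

    >>> import numpy as np
    >>> from mpi4py import MPI
    >>> comm = MPI.COMM_WORLD
    >>> nRows = comm.Get_rank() + 1                        # ranks own different numbers of rows (made up)
    >>> rows = np.full((nRows, 4), comm.Get_rank(), dtype=np.float64)
    >>> nCollectiveIO = comm.allreduce(nRows, op=MPI.MAX)  # same value on every rank
    >>> fh = MPI.File.Open(comm, "pad.bin", MPI.MODE_CREATE | MPI.MODE_WRONLY)
    >>> for i in range(nRows):
    ...     fh.Write_at_all((comm.Get_rank() * 8 + i) * rows[0].nbytes, rows[i])
    >>> for _ in range(nCollectiveIO - nRows):
    ...     fh.Write_at_all(0, rows[:0])                   # zero-size collective call, just to participate
    >>> fh.Close()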
@@ -674,9 +696,18 @@ def readField(self, idx):
        field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

        self.MPI_FILE_OPEN(mode="r")
+        nReads = 0
+        nCollectiveIO = self.nCollectiveIO
+
        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT(offset, field[iVar, *iBeg])
+            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
+            nReads += 1
+
+        for _ in range(nCollectiveIO - nReads):
+            # Additional collective read to catch up with other processes
+            self.MPI_READ_AT_ALL(offset0, field[:0])
+
        self.MPI_FILE_CLOSE()

        return t, field
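Reading back mirrors the write path; a minimal sketch reusing the file name from the earlier write sketch and the `fromFile` / `readField` calls that appear in this diff (it assumes the same MPI setup is still active):

    >>> from pySDC.helpers.fieldsIO import FieldsIO
    >>> f2 = FieldsIO.fromFile("fields.pysdc")
    >>> t, uLoc = f2.readField(0)   # time of the first stored field and this rank's local block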
@@ -689,7 +720,7 @@ def initGrid(nVar, gridSizes):
    dim = len(gridSizes)
    coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes]
    s = [None] * dim
-    u0 = np.array(np.arange(nVar) + 1)[:, *s]
+    u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)]
    for x in np.meshgrid(*coords, indexing="ij"):
        u0 = u0 * x
    return coords, u0
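The indexing change replaces `u0[:, *s]`, which requires Python 3.11 (star-unpacking inside subscripts), with an explicit index tuple that behaves identically on older interpreters; with `s = [None] * dim` it simply appends singleton axes for broadcasting against the meshgrid. A small illustration with made-up sizes:

    >>> import numpy as np
    >>> nVar, gridSizes = 2, [4, 3]
    >>> s = [None] * len(gridSizes)
    >>> np.array(np.arange(nVar) + 1)[(slice(None), *s)].shape
    (2, 1, 1)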
@@ -711,8 +742,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
    iLoc, nLoc = blocks.localBounds
    Rectilinear.setupMPI(comm, iLoc, nLoc)
    s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
-    u0 = u0[:, *s]
-    print(MPI_RANK, u0.shape)
+    u0 = u0[(slice(None), *s)]

    f1 = Rectilinear(DTYPES[dtypeIdx], fileName)
    f1.setHeader(nVar=nVar, coords=coords)
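Same idiom in the parallel test helper, except that `s` is now a list of slices, so the tuple `(slice(None), *s)` selects this rank's block of the global array. A quick shape check with made-up bounds:

    >>> import numpy as np
    >>> iLoc, nLoc = [2, 0], [2, 3]                 # hypothetical local bounds from the decomposition
    >>> s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
    >>> u0 = np.arange(2 * 4 * 3).reshape(2, 4, 3)  # global array: nVar=2, grid 4x3
    >>> u0[(slice(None), *s)].shape
    (2, 2, 3)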
@@ -731,6 +761,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
def compareFields_MPI(fileName, u0, nSteps):
    from pySDC.helpers.fieldsIO import FieldsIO

+    comm = MPI.COMM_WORLD
+    MPI_RANK = comm.Get_rank()
+    if MPI_RANK == 0:
+        print("Comparing fields with MPI")
+
    f2 = FieldsIO.fromFile(fileName)

    times = np.arange(nSteps) / nSteps