Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 50 additions & 15 deletions pySDC/helpers/fieldsIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,7 @@
Warning
-------
To use MPI collective writing, you first need to call the class method :class:`Rectilinear.setupMPI` (cf. its docstring).
Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not.

> ⚠️ Also : this module can only be imported with **Python 3.11 or higher** !
Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
"""
import os
import numpy as np
Expand Down Expand Up @@ -202,7 +200,7 @@ def initialize(self):
if not self.ALLOW_OVERWRITE:
assert not os.path.isfile(
self.fileName
), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"

with open(self.fileName, "w+b") as f:
self.hBase.tofile(f)
Expand Down Expand Up @@ -475,7 +473,7 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):

Example
-------
>>> # Suppose the FieldsIO object is already writen into outputs.pysdc
>>> # Suppose the FieldsIO object is already written into outputs.pysdc
>>> import os
>>> from pySDC.utils.fieldsIO import Rectilinear
>>> os.makedirs("vtrFiles") # to store all VTR files into a subfolder
Expand All @@ -494,12 +492,13 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
# MPI-parallel implementation
# -------------------------------------------------------------------------
comm: MPI.Intracomm = None
_nCollectiveIO = None

@classmethod
def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
"""
Setup the MPI mode for the files IO, considering a decomposition
of the 1D grid into contiuous subintervals.
of the 1D grid into contiguous subintervals.

Parameters
----------
Expand All @@ -514,6 +513,20 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
cls.iLoc = iLoc
cls.nLoc = nLoc
cls.mpiFile = None
cls._nCollectiveIO = None

@property
def nCollectiveIO(self):
    """
    Number of collective IO operations over all processes, when reading or writing a field.

    The value is obtained with an ``allreduce`` (maximum over all processes)
    the first time the property is evaluated, and is then cached in
    ``_nCollectiveIO`` for subsequent accesses.

    Returns
    -------
    int
        Number of collective IO accesses
    """
    if self._nCollectiveIO is None:
        # np.prod of an empty Python sequence (e.g. nLoc[:-1] for a 1D grid)
        # is the float 1.0 : cast to int so the result can be used in range().
        nLocalIO = int(self.nVar * np.prod(self.nLoc[:-1]))
        self._nCollectiveIO = self.comm.allreduce(nLocalIO, op=MPI.MAX)
    return self._nCollectiveIO

@property
def MPI_ON(self):
Expand Down Expand Up @@ -541,7 +554,7 @@ def MPI_WRITE(self, data):
"""Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
self.mpiFile.Write(data)

def MPI_WRITE_AT(self, offset, data: np.ndarray):
def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
"""
Write data in the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Expand All @@ -553,9 +566,9 @@ def MPI_WRITE_AT(self, offset, data: np.ndarray):
data : np.ndarray
Data to be written in the binary file.
"""
self.mpiFile.Write_at(offset, data)
self.mpiFile.Write_at_all(offset, data)

def MPI_READ_AT(self, offset, data):
def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
"""
Read data from the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Expand All @@ -567,7 +580,7 @@ def MPI_READ_AT(self, offset, data):
data : np.ndarray
Array on which to read the data from the binary file.
"""
self.mpiFile.Read_at(offset, data)
self.mpiFile.Read_at_all(offset, data)

def MPI_FILE_CLOSE(self):
"""Close the binary file in MPI mode"""
Expand Down Expand Up @@ -620,13 +633,22 @@ def addField(self, time, field):

offset0 = self.fileSize
self.MPI_FILE_OPEN(mode="a")
nWrites = 0
nCollectiveIO = self.nCollectiveIO

if self.MPI_ROOT:
self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
offset0 += self.tSize

for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
Copy link
Member

@tlunet tlunet Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not exactly what I suggested ... all IO operations must be collective with _all. It's just that we have to ensure that all processes call MPI_WRITE_AT or MPI_READ_AT exactly the same number of times. It means that you have to add the following after the first for loop:

for _ in range(self.num_collective_IO - _num_writes):
    self.MPI_WRITE_AT(0, field[:0]) # should produce a no-op

That should avoid the deadlock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that better? I expect little performance hit by non-collective IO here because there are few non-collective operations relative to total operations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still counts ... and you can have a majority of non-collective operations with some decompositions. For instance, a 2D problem of size [10,32] decomposed in the first direction into 6 sub-domains => 4 processes have 2 points, 2 have 1 point. Because you need to write contiguous chunks of data in the file, in the first loop all processes are doing a collective write for their first points, but for the second points 4 processes are doing a non-collective operation.

It's just better to always have all processes doing collective writes, with those that don't write anything doing a no-op, and the MPI implementation is hopefully well done enough to avoid losing anything on that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I found some inconsistent behaviour when reading and writing nothing on some tasks on my laptop. Namely, if I print something, it goes through, but if I don't, it deadlocks. I don't understand this behaviour. I suggest we leave this with some non-collective IO operations and if you want, you can fix it when you return from vacation.

Copy link
Member

@tlunet tlunet Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found where the problem was: self.num_collective_IO must be called at the beginning of readField or addField. If instead the property is first evaluated in the second for loop, it calls an allreduce, but then the processes having the maximum number of collective IO operations are still launching a write_at_all or a read_at_all, which creates the deadlock.

Now that the allreduce is done before any other collective operation, the tests pass on my laptop. Let's see what happens with the GitHub tests ...

offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_WRITE_AT(offset, field[iVar, *iBeg])
self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
nWrites += 1

for _ in range(nCollectiveIO - nWrites):
# Additional collective write to catch up with other processes
self.MPI_WRITE_AT_ALL(offset0, field[:0])

self.MPI_FILE_CLOSE()

def iPos(self, iVar, iX):
Expand Down Expand Up @@ -669,9 +691,18 @@ def readField(self, idx):
field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

self.MPI_FILE_OPEN(mode="r")
nReads = 0
nCollectiveIO = self.nCollectiveIO

for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_READ_AT(offset, field[iVar, *iBeg])
self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
nReads += 1

for _ in range(nCollectiveIO - nReads):
# Additional collective read to catch up with other processes
self.MPI_READ_AT_ALL(offset0, field[:0])

self.MPI_FILE_CLOSE()

return t, field
Expand All @@ -684,7 +715,7 @@ def initGrid(nVar, gridSizes):
dim = len(gridSizes)
coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes]
s = [None] * dim
u0 = np.array(np.arange(nVar) + 1)[:, *s]
u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)]
for x in np.meshgrid(*coords, indexing="ij"):
u0 = u0 * x
return coords, u0
Expand All @@ -706,8 +737,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
iLoc, nLoc = blocks.localBounds
Rectilinear.setupMPI(comm, iLoc, nLoc)
s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
u0 = u0[:, *s]
print(MPI_RANK, u0.shape)
u0 = u0[(slice(None), *s)]

f1 = Rectilinear(DTYPES[dtypeIdx], fileName)
f1.setHeader(nVar=nVar, coords=coords)
Expand All @@ -726,6 +756,11 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
def compareFields_MPI(fileName, u0, nSteps):
from pySDC.helpers.fieldsIO import FieldsIO

comm = MPI.COMM_WORLD
MPI_RANK = comm.Get_rank()
if MPI_RANK == 0:
print("Comparing fields with MPI")

f2 = FieldsIO.fromFile(fileName)

times = np.arange(nSteps) / nSteps
Expand Down
14 changes: 7 additions & 7 deletions pySDC/tests/test_helpers/test_fieldsIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import glob
import pytest

if sys.version_info < (3, 11):
pytest.skip("skipping fieldsIO tests on python lower than 3.11", allow_module_level=True)

import itertools
import numpy as np

Expand All @@ -14,6 +11,7 @@
FieldsIO.ALLOW_OVERWRITE = True


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("dim", range(4))
def testHeader(dim, dtypeIdx):
Expand Down Expand Up @@ -65,6 +63,7 @@ def testHeader(dim, dtypeIdx):
assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}"


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("nSteps", [1, 2, 10, 100])
@pytest.mark.parametrize("nVar", [1, 2, 5])
Expand Down Expand Up @@ -106,6 +105,7 @@ def testScalar(nVar, nSteps, dtypeIdx):
assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("nSteps", [1, 2, 5, 10])
@pytest.mark.parametrize("nVar", [1, 2, 5])
Expand Down Expand Up @@ -155,6 +155,7 @@ def testRectilinear(dim, nVar, nSteps, dtypeIdx):
assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


@pytest.mark.base
@pytest.mark.parametrize("nSteps", [1, 10])
@pytest.mark.parametrize("nZ", [1, 5, 16])
@pytest.mark.parametrize("nY", [1, 5, 16])
Expand Down Expand Up @@ -249,8 +250,7 @@ def testRectilinear_MPI(dim, nProcs, dtypeIdx, algo, nSteps, nVar):
parser.add_argument('--gridSizes', type=int, nargs='+', help="number of grid points in each dimensions")
args = parser.parse_args()

if sys.version_info >= (3, 11):
from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI
from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI

u0 = writeFields_MPI(**args.__dict__)
compareFields_MPI(args.fileName, u0, args.nSteps)
u0 = writeFields_MPI(**args.__dict__)
compareFields_MPI(args.fileName, u0, args.nSteps)