Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 43 additions & 13 deletions pySDC/helpers/fieldsIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
Warning
-------
To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, wether the code is run in parallel or not.
Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.

> ⚠️ Also : this module can only be imported with **Python 3.11 or higher** !
"""
Expand All @@ -54,6 +54,7 @@
from typing import Type, TypeVar
import logging
import itertools
import warnings

T = TypeVar("T")

Expand Down Expand Up @@ -202,7 +203,7 @@ def initialize(self):
if not self.ALLOW_OVERWRITE:
assert not os.path.isfile(
self.fileName
), "file already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"
), f"file {self.fileName!r} already exists, use FieldsIO.ALLOW_OVERWRITE = True to allow overwriting"

with open(self.fileName, "w+b") as f:
self.hBase.tofile(f)
Expand Down Expand Up @@ -475,7 +476,7 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):

Example
-------
>>> # Suppose the FieldsIO object is already writen into outputs.pysdc
>>> # Suppose the FieldsIO object is already written into outputs.pysdc
>>> import os
>>> from pySDC.utils.fieldsIO import Rectilinear
>>> os.makedirs("vtrFiles") # to store all VTR files into a subfolder
Expand All @@ -494,12 +495,13 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
# MPI-parallel implementation
# -------------------------------------------------------------------------
comm: MPI.Intracomm = None
_num_collective_IO = None

@classmethod
def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
"""
Setup the MPI mode for the files IO, considering a decomposition
of the 1D grid into contiuous subintervals.
of the 1D grid into contiguous subintervals.

Parameters
----------
Expand All @@ -515,6 +517,21 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
cls.nLoc = nLoc
cls.mpiFile = None

@property
def num_collective_IO(self):
    """
    Number of collective IO operations.

    If the distribution is unbalanced, some tasks read/write more data than
    others, implying that some accesses cannot be collective, but need to be
    of the slower individual kind.

    The value is the minimum, over all ranks of `comm`, of the local number
    of accesses `nVar * prod(nLoc[:-1])`. It is computed on first access and
    then cached on the instance.

    Warning
    -------
    The first access performs a collective `allreduce` on `comm`, hence all
    ranks have to evaluate this property at the same point of the program.

    Returns
    -------
    int
        Number of collective IO accesses.
    """
    if self._num_collective_IO is None:
        # np.prod of an empty list/tuple (1D grid : nLoc[:-1] is empty) is the
        # float 1.0, so force an integer product to really return an int.
        nLocalIO = int(self.nVar * np.prod(self.nLoc[:-1], dtype=int))
        self._num_collective_IO = self.comm.allreduce(nLocalIO, op=MPI.MIN)
    return self._num_collective_IO

@property
def MPI_ON(self):
"""Whether or not MPI is activated"""
Expand All @@ -541,7 +558,7 @@ def MPI_WRITE(self, data):
"""Write data (np.ndarray) in the binary file in MPI mode, at the current file cursor position."""
self.mpiFile.Write(data)

def MPI_WRITE_AT(self, offset, data: np.ndarray):
def MPI_WRITE_AT(self, offset, data: np.ndarray, collective=True):
"""
Write data in the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Expand All @@ -552,10 +569,15 @@ def MPI_WRITE_AT(self, offset, data: np.ndarray):
Offset to write at, relative to the beginning of the file, in bytes.
data : np.ndarray
Data to be written in the binary file.
collective : bool
Use `MPI.Write_at_all` if true and `MPI.Write_at` if false
"""
self.mpiFile.Write_at(offset, data)
if collective:
self.mpiFile.Write_at_all(offset, data)
else:
self.mpiFile.Write_at(offset, data)

def MPI_READ_AT(self, offset, data):
def MPI_READ_AT(self, offset, data, collective=True):
"""
Read data from the binary file in MPI mode, with a given offset
**relative to the beginning of the file**.
Expand All @@ -566,8 +588,13 @@ def MPI_READ_AT(self, offset, data):
Offset to read at, relative to the beginning of the file, in bytes.
data : np.ndarray
Array on which to read the data from the binary file.
collective : bool
Use `MPI.Read_at_all` if true and `MPI.Read_at` if false
"""
self.mpiFile.Read_at(offset, data)
if collective:
self.mpiFile.Read_at_all(offset, data)
else:
self.mpiFile.Read_at(offset, data)

def MPI_FILE_CLOSE(self):
"""Close the binary file in MPI mode"""
Expand Down Expand Up @@ -624,9 +651,11 @@ def addField(self, time, field):
self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
offset0 += self.tSize

_num_writes = 0
for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
Copy link
Member

@tlunet tlunet Mar 31, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not exactly what I suggested ... all IO operations must be collective, using the `_all` variants. We just have to ensure that all processes call MPI_WRITE_AT or MPI_READ_AT exactly the same number of times. This means that you have to add the following after the first for loop:

for _ in range(self.num_collective_IO - _num_writes):
    self.MPI_WRITE_AT(0, field[:0]) # should produce a no-op

That should avoid the deadlock

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is that better? I expect little performance hit by non-collective IO here because there are few non-collective operations relative to total operations.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It still counts ... and you can have a majority of non-collective operations with some decompositions. For instance, a 2D problem of size [10,32] decomposed in the first direction into 6 sub-domains => 4 processes have 2 points, 2 have 1 point. Because you need to write contiguous chunks of data in the file, in the first pass all processes do a collective write for their first points, but for the second points 4 processes do a non-collective operation.

It's just better to always have all processes do collective writes, with those that have nothing to write doing a no-op; hopefully the MPI implementation is good enough that nothing is lost by doing so.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I found some inconsistent behaviour when reading and writing nothing on some tasks on my laptop. Namely, if I print something, it goes through, but if I don't, it deadlocks. I don't understand this behaviour. I suggest we leave this with some non-collective IO operations and if you want, you can fix it when you return from vacation.

Copy link
Member

@tlunet tlunet Apr 1, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found where the problem was: self.num_collective_IO must be evaluated at the beginning of readField or addField. If, instead, the property is evaluated in the second for loop, it calls an allreduce, but the processes having the maximum number of collective IO operations are then still launching a write_at_all or a read_at_all, which creates the deadlock.

Now that the allreduce is done before any other collective operation, the tests pass on my laptop. Let's see what happens with the GitHub tests ...

offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_WRITE_AT(offset, field[iVar, *iBeg])
self.MPI_WRITE_AT(offset, field[(iVar, *iBeg)], collective=_num_writes < self.num_collective_IO)
_num_writes += 1
self.MPI_FILE_CLOSE()

def iPos(self, iVar, iX):
Expand Down Expand Up @@ -669,9 +698,11 @@ def readField(self, idx):
field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

self.MPI_FILE_OPEN(mode="r")
_num_reads = 0
for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_READ_AT(offset, field[iVar, *iBeg])
self.MPI_READ_AT(offset, field[(iVar, *iBeg)], collective=_num_reads < self.num_collective_IO)
_num_reads += 1
self.MPI_FILE_CLOSE()

return t, field
Expand All @@ -684,7 +715,7 @@ def initGrid(nVar, gridSizes):
dim = len(gridSizes)
coords = [np.linspace(0, 1, num=n, endpoint=False) for n in gridSizes]
s = [None] * dim
u0 = np.array(np.arange(nVar) + 1)[:, *s]
u0 = np.array(np.arange(nVar) + 1)[(slice(None), *s)]
for x in np.meshgrid(*coords, indexing="ij"):
u0 = u0 * x
return coords, u0
Expand All @@ -706,8 +737,7 @@ def writeFields_MPI(fileName, dtypeIdx, algo, nSteps, nVar, gridSizes):
iLoc, nLoc = blocks.localBounds
Rectilinear.setupMPI(comm, iLoc, nLoc)
s = [slice(i, i + n) for i, n in zip(iLoc, nLoc)]
u0 = u0[:, *s]
print(MPI_RANK, u0.shape)
u0 = u0[(slice(None), *s)]

f1 = Rectilinear(DTYPES[dtypeIdx], fileName)
f1.setHeader(nVar=nVar, coords=coords)
Expand Down
14 changes: 7 additions & 7 deletions pySDC/tests/test_helpers/test_fieldsIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,6 @@
import glob
import pytest

if sys.version_info < (3, 11):
pytest.skip("skipping fieldsIO tests on python lower than 3.11", allow_module_level=True)

import itertools
import numpy as np

Expand All @@ -14,6 +11,7 @@
FieldsIO.ALLOW_OVERWRITE = True


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("dim", range(4))
def testHeader(dim, dtypeIdx):
Expand Down Expand Up @@ -65,6 +63,7 @@ def testHeader(dim, dtypeIdx):
assert np.allclose(val, f2.header[key]), f"header's discrepancy for {key} in written {f2}"


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("nSteps", [1, 2, 10, 100])
@pytest.mark.parametrize("nVar", [1, 2, 5])
Expand Down Expand Up @@ -106,6 +105,7 @@ def testScalar(nVar, nSteps, dtypeIdx):
assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


@pytest.mark.base
@pytest.mark.parametrize("dtypeIdx", DTYPES.keys())
@pytest.mark.parametrize("nSteps", [1, 2, 5, 10])
@pytest.mark.parametrize("nVar", [1, 2, 5])
Expand Down Expand Up @@ -155,6 +155,7 @@ def testRectilinear(dim, nVar, nSteps, dtypeIdx):
assert np.allclose(u2, u1), f"{idx}'s fields in {f1} has incorrect values"


@pytest.mark.base
@pytest.mark.parametrize("nSteps", [1, 10])
@pytest.mark.parametrize("nZ", [1, 5, 16])
@pytest.mark.parametrize("nY", [1, 5, 16])
Expand Down Expand Up @@ -249,8 +250,7 @@ def testRectilinear_MPI(dim, nProcs, dtypeIdx, algo, nSteps, nVar):
parser.add_argument('--gridSizes', type=int, nargs='+', help="number of grid points in each dimensions")
args = parser.parse_args()

if sys.version_info >= (3, 11):
from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI
from pySDC.helpers.fieldsIO import writeFields_MPI, compareFields_MPI

u0 = writeFields_MPI(**args.__dict__)
compareFields_MPI(args.fileName, u0, args.nSteps)
u0 = writeFields_MPI(**args.__dict__)
compareFields_MPI(args.fileName, u0, args.nSteps)