Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 35 additions & 56 deletions pySDC/helpers/fieldsIO.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,13 @@

Warning
-------
To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.setupMPI` (cf their docstring).
Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
"""
import os
import numpy as np
from typing import Type, TypeVar
import logging
import itertools

T = TypeVar("T")

Expand All @@ -61,11 +60,17 @@
except ImportError:
pass
from mpi4py import MPI
from mpi4py.util.dtlib import from_numpy_dtype as MPI_DTYPE
except ImportError:

class MPI:
COMM_WORLD = None
Intracomm = T
File = T
Datatype = T

def MPI_DTYPE():
pass


# Supported data types
Expand Down Expand Up @@ -412,6 +417,8 @@ def setHeader(self, nVar, coords):
coords = self.setupCoords(*coords)
self.header = {"nVar": int(nVar), "coords": coords}
self.nItems = nVar * self.nDoF
if self.MPI_ON:
self.MPI_SETUP()

@property
def hInfos(self):
Expand All @@ -433,6 +440,8 @@ def readHeader(self, f):
gridSizes = np.fromfile(f, dtype=np.int32, count=dim)
coords = [np.fromfile(f, dtype=np.float64, count=n) for n in gridSizes]
self.setHeader(nVar, coords)
if self.MPI_ON:
self.MPI_SETUP()

def reshape(self, fields: np.ndarray):
"""Reshape the fields to a N-d array (inplace operation)"""
Expand Down Expand Up @@ -493,7 +502,6 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
# MPI-parallel implementation
# -------------------------------------------------------------------------
comm: MPI.Intracomm = None
_nCollectiveIO = None

@classmethod
def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
Expand All @@ -513,21 +521,9 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
cls.comm = comm
cls.iLoc = iLoc
cls.nLoc = nLoc
cls.mpiFile = None
cls._nCollectiveIO = None

@property
def nCollectiveIO(self):
"""
Number of collective IO operations over all processes, when reading or writing a field.

Returns:
--------
int: Number of collective IO accesses
"""
if self._nCollectiveIO is None:
self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
return self._nCollectiveIO
cls.mpiFile: MPI.File = None
cls.mpiType: MPI.Datatype = None
cls.mpiFileType: MPI.Datatype = None

@property
def MPI_ON(self):
Expand All @@ -543,6 +539,16 @@ def MPI_ROOT(self):
return True
return self.comm.Get_rank() == 0

def MPI_SETUP(self):
"""Setup subarray masks for each processes"""
self.mpiType = MPI_DTYPE(self.dtype)
self.mpiFileType = self.mpiType.Create_subarray(
[self.nVar, *self.gridSizes], # Global array sizes
[self.nVar, *self.nLoc], # Local array sizes
[0, *self.iLoc], # Global starting indices of local blocks
)
self.mpiFileType.Commit()

def MPI_FILE_OPEN(self, mode):
"""Open the binary file in MPI mode"""
amode = {
Expand All @@ -567,7 +573,8 @@ def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
data : np.ndarray
Data to be written in the binary file.
"""
self.mpiFile.Write_at_all(offset, data)
self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
self.mpiFile.Write_all(data)

def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
"""
Expand All @@ -581,7 +588,8 @@ def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
data : np.ndarray
Array on which to read the data from the binary file.
"""
self.mpiFile.Read_at_all(offset, data)
self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
self.mpiFile.Read_all(data)

def MPI_FILE_CLOSE(self):
"""Close the binary file in MPI mode"""
Expand Down Expand Up @@ -632,33 +640,15 @@ def addField(self, time, field):
*self.nLoc,
), f"expected {(self.nVar, *self.nLoc)} shape, got {field.shape}"

offset0 = self.fileSize
offset = self.fileSize
self.MPI_FILE_OPEN(mode="a")
nWrites = 0
nCollectiveIO = self.nCollectiveIO

if self.MPI_ROOT:
self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
offset0 += self.tSize

for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
nWrites += 1

for _ in range(nCollectiveIO - nWrites):
# Additional collective write to catch up with other processes
self.MPI_WRITE_AT_ALL(offset0, field[:0])

offset += self.tSize
self.MPI_WRITE_AT_ALL(offset, field)
self.MPI_FILE_CLOSE()

def iPos(self, iVar, iX):
iPos = iVar * self.nDoF
for axis in range(self.dim - 1):
iPos += (self.iLoc[axis] + iX[axis]) * np.prod(self.gridSizes[axis + 1 :])
iPos += self.iLoc[-1]
return iPos

def readField(self, idx):
"""
Read one field stored in the binary file, corresponding to the given
Expand All @@ -684,26 +674,15 @@ def readField(self, idx):
return super().readField(idx)

idx = self.formatIndex(idx)
offset0 = self.hSize + idx * (self.tSize + self.fSize)
offset = self.hSize + idx * (self.tSize + self.fSize)
with open(self.fileName, "rb") as f:
t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
offset0 += self.tSize
t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0])
offset += self.tSize

field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

self.MPI_FILE_OPEN(mode="r")
nReads = 0
nCollectiveIO = self.nCollectiveIO

for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
nReads += 1

for _ in range(nCollectiveIO - nReads):
# Additional collective read to catch up with other processes
self.MPI_READ_AT_ALL(offset0, field[:0])

self.MPI_READ_AT_ALL(offset, field)
self.MPI_FILE_CLOSE()

return t, field
Expand Down
4 changes: 2 additions & 2 deletions pySDC/tests/test_benchmarks/test_collocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@

from pySDC.core.collocation import CollBase

t_start = float(np.random.rand(1) * 0.2)
t_end = float(0.8 + np.random.rand(1) * 0.2)
t_start = float(np.random.rand(1)[0] * 0.2)
t_end = float(0.8 + np.random.rand(1)[0] * 0.2)

tolQuad = 1e-13

Expand Down
Loading