From 86d3dc455986608118827401d1f7152cc28d011f Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Fri, 9 May 2025 16:34:26 +0200 Subject: [PATCH 1/6] TL: performance fix for fieldsIO --- pySDC/helpers/fieldsIO.py | 78 +++++++++++++++++++-------------------- 1 file changed, 37 insertions(+), 41 deletions(-) diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py index fe0e8cbd0e..7927b50832 100644 --- a/pySDC/helpers/fieldsIO.py +++ b/pySDC/helpers/fieldsIO.py @@ -44,14 +44,13 @@ Warning ------- -To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring). +To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.setupMPI` (cf their docstring). Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not. """ import os import numpy as np from typing import Type, TypeVar import logging -import itertools T = TypeVar("T") @@ -61,11 +60,17 @@ except ImportError: pass from mpi4py import MPI + from mpi4py.util.dtlib import from_numpy_dtype as MPI_DTYPE except ImportError: class MPI: COMM_WORLD = None Intracomm = T + File = T + Datatype = T + + def MPI_DTYPE(): + pass # Supported data types @@ -412,6 +417,8 @@ def setHeader(self, nVar, coords): coords = self.setupCoords(*coords) self.header = {"nVar": int(nVar), "coords": coords} self.nItems = nVar * self.nDoF + if self.MPI_ON: + self.MPI_SETUP() @property def hInfos(self): @@ -433,6 +440,8 @@ def readHeader(self, f): gridSizes = np.fromfile(f, dtype=np.int32, count=dim) coords = [np.fromfile(f, dtype=np.float64, count=n) for n in gridSizes] self.setHeader(nVar, coords) + if self.MPI_ON: + self.MPI_SETUP() def reshape(self, fields: np.ndarray): """Reshape the fields to a N-d array (inplace operation)""" @@ -513,7 +522,9 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc): cls.comm = comm cls.iLoc = iLoc cls.nLoc = nLoc - cls.mpiFile = None + cls.mpiFile:MPI.File = None + cls.mpiType:MPI.Datatype = None + cls.mpiFileType:MPI.Datatype = None cls._nCollectiveIO = None @property @@ -543,6 +554,18 @@ def MPI_ROOT(self): return True return self.comm.Get_rank() == 0 + def MPI_SETUP(self): + """Setup subarray masks for each processes""" + self.mpiType = MPI_DTYPE(self.dtype) + self.mpiFileType = self.mpiType.Create_subarray( + [self.nVar, *self.gridSizes], # Global array sizes + [self.nVar, *self.nLoc], # Local array sizes + [0, *self.iLoc] # Global starting indices of local blocks + ) + self.mpiFileType.Commit() + print("MPI_TYPE ", self.mpiType) + print("MPI_FILETYPE ", self.mpiFileType) + def MPI_FILE_OPEN(self, mode): """Open the binary file in MPI mode""" amode = { @@ -567,7 +590,8 @@ def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray): data : np.ndarray Data to be written in the binary file. """ - self.mpiFile.Write_at_all(offset, data) + self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType) + self.mpiFile.Write_all(data) def MPI_READ_AT_ALL(self, offset, data: np.ndarray): """ @@ -581,7 +605,8 @@ def MPI_READ_AT_ALL(self, offset, data: np.ndarray): data : np.ndarray Array on which to read the data from the binary file. """ - self.mpiFile.Read_at_all(offset, data) + self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType) + self.mpiFile.Read_all(data) def MPI_FILE_CLOSE(self): """Close the binary file in MPI mode""" @@ -632,33 +657,15 @@ def addField(self, time, field): *self.nLoc, ), f"expected {(self.nVar, *self.nLoc)} shape, got {field.shape}" - offset0 = self.fileSize + offset = self.fileSize self.MPI_FILE_OPEN(mode="a") - nWrites = 0 - nCollectiveIO = self.nCollectiveIO if self.MPI_ROOT: self.MPI_WRITE(np.array(time, dtype=T_DTYPE)) - offset0 += self.tSize - - for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]): - offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize - self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)]) - nWrites += 1 - - for _ in range(nCollectiveIO - nWrites): - # Additional collective write to catch up with other processes - self.MPI_WRITE_AT_ALL(offset0, field[:0]) - + offset += self.tSize + self.MPI_WRITE_AT_ALL(offset, field) self.MPI_FILE_CLOSE() - def iPos(self, iVar, iX): - iPos = iVar * self.nDoF - for axis in range(self.dim - 1): - iPos += (self.iLoc[axis] + iX[axis]) * np.prod(self.gridSizes[axis + 1 :]) - iPos += self.iLoc[-1] - return iPos - def readField(self, idx): """ Read one field stored in the binary file, corresponding to the given @@ -684,26 +691,15 @@ def readField(self, idx): return super().readField(idx) idx = self.formatIndex(idx) - offset0 = self.hSize + idx * (self.tSize + self.fSize) + offset = self.hSize + idx * (self.tSize + self.fSize) with open(self.fileName, "rb") as f: - t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0]) - offset0 += self.tSize + t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0]) + offset += self.tSize field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype) self.MPI_FILE_OPEN(mode="r") - nReads = 0 - nCollectiveIO = self.nCollectiveIO - - for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]): - offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize - self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)]) - nReads += 1 - - for _ in range(nCollectiveIO - nReads): - # Additional collective read to catch up with other processes - self.MPI_READ_AT_ALL(offset0, field[:0]) - + self.MPI_READ_AT_ALL(offset, field) self.MPI_FILE_CLOSE() return t, field From e4b3d27c6f95216d45266b9eb6d40d9588e42232 Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Fri, 9 May 2025 17:03:41 +0200 Subject: [PATCH 2/6] TL: final update before PR --- pySDC/helpers/fieldsIO.py | 30 +++++++----------------------- 1 file changed, 7 insertions(+), 23 deletions(-) diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py index 7927b50832..3f17efe2ec 100644 --- a/pySDC/helpers/fieldsIO.py +++ b/pySDC/helpers/fieldsIO.py @@ -522,23 +522,9 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc): cls.comm = comm cls.iLoc = iLoc cls.nLoc = nLoc - cls.mpiFile:MPI.File = None - cls.mpiType:MPI.Datatype = None - cls.mpiFileType:MPI.Datatype = None - cls._nCollectiveIO = None - - @property - def nCollectiveIO(self): - """ - Number of collective IO operations over all processes, when reading or writing a field. - - Returns: - -------- - int: Number of collective IO accesses - """ - if self._nCollectiveIO is None: - self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX) - return self._nCollectiveIO + cls.mpiFile: MPI.File = None + cls.mpiType: MPI.Datatype = None + cls.mpiFileType: MPI.Datatype = None @property def MPI_ON(self): @@ -558,13 +544,11 @@ def MPI_SETUP(self): """Setup subarray masks for each processes""" self.mpiType = MPI_DTYPE(self.dtype) self.mpiFileType = self.mpiType.Create_subarray( - [self.nVar, *self.gridSizes], # Global array sizes - [self.nVar, *self.nLoc], # Local array sizes - [0, *self.iLoc] # Global starting indices of local blocks - ) + [self.nVar, *self.gridSizes], # Global array sizes + [self.nVar, *self.nLoc], # Local array sizes + [0, *self.iLoc], # Global starting indices of local blocks + ) self.mpiFileType.Commit() - print("MPI_TYPE ", self.mpiType) - print("MPI_FILETYPE ", self.mpiFileType) def MPI_FILE_OPEN(self, mode): """Open the binary file in MPI mode""" From 95fbf66dbf428d6c778de2f425e316769c87f71a Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Fri, 9 May 2025 17:10:51 +0200 Subject: [PATCH 3/6] TL: missing detail --- pySDC/helpers/fieldsIO.py | 1 - 1 file changed, 1 deletion(-) diff --git a/pySDC/helpers/fieldsIO.py b/pySDC/helpers/fieldsIO.py index 3f17efe2ec..58e00d3cb4 100644 --- a/pySDC/helpers/fieldsIO.py +++ b/pySDC/helpers/fieldsIO.py @@ -502,7 +502,6 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"): # MPI-parallel implementation # ------------------------------------------------------------------------- comm: MPI.Intracomm = None - _nCollectiveIO = None @classmethod def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc): From 9449f4e08d8366f4e3672ae72a6a9aafed17c3cd Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Fri, 9 May 2025 20:03:16 +0200 Subject: [PATCH 4/6] TL: removing warning --- pySDC/tests/test_benchmarks/test_collocation.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pySDC/tests/test_benchmarks/test_collocation.py b/pySDC/tests/test_benchmarks/test_collocation.py index 14cd361d29..2d6963e374 100644 --- a/pySDC/tests/test_benchmarks/test_collocation.py +++ b/pySDC/tests/test_benchmarks/test_collocation.py @@ -3,8 +3,8 @@ from pySDC.core.collocation import CollBase -t_start = float(np.random.rand(1) * 0.2) -t_end = float(0.8 + np.random.rand(1) * 0.2) +t_start = float(np.random.rand(1)[0] * 0.2) +t_end = float(0.8 + np.random.rand(1)[0] * 0.2) tolQuad = 1e-13 From e166c1431ce3c5bafdfe67b5ac0cec8420848341 Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Sat, 10 May 2025 11:34:18 +0200 Subject: [PATCH 5/6] TL: trying something --- pySDC/tests/test_helpers/test_gusto_coupling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pySDC/tests/test_helpers/test_gusto_coupling.py b/pySDC/tests/test_helpers/test_gusto_coupling.py index 119442a223..a25edf8b2a 100644 --- a/pySDC/tests/test_helpers/test_gusto_coupling.py +++ b/pySDC/tests/test_helpers/test_gusto_coupling.py @@ -647,10 +647,10 @@ def test_pySDC_integrator_MSSDC(n_steps, useMPIController, setup, submit=True, n ) return None + from firedrake import norm, Constant, COMM_WORLD from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI from pySDC.helpers.pySDC_as_gusto_time_discretization import pySDC_integrator from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_cls - from firedrake import norm, Constant, COMM_WORLD import numpy as np MSSDC_args = {} From b41215f00ac2490b102b3290ed9bc7230bf53467 Mon Sep 17 00:00:00 2001 From: Thibaut Lunet Date: Mon, 12 May 2025 17:16:21 +0200 Subject: [PATCH 6/6] TL: removing gusto stuff --- pySDC/tests/test_helpers/test_gusto_coupling.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pySDC/tests/test_helpers/test_gusto_coupling.py b/pySDC/tests/test_helpers/test_gusto_coupling.py index a25edf8b2a..119442a223 100644 --- a/pySDC/tests/test_helpers/test_gusto_coupling.py +++ b/pySDC/tests/test_helpers/test_gusto_coupling.py @@ -647,10 +647,10 @@ def test_pySDC_integrator_MSSDC(n_steps, useMPIController, setup, submit=True, n ) return None - from firedrake import norm, Constant, COMM_WORLD from pySDC.implementations.controller_classes.controller_nonMPI import controller_nonMPI from pySDC.helpers.pySDC_as_gusto_time_discretization import pySDC_integrator from pySDC.implementations.sweeper_classes.generic_implicit import generic_implicit as sweeper_cls + from firedrake import norm, Constant, COMM_WORLD import numpy as np MSSDC_args = {}