Commit a101f29

Merge branch 'master' into neuralpint
2 parents: 748432a + c68a9a6

File tree: 8 files changed (+129, -115 lines)

README.md

Lines changed: 1 addition & 1 deletion

@@ -112,7 +112,7 @@ Any contribution is dearly welcome! If you want to contribute, please take the t
 This project has received funding from the [European High-Performance
 Computing Joint Undertaking](https://eurohpc-ju.europa.eu/) (JU) under
 grant agreement No 955701 ([TIME-X](https://www.time-x-eurohpc.eu/))
-and grant agreement No 101118139.
+and grant agreement No 101118139.
 The JU receives support from the European Union's Horizon 2020 research
 and innovation programme and Belgium, France, Germany, and Switzerland.
 This project also received funding from the [German Federal Ministry of

docs/source/conf.py

Lines changed: 1 addition & 1 deletion

@@ -72,7 +72,7 @@
 # The short X.Y version.
 version = '5.6'
 # The full version, including alpha/beta/rc tags.
-release = '5.6
+release = '5.6'

 # The language for content autogenerated by Sphinx. Refer to documentation
 # for a list of supported languages.

pySDC/helpers/fieldsIO.py

Lines changed: 34 additions & 55 deletions

@@ -44,14 +44,13 @@

 Warning
 -------
-To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.initMPI` (cf their docstring).
+To use MPI collective writing, you need to call first the class methods :class:`Rectilinear.setupMPI` (cf their docstring).
 Also, `Rectilinear.setHeader` **must be given the global grids coordinates**, whether the code is run in parallel or not.
 """
 import os
 import numpy as np
 from typing import Type, TypeVar
 import logging
-import itertools

 T = TypeVar("T")

@@ -61,11 +60,17 @@
     except ImportError:
         pass
     from mpi4py import MPI
+    from mpi4py.util.dtlib import from_numpy_dtype as MPI_DTYPE
 except ImportError:

     class MPI:
         COMM_WORLD = None
         Intracomm = T
+        File = T
+        Datatype = T
+
+    def MPI_DTYPE():
+        pass


 # Supported data types

@@ -417,6 +422,8 @@ def setHeader(self, nVar, coords):
         coords = self.setupCoords(*coords)
         self.header = {"nVar": int(nVar), "coords": coords}
         self.nItems = nVar * self.nDoF
+        if self.MPI_ON:
+            self.MPI_SETUP()

     @property
     def hInfos(self):

@@ -438,6 +445,8 @@ def readHeader(self, f):
         gridSizes = np.fromfile(f, dtype=np.int32, count=dim)
         coords = [np.fromfile(f, dtype=np.float64, count=n) for n in gridSizes]
         self.setHeader(nVar, coords)
+        if self.MPI_ON:
+            self.MPI_SETUP()

     def reshape(self, fields: np.ndarray):
         """Reshape the fields to a N-d array (inplace operation)"""

@@ -498,7 +507,6 @@ def toVTR(self, baseName, varNames, idxFormat="{:06d}"):
     # MPI-parallel implementation
     # -------------------------------------------------------------------------
     comm: MPI.Intracomm = None
-    _nCollectiveIO = None

     @classmethod
     def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):

@@ -519,20 +527,8 @@ def setupMPI(cls, comm: MPI.Intracomm, iLoc, nLoc):
         cls.iLoc = iLoc
         cls.nLoc = nLoc
         cls.mpiFile: MPI.File = None
-        cls._nCollectiveIO = None
-
-    @property
-    def nCollectiveIO(self):
-        """
-        Number of collective IO operations over all processes, when reading or writing a field.
-
-        Returns:
-        --------
-        int: Number of collective IO accesses
-        """
-        if self._nCollectiveIO is None:
-            self._nCollectiveIO = self.comm.allreduce(self.nVar * np.prod(self.nLoc[:-1]), op=MPI.MAX)
-        return self._nCollectiveIO
+        cls.mpiType: MPI.Datatype = None
+        cls.mpiFileType: MPI.Datatype = None

     @property
     def MPI_ON(self):

@@ -548,6 +544,16 @@ def MPI_ROOT(self):
             return True
         return self.comm.Get_rank() == 0

+    def MPI_SETUP(self):
+        """Setup subarray masks for each processes"""
+        self.mpiType = MPI_DTYPE(self.dtype)
+        self.mpiFileType = self.mpiType.Create_subarray(
+            [self.nVar, *self.gridSizes],  # Global array sizes
+            [self.nVar, *self.nLoc],  # Local array sizes
+            [0, *self.iLoc],  # Global starting indices of local blocks
+        )
+        self.mpiFileType.Commit()
+
     def MPI_FILE_OPEN(self, mode):
         """Open the binary file in MPI mode"""
         amode = {

@@ -572,7 +578,8 @@ def MPI_WRITE_AT_ALL(self, offset, data: np.ndarray):
         data : np.ndarray
             Data to be written in the binary file.
         """
-        self.mpiFile.Write_at_all(offset, data)
+        self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
+        self.mpiFile.Write_all(data)

     def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         """

@@ -586,7 +593,8 @@ def MPI_READ_AT_ALL(self, offset, data: np.ndarray):
         data : np.ndarray
             Array on which to read the data from the binary file.
         """
-        self.mpiFile.Read_at_all(offset, data)
+        self.mpiFile.Set_view(disp=offset, etype=self.mpiType, filetype=self.mpiFileType)
+        self.mpiFile.Read_all(data)

     def MPI_FILE_CLOSE(self):
         """Close the binary file in MPI mode"""

@@ -637,33 +645,15 @@ def addField(self, time, field):
             *self.nLoc,
         ), f"expected {(self.nVar, *self.nLoc)} shape, got {field.shape}"

-        offset0 = self.fileSize
+        offset = self.fileSize
         self.MPI_FILE_OPEN(mode="a")
-        nWrites = 0
-        nCollectiveIO = self.nCollectiveIO

         if self.MPI_ROOT:
             self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
-        offset0 += self.tSize
-
-        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
-            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_WRITE_AT_ALL(offset, field[(iVar, *iBeg)])
-            nWrites += 1
-
-        for _ in range(nCollectiveIO - nWrites):
-            # Additional collective write to catch up with other processes
-            self.MPI_WRITE_AT_ALL(offset0, field[:0])
-
+        offset += self.tSize
+        self.MPI_WRITE_AT_ALL(offset, field)
         self.MPI_FILE_CLOSE()

-    def iPos(self, iVar, iX):
-        iPos = iVar * self.nDoF
-        for axis in range(self.dim - 1):
-            iPos += (self.iLoc[axis] + iX[axis]) * np.prod(self.gridSizes[axis + 1 :])
-        iPos += self.iLoc[-1]
-        return iPos
-
     def readField(self, idx):
         """
         Read one field stored in the binary file, corresponding to the given

@@ -689,26 +679,15 @@ def readField(self, idx):
             return super().readField(idx)

         idx = self.formatIndex(idx)
-        offset0 = self.hSize + idx * (self.tSize + self.fSize)
+        offset = self.hSize + idx * (self.tSize + self.fSize)
         with open(self.fileName, "rb") as f:
-            t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset0)[0])
-            offset0 += self.tSize
+            t = float(np.fromfile(f, dtype=T_DTYPE, count=1, offset=offset)[0])
+            offset += self.tSize

         field = np.empty((self.nVar, *self.nLoc), dtype=self.dtype)

         self.MPI_FILE_OPEN(mode="r")
-        nReads = 0
-        nCollectiveIO = self.nCollectiveIO
-
-        for (iVar, *iBeg) in itertools.product(range(self.nVar), *[range(n) for n in self.nLoc[:-1]]):
-            offset = offset0 + self.iPos(iVar, iBeg) * self.itemSize
-            self.MPI_READ_AT_ALL(offset, field[(iVar, *iBeg)])
-            nReads += 1
-
-        for _ in range(nCollectiveIO - nReads):
-            # Additional collective read to catch up with other processes
-            self.MPI_READ_AT_ALL(offset0, field[:0])
-
+        self.MPI_READ_AT_ALL(offset, field)
         self.MPI_FILE_CLOSE()

         return t, field
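Note on the change above: the per-chunk Write_at_all/Read_at_all loops, together with the nCollectiveIO padding that kept all ranks issuing the same number of collective calls, are replaced by a single collective access through an MPI subarray datatype installed as the file view. A minimal standalone sketch of this pattern with mpi4py follows; the grid sizes, the even split along the first axis, and the file name are illustrative assumptions, not values from the commit.

import numpy as np
from mpi4py import MPI
from mpi4py.util.dtlib import from_numpy_dtype

comm = MPI.COMM_WORLD
rank, size = comm.Get_rank(), comm.Get_size()

# Global 2D grid split along the first axis (assumes size divides nX evenly).
nX, nY = 8, 4
nLoc = nX // size            # local block size
iLoc = rank * nLoc           # global starting index of the local block
local = np.full((nLoc, nY), rank, dtype=np.float64)

# Element type and subarray "mask" describing where the local block sits
# inside the global array, mirroring what MPI_SETUP does for Rectilinear.
etype = from_numpy_dtype(local.dtype)
ftype = etype.Create_subarray(
    [nX, nY],    # global array sizes
    [nLoc, nY],  # local array sizes
    [iLoc, 0],   # global starting indices of the local block
)
ftype.Commit()

# One collective write per rank instead of a loop of Write_at_all calls.
fh = MPI.File.Open(comm, "field.bin", MPI.MODE_WRONLY | MPI.MODE_CREATE)
fh.Set_view(disp=0, etype=etype, filetype=ftype)
fh.Write_all(local)
fh.Close()
ftype.Free()

Because the file view already encodes each rank's block layout, every process issues exactly one collective call per field, which is why the nCollectiveIO bookkeeping and the iPos offset computation could be dropped.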

pySDC/projects/GPU/analysis_scripts/parallel_scaling.py

Lines changed: 58 additions & 30 deletions

@@ -81,13 +81,17 @@ def run_scaling_test(self, **kwargs):
             **kwargs,
         )

-    def plot_scaling_test(self, ax, quantity='time', **plotting_params):  # pragma: no cover
+    def plot_scaling_test(self, ax, quantity='time', space_time=None, **plotting_params):  # pragma: no cover
         from matplotlib.colors import TABLEAU_COLORS

         cmap = TABLEAU_COLORS
         colors = list(cmap.values())

         for experiment in self.experiments:
+            if space_time is not None:
+                if not experiment.PinT == space_time:
+                    continue
+
             tasks_time = self.tasks_time if experiment.PinT else 1
             timings = {}

@@ -141,20 +145,30 @@ def plot_scaling_test(self, ax, quantity='time', **plotting_params):  # pragma:
                     elif quantity == 'throughput_per_task':
                         timings[np.prod(procs)] = experiment.res**self.ndim / t_mean
                     elif quantity == 'efficiency':
+                        if type(config).__name__ == 'GrayScottScaling3D':
+                            norm = 13216322.909
+                        else:
+                            norm = 1
                         timings[np.prod(procs) / self.tasks_per_node] = (
-                            experiment.res**self.ndim / t_mean / np.prod(procs)
+                            experiment.res**self.ndim / t_mean / np.prod(procs) / norm
                         )
                     elif quantity == 'time':
                         timings[np.prod(procs) / self.tasks_per_node] = t_mean
                     elif quantity == 'time_per_task':
                         timings[np.prod(procs)] = t_mean
                     elif quantity == 'min_time_per_task':
                         timings[np.prod(procs)] = t_min
+                    elif quantity == 'min_time':
+                        timings[np.prod(procs) / self.tasks_per_node] = t_min
                     else:
                         raise NotImplementedError
                 except (FileNotFoundError, ValueError):
                     pass

+            if quantity == 'efficiency' and type(config).__name__ == 'RayleighBenard_scaling':
+                norm = max(timings.values())
+                timings = {key: value / norm for key, value in timings.items()}
+
             ax.loglog(
                 timings.keys(),
                 timings.values(),

@@ -171,7 +185,8 @@ def plot_scaling_test(self, ax, quantity='time', **plotting_params):  # pragma:
             'time': r'$t_\mathrm{step}$ / s',
             'time_per_task': r'$t_\mathrm{step}$ / s',
             'min_time_per_task': r'minimal $t_\mathrm{step}$ / s',
-            'efficiency': 'efficiency / DoF/s/task',
+            'min_time': r'minimal $t_\mathrm{step}$ / s',
+            'efficiency': r'parallel efficiency / \%',
         }
         ax.set_ylabel(labels[quantity])

@@ -331,17 +346,28 @@ class RayleighBenardDedalusComparisonGPU(GPUConfig, ScalingConfig):
     ]


-def plot_scalings(problem, **kwargs):  # pragma: no cover
+def plot_scalings(problem, XPU=None, space_time=None, **kwargs):  # pragma: no cover
     if problem == 'GS3D':
-        configs = [
-            GrayScottSpaceScalingCPU3D(),
-            GrayScottSpaceScalingGPU3D(),
-        ]
+        if XPU == 'CPU':
+            configs = [GrayScottSpaceScalingCPU3D()]
+        elif XPU == 'GPU':
+            configs = [GrayScottSpaceScalingGPU3D()]
+        else:
+            configs = [GrayScottSpaceScalingCPU3D(), GrayScottSpaceScalingGPU3D()]
     elif problem == 'RBC':
-        configs = [
-            RayleighBenardSpaceScalingGPU(),
-            RayleighBenardSpaceScalingCPU(),
-        ]
+        if XPU == 'CPU':
+            configs = [
+                RayleighBenardSpaceScalingCPU(),
+            ]
+        elif XPU == 'GPU':
+            configs = [
+                RayleighBenardSpaceScalingGPU(),
+            ]
+        else:
+            configs = [
+                RayleighBenardSpaceScalingGPU(),
+                RayleighBenardSpaceScalingCPU(),
+            ]
     elif problem == 'RBC_dedalus':
         configs = [
             RayleighBenardDedalusComparison(),

@@ -358,31 +384,26 @@ def plot_scalings(problem, **kwargs):  # pragma: no cover
         ('RBC', 'time'): {'x': [1 / 10, 64], 'y': [60, 60 / 640]},
         ('RBC', 'time_per_task'): {'x': [1, 640], 'y': [60, 60 / 640]},
         ('RBC', 'min_time_per_task'): {'x': [1, 640], 'y': [60, 60 / 640]},
+        ('RBC', 'min_time'): {'x': [1, 640], 'y': [60, 60 / 640]},
         ('RBC', 'throughput_per_task'): {'x': [1 / 1, 640], 'y': [2e4, 2e4 * 640]},
     }

-    fig, ax = plt.subplots(figsize=figsize_by_journal('TUHH_thesis', 1, 0.6))
-    configs[1].plot_scaling_test(ax=ax, quantity='efficiency')
-    # ax.legend(frameon=False)
-    box = ax.get_position()
-    ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
-    ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
-
-    ax.set_yscale('linear')
-    path = f'{PROJECT_PATH}/plots/scaling_{problem}_efficiency.pdf'
-    fig.savefig(path, bbox_inches='tight')
-    print(f'Saved {path!r}', flush=True)
-
-    for quantity in ['time', 'throughput', 'time_per_task', 'throughput_per_task', 'min_time_per_task'][::-1]:
+    for quantity in ['time', 'throughput', 'time_per_task', 'throughput_per_task', 'min_time_per_task', 'efficiency'][
+        ::-1
+    ]:
         fig, ax = plt.subplots(figsize=figsize_by_journal('TUHH_thesis', 1, 0.6))
         for config in configs:
-            config.plot_scaling_test(ax=ax, quantity=quantity)
+            config.plot_scaling_test(ax=ax, quantity=quantity, space_time=space_time)
         if (problem, quantity) in ideal_lines.keys():
             ax.loglog(*ideal_lines[(problem, quantity)].values(), color='black', ls=':', label='ideal')
+        elif quantity == 'efficiency':
+            ax.axhline(1, color='black', ls=':', label='ideal')
+            ax.set_yscale('linear')
+            ax.set_ylim(0, 1.1)
         box = ax.get_position()
         ax.set_position([box.x0, box.y0, box.width * 0.8, box.height])
         ax.legend(loc='center left', bbox_to_anchor=(1, 0.5))
-        path = f'{PROJECT_PATH}/plots/scaling_{problem}_{quantity}.pdf'
+        path = f'{PROJECT_PATH}/plots/scaling_{problem}_{quantity}_{XPU}_{space_time}.pdf'
         fig.savefig(path, bbox_inches='tight')
         print(f'Saved {path!r}', flush=True)

@@ -393,8 +414,8 @@ def plot_scalings(problem, **kwargs):  # pragma: no cover
     parser = argparse.ArgumentParser()
     parser.add_argument('--mode', type=str, choices=['run', 'plot'], default='run')
    parser.add_argument('--problem', type=str, default='GS')
-    parser.add_argument('--XPU', type=str, choices=['CPU', 'GPU'], default='CPU')
-    parser.add_argument('--space_time', type=str, choices=['True', 'False'], default='False')
+    parser.add_argument('--XPU', type=str, choices=['CPU', 'GPU', 'both'], default='CPU')
+    parser.add_argument('--space_time', type=str, choices=['True', 'False', 'None'], default='False')
     parser.add_argument('--submit', type=str, choices=['True', 'False'], default='True')
     parser.add_argument('--nsys_profiling', type=str, choices=['True', 'False'], default='False')

@@ -403,6 +424,13 @@ def plot_scalings(problem, **kwargs):  # pragma: no cover
     submit = args.submit == 'True'
     nsys_profiling = args.nsys_profiling == 'True'

+    if args.space_time == 'True':
+        space_time = True
+    elif args.space_time == 'False':
+        space_time = False
+    else:
+        space_time = None
+
     config_classes = []

     if args.problem == 'GS3D':

@@ -429,6 +457,6 @@ def plot_scalings(problem, **kwargs):  # pragma: no cover
         if args.mode == 'run':
             config.run_scaling_test(submit=submit, nsys_profiling=nsys_profiling)
         elif args.mode == 'plot':
-            plot_scalings(problem=args.problem)
+            plot_scalings(problem=args.problem, XPU=args.XPU, space_time=space_time)
         else:
             raise NotImplementedError(f'Don\'t know mode {args.mode!r}')
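Usage note for the updated script: the new XPU and space_time options flow from the command line into plot_scalings and on to plot_scaling_test. A hypothetical driver call illustrating the new options, assuming the timing files from earlier scaling runs already exist on disk:

from pySDC.projects.GPU.analysis_scripts.parallel_scaling import plot_scalings

# 'both' falls through to the else branch and plots CPU and GPU configs together;
# space_time=None disables the PinT filter, so space-only and space-time
# experiments both appear in the plots.
plot_scalings(problem='GS3D', XPU='both', space_time=None)

The command-line equivalent with the newly added choices would be roughly: python pySDC/projects/GPU/analysis_scripts/parallel_scaling.py --mode plot --problem GS3D --XPU both --space_time None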
