Commit 5a5375e

TL: block decomposition and first tests for fieldsIO
1 parent 684d3bf commit 5a5375e

4 files changed: +395 -48 lines
Lines changed: 3 additions & 0 deletions
@@ -0,0 +1,3 @@
+# Testing the FieldsIO
+
+- generate seq, read MPI
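A rough sketch of the test idea noted above ("generate seq, read MPI"): write the file with a single process, then read it back with several MPI ranks. This is not part of the commit; it only reuses names that appear in base.py below (Cart1D, setHeader, addField, FieldsIO.fromFile, readField), and the import path, file name, field values and the initialize() call are assumptions.

# Hypothetical sketch only; import path, file name and initialize() call are assumed
import numpy as np
from base import FieldsIO, Cart1D        # assumed: run from the fieldsIO folder

# 1) sequential generation (single process)
x = np.linspace(0, 1, num=256, endpoint=False)
u0 = np.array([np.sin(2*np.pi*x), np.cos(2*np.pi*x)])   # two variables on the grid
f1 = Cart1D(np.float64, "test_seq.pysdc")
f1.setHeader(nVar=u0.shape[0], gridX=x)
f1.initialize()                          # assumed to be required before addField
for t in np.arange(10)/10:
    f1.addField(t, t*u0)

# 2) MPI read-back (several ranks): each rank first sets its local block,
#    e.g. Cart1D.setupMPI(comm, iLocX, nLocX), then reads its slice of the same file
f2 = FieldsIO.fromFile("test_seq.pysdc")
t, u = f2.readField(2)                   # expected: t == 0.2 and u == 0.2*u0 on the local block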

pySDC/playgrounds/dedalus/fieldsIO.py renamed to pySDC/playgrounds/dedalus/fieldsIO/base.py

Lines changed: 67 additions & 48 deletions
@@ -4,10 +4,17 @@
 Base generic script for fields IO
 """
 import os
+import sys
 import numpy as np
 from typing import Type, TypeVar
-from mpi4py import MPI
-from time import time
+try:
+    from mpi4py import MPI
+except ImportError:
+    pass
+
+
+from time import time, sleep
+from blocks import BlockDecomposition

 T = TypeVar("T")

@@ -230,6 +237,7 @@ def setupMPI(cls, comm:MPI.Intracomm, iLocX, nLocX):
         cls.comm = comm
         cls.iLocX = iLocX
         cls.nLocX = nLocX
+        cls.mpiFile = None

     @property
     def MPI_ON(self):
@@ -241,18 +249,33 @@ def MPI_ROOT(self):
         if self.comm is None: return True
         return self.comm.Get_rank() == 0

-    def MPI_FILE_OPEN(self, mode)->MPI.File:
+    def MPI_FILE_OPEN(self, mode):
         amode = {
             "r": MPI.MODE_RDONLY,
             "a": MPI.MODE_WRONLY | MPI.MODE_APPEND,
         }[mode]
-        return MPI.File.Open(self.comm, self.fileName, amode)
+        self.mpiFile = MPI.File.Open(self.comm, self.fileName, amode)
+
+    def MPI_WRITE(self, data):
+        self.mpiFile.Write(data)
+
+    def MPI_WRITE_AT(self, offset, data:np.ndarray):
+        self.mpiFile.Write_at(offset, data)
+
+    def MPI_READ_AT(self, offset, data):
+        self.mpiFile.Read_at(offset, data)
+
+    def MPI_FILE_CLOSE(self):
+        self.mpiFile.Close()
+        self.mpiFile = None

     def initialize(self):
         if self.MPI_ROOT:
             super().initialize()
-        self.comm.Barrier()
-        self.initialized = True
+        if self.MPI_ON:
+            self.comm.Barrier()
+        self.initialized = True
+

     def addField(self, time, field):
         if not self.MPI_ON: return super().addField(time, field)
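The wrappers above funnel all MPI-IO through the shared self.mpiFile handle; note that the hunks below also move from the collective Write_at_all/Read_at_all calls to the plain Write_at/Read_at through these wrappers. As a condensed illustration of how they chain together, here is a sketch equivalent to the updated Cart1D.addField shown next. It is not part of the commit, and the T_DTYPE default is an assumption.

import numpy as np

def write_record(f, field, time, T_DTYPE=np.float64):
    # condensed restatement of the updated Cart1D.addField, using the new wrappers;
    # `f` stands for a Cart1D instance with setupMPI(...) already called
    offset0 = f.fileSize                 # file size before this record is appended
    f.MPI_FILE_OPEN(mode="a")
    if f.MPI_ROOT:
        f.MPI_WRITE(np.array(time, dtype=T_DTYPE))   # only root appends the time stamp
    offset0 += f.tSize
    for iVar in range(f.nVar):
        f.MPI_WRITE_AT(offset0 + (iVar*f.nX + f.iLocX)*f.itemSize, field[iVar])
    f.MPI_FILE_CLOSE()                   # closes and resets self.mpiFile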
@@ -265,15 +288,15 @@ def addField(self, time, field):
             f"expected {(self.nVar, self.nLocX)} shape, got {field.shape}"

         offset0 = self.fileSize
-        mpiFile = self.MPI_FILE_OPEN(mode="a")
+        self.MPI_FILE_OPEN(mode="a")
         if self.MPI_ROOT:
-            mpiFile.Write(np.array(time, dtype=T_DTYPE))
+            self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
         offset0 += self.tSize

         for iVar in range(self.nVar):
             offset = offset0 + (iVar*self.nX + self.iLocX)*self.itemSize
-            mpiFile.Write_at_all(offset, field[iVar])
-        mpiFile.Close()
+            self.MPI_WRITE_AT(offset, field[iVar])
+        self.MPI_FILE_CLOSE()


     def readField(self, idx):
@@ -287,11 +310,11 @@ def readField(self, idx):

         field = np.empty((self.nVar, self.nLocX), dtype=self.dtype)

-        mpiFile = self.MPI_FILE_OPEN(mode="r")
+        self.MPI_FILE_OPEN(mode="r")
         for iVar in range(self.nVar):
             offset = offset0 + (iVar*self.nX + self.iLocX)*self.itemSize
-            mpiFile.Read_at_all(offset, field[iVar])
-        mpiFile.Close()
+            self.MPI_READ_AT(offset, field[iVar])
+        self.MPI_FILE_CLOSE()

         return t, field

@@ -331,9 +354,7 @@ def readHeader(self, f):
     # -------------------------------------------------------------------------
     @classmethod
     def setupMPI(cls, comm:MPI.Intracomm, iLocX, nLocX, iLocY, nLocY):
-        cls.comm = comm
-        cls.iLocX = iLocX
-        cls.nLocX = nLocX
+        super().setupMPI(comm, iLocX, nLocX)
         cls.iLocY = iLocY
         cls.nLocY = nLocY

@@ -349,18 +370,18 @@ def addField(self, time, field):
             f"expected {(self.nVar, self.nLocX, self.nLocY)} shape, got {field.shape}"

         offset0 = self.fileSize
-        mpiFile = self.MPI_FILE_OPEN(mode="a")
+        self.MPI_FILE_OPEN(mode="a")
         if self.MPI_ROOT:
-            mpiFile.Write(np.array(time, dtype=T_DTYPE))
+            self.MPI_WRITE(np.array(time, dtype=T_DTYPE))
         offset0 += self.tSize

         for iVar in range(self.nVar):
             for iX in range(self.nLocX):
                 offset = offset0 + (
                     iVar*self.nX*self.nY + (self.iLocX + iX)*self.nY + self.iLocY
                 )*self.itemSize
-                mpiFile.Write_at_all(offset, field[iVar, iX])
-        mpiFile.Close()
+                self.MPI_WRITE_AT(offset, field[iVar, iX])
+        self.MPI_FILE_CLOSE()


     def readField(self, idx):
@@ -374,14 +395,14 @@ def readField(self, idx):

         field = np.empty((self.nVar, self.nLocX, self.nLocY), dtype=self.dtype)

-        mpiFile = self.MPI_FILE_OPEN(mode="r")
+        self.MPI_FILE_OPEN(mode="r")
         for iVar in range(self.nVar):
             for iX in range(self.nLocX):
                 offset = offset0 + (
                     iVar*self.nX*self.nY + (self.iLocX + iX)*self.nY + self.iLocY
                 )*self.itemSize
-                mpiFile.Read_at_all(offset, field[iVar, iX])
-        mpiFile.Close()
+                self.MPI_READ_AT(offset, field[iVar, iX])
+        self.MPI_FILE_CLOSE()

         return t, field

@@ -404,7 +425,7 @@ def readField(self, idx):
     y = np.linspace(0, 1, num=64, endpoint=False)
     nY = y.size

-    dim = 2
+    dim = 1
     dType = np.float64

     if dim == 1:
@@ -416,42 +437,40 @@ def readField(self, idx):
     comm = MPI.COMM_WORLD
     MPI_SIZE = comm.Get_size()
     MPI_RANK = comm.Get_rank()
+
+    gridSizes = u0.shape[1:]
+    algo = sys.argv[1] if len(sys.argv) > 1 else "ChatGPT"
+    blocks = BlockDecomposition(MPI_SIZE, gridSizes, algo, MPI_RANK)
+    bounds = blocks.localBounds
     if MPI_SIZE > 1:
         fileName = "test_MPI.pysdc"
-        if dim == 1:
-            pSizeX = MPI_SIZE
-            pRankX = MPI_RANK
-        if dim == 2:
-            assert MPI_SIZE == 4
-            pSizeX = MPI_SIZE // 2
-            pRankX = MPI_RANK // 2
-            pSizeY = MPI_SIZE // 2
-            pRankY = MPI_RANK % 2
-    else:
-        pSizeX, pRankX = 1, 0
-        pSizeY, pRankY = 1, 0
-
-    def decomposeDirection(nItems, pSize, pRank):
-        n0 = nItems // pSize
-        nRest = nItems - pSize*n0
-        nLoc = n0 + 1*(pRank < nRest)
-        iLoc = pRank*n0 + nRest*(pRank >= nRest) + pRank*(pRank < nRest)
-        return iLoc, nLoc
-
-
-    iLocX, nLocX = decomposeDirection(nX, pSizeX, pRankX)
+
+
     if dim == 1:
+        (iLocX, ), (nLocX, ) = bounds
+        pRankX, = blocks.ranks
         Cart1D.setupMPI(comm, iLocX, nLocX)
         u0 = u0[:, iLocX:iLocX+nLocX]

+        MPI.COMM_WORLD.Barrier()
+        sleep(0.01*MPI_RANK)
+        print(f"[Rank {MPI_RANK}] pRankX={pRankX} ({iLocX}, {nLocX})")
+        MPI.COMM_WORLD.Barrier()
+
         f1 = Cart1D(dType, fileName)
         f1.setHeader(nVar=u0.shape[0], gridX=x)

     if dim == 2:
-        iLocY, nLocY = decomposeDirection(nY, pSizeY, pRankY)
+        (iLocX, iLocY), (nLocX, nLocY) = bounds
+        pRankX, pRankY = blocks.ranks
         Cart2D.setupMPI(comm, iLocX, nLocX, iLocY, nLocY)
         u0 = u0[:, iLocX:iLocX+nLocX, iLocY:iLocY+nLocY]

+        MPI.COMM_WORLD.Barrier()
+        sleep(0.01*MPI_RANK)
+        print(f"[Rank {MPI_RANK}] pRankX={pRankX} ({iLocX}, {nLocX}), pRankY={pRankY} ({iLocY}, {nLocY})")
+        MPI.COMM_WORLD.Barrier()
+
         f1 = Cart2D(dType, fileName)
         f1.setHeader(nVar=u0.shape[0], gridX=x, gridY=y)

465484
for t in np.arange(nTimes)/nTimes:
466485
f1.addField(t, t*u0)
467486
if MPI_RANK == 0:
468-
print(f" -> done in {time()-tBeg:1.2f}s !")
487+
print(f" -> done in {time()-tBeg:1.4f}s !")
469488

470489
f2 = FieldsIO.fromFile(fileName)
471490
t, u = f2.readField(2)
Lines changed: 139 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
2+
class BlockDecomposition(object):
3+
"""
4+
Class decomposing a cartesian space domain (1D to 3D) into a given number of processors.
5+
6+
Parameters
7+
----------
8+
nProcs : int
9+
Total number of processors for space block decomposition.
10+
gridSizes : list[int]
11+
Number of grid points in each dimension
12+
algo : str, optional
13+
Algorithm used for hte block decomposition :
14+
15+
- Hybrid : approach minimizing interface communication, taken from
16+
the `[Hybrid CFD solver] <https://web.stanford.edu/group/ctr/ResBriefs07/5_larsson1_pp47_58.pdf>`_.
17+
- ChatGPT : quickly generated using `[ChatGPT] <https://chatgpt.com>`_.
18+
19+
The default is "Hybrid".
20+
gRank : int, optional
21+
If provided, the global rank that will determine the local block distribution. Default is None.
22+
order : str, optional
23+
The order used when computing the rank block distribution. Default is `C`.
24+
"""
25+
+    def __init__(self, nProcs, gridSizes, algo="Hybrid", gRank=None, order="C"):
+        dim = len(gridSizes)
+        assert dim in [1, 2, 3], "block decomposition only works for 1D, 2D or 3D domains"
+
+        if algo == "ChatGPT":
+
+            nBlocks = [1]*dim
+            for i in range(2, int(nProcs**0.5) + 1):
+                while nProcs % i == 0:
+                    nBlocks[0] *= i
+                    nProcs //= i
+                    nBlocks.sort()
+
+            if nProcs > 1:
+                nBlocks[0] *= nProcs
+
+            nBlocks.sort()
+            while len(nBlocks) < dim:
+                smallest = nBlocks.pop(0)
+                nBlocks += [1, smallest]
+                nBlocks.sort()
+
+            while len(nBlocks) > dim:
+                smallest = nBlocks.pop(0)
+                next_smallest = nBlocks.pop(0)
+                nBlocks.append(smallest * next_smallest)
+                nBlocks.sort()
+
+        elif algo == "Hybrid":
+            rest = nProcs
+            facs = {
+                1: [1],
+                2: [2, 1],
+                3: [2, 3, 1],
+            }[dim]
+            exps = [0]*dim
+            for n in range(dim-1):
+                while (rest % facs[n]) == 0:
+                    exps[n] = exps[n] + 1
+                    rest = rest // facs[n]
+            if (rest > 1):
+                facs[dim-1] = rest
+                exps[dim-1] = 1
+
+            nBlocks = [1]*dim
+            for n in range(dim-1, -1, -1):
+                while exps[n] > 0:
+                    dummymax = -1
+                    dmax = 0
+                    for d, nPts in enumerate(gridSizes):
+                        dummy = (nPts + nBlocks[d] - 1) // nBlocks[d]
+                        if (dummy >= dummymax):
+                            dummymax = dummy
+                            dmax = d
+                    nBlocks[dmax] = nBlocks[dmax] * facs[n]
+                    exps[n] = exps[n] - 1
+
+        else:
+            raise NotImplementedError(f"algo={algo}")
+
+        # Store attributes
+        self.dim = dim
+        self.nBlocks = nBlocks
+        self.gridSizes = gridSizes
+
+        # Used for rank block distribution
+        self.gRank = gRank
+        self.order = order
+
+    @property
+    def ranks(self):
+        gRank, order = self.gRank, self.order
+        assert gRank is not None, "gRank attribute needs to be set"
+        dim, nBlocks = self.dim, self.nBlocks
+        if dim == 1:
+            return (gRank, )
+        elif dim == 2:
+            div = nBlocks[-1] if order == "C" else nBlocks[0]
+            return (gRank // div, gRank % div)
+        else:
+            raise NotImplementedError(f"dim={dim}")
+
+    @property
+    def localBounds(self):
+        iLocList, nLocList = [], []
+        for rank, nPoints, nBlocks in zip(self.ranks, self.gridSizes, self.nBlocks):
+            n0 = nPoints // nBlocks
+            nRest = nPoints - nBlocks*n0
+            nLoc = n0 + 1*(rank < nRest)
+            iLoc = rank*n0 + nRest*(rank >= nRest) + rank*(rank < nRest)
+
+            iLocList.append(iLoc)
+            nLocList.append(nLoc)
+        return iLocList, nLocList
+
+
+if __name__ == "__main__":
+    from mpi4py import MPI
+    from time import sleep
+
+    comm:MPI.Intracomm = MPI.COMM_WORLD
+    MPI_SIZE = comm.Get_size()
+    MPI_RANK = comm.Get_rank()
+
+    blocks = BlockDecomposition(MPI_SIZE, [256, 64], gRank=MPI_RANK)
+    if MPI_RANK == 0:
+        print(f"nBlocks : {blocks.nBlocks}")
+
+    ranks = blocks.ranks
+    bounds = blocks.localBounds
+
+    comm.Barrier()
+    sleep(0.01*MPI_RANK)
+    print(f"[Rank {MPI_RANK}] pRankX={ranks}, bounds={bounds}")
