
Commit 03fa3f2

feat: added MPIFredholm1 and MPIMDC
1 parent fbf892e commit 03fa3f2

File tree

10 files changed: +744 additions, -6 deletions


docs/source/api/index.rst

Lines changed: 22 additions & 0 deletions
@@ -61,6 +61,28 @@ Derivatives
     MPILaplacian
     MPIGradient
 
+Signal Processing
+~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pylops_mpi.signalprocessing
+
+.. autosummary::
+   :toctree: generated/
+
+    MPIFredholm1
+
+
+Wave-Equation processing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pylops_mpi.waveeqprocessing
+
+.. autosummary::
+   :toctree: generated/
+
+    MPIMDC
+
+
 Solvers
 -------

examples/plot_mdc.py

Lines changed: 174 additions & 0 deletions
@@ -0,0 +1,174 @@
"""
Multi-Dimensional Convolution
=============================
This example shows how to use the :py:class:`pylops_mpi.waveeqprocessing.MPIMDC` operator
to convolve a 3D kernel with input seismic data in a distributed fashion (where
parallelism is harnessed over the frequency axis when performing repeated matrix-vector
or matrix-matrix multiplications).

"""
from matplotlib import pyplot as plt
import numpy as np
from mpi4py import MPI
from pylops.utils.seismicevents import hyperbolic2d, makeaxis
from pylops.utils.tapers import taper3d
from pylops.utils.wavelets import ricker

from pylops_mpi.DistributedArray import local_split, Partition
import pylops_mpi

plt.close("all")
np.random.seed(42)

rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
dtype = np.float32
cdtype = np.complex64

###############################################################################
# Let's start by creating a set of hyperbolic events to be used as our MDC kernel

# Input parameters
par = {
    "ox": -300,
    "dx": 10,
    "nx": 61,
    "oy": -500,
    "dy": 10,
    "ny": 101,
    "ot": 0,
    "dt": 0.004,
    "nt": 400,
    "f0": 20,
    "nfmax": 200,
}

t0_m = 0.2
vrms_m = 1100.0
amp_m = 1.0

t0_G = (0.2, 0.5, 0.7)
vrms_G = (1200.0, 1500.0, 2000.0)
amp_G = (1.0, 0.6, 0.5)

# Taper
tap = taper3d(par["nt"], (par["ny"], par["nx"]), (5, 5), tapertype="hanning")

# Create axes
t, t2, x, y = makeaxis(par)

# Create wavelet
wav = ricker(t[:41], f0=par["f0"])[0]

# Generate model
m, mwav = hyperbolic2d(x, t, t0_m, vrms_m, amp_m, wav)

# Generate operator
G, Gwav = np.zeros((par["ny"], par["nx"], par["nt"])), np.zeros(
    (par["ny"], par["nx"], par["nt"])
)
for iy, y0 in enumerate(y):
    G[iy], Gwav[iy] = hyperbolic2d(x - y0, t, t0_G, vrms_G, amp_G, wav)
G, Gwav = G * tap, Gwav * tap

# Add negative part to data and model
m = np.concatenate((np.zeros((par["nx"], par["nt"] - 1)), m), axis=-1)
mwav = np.concatenate((np.zeros((par["nx"], par["nt"] - 1)), mwav), axis=-1)
Gwav2 = np.concatenate((np.zeros((par["ny"], par["nx"], par["nt"] - 1)), Gwav), axis=-1)

# Move to frequency
Gwav_fft = np.fft.rfft(Gwav2, 2 * par["nt"] - 1, axis=-1)
Gwav_fft = Gwav_fft[..., : par["nfmax"]]

# Move frequency/time to first axis
m, mwav = m.T, mwav.T
Gwav_fft = Gwav_fft.transpose(2, 0, 1)


###############################################################################
# Now that we have created the kernel of our MDC operator in ``Gwav_fft``, we
# are ready to define a strategy on how to split it along the first
# (i.e., frequency) axis over different ranks. In practical applications, one
# would of course pre-compute the kernel and just load the relevant part in
# each rank from file.

# Choose how to split sources to ranks
nf = par["nfmax"]
nf_rank = local_split((nf, ), MPI.COMM_WORLD, Partition.SCATTER, 0)
nf_ranks = np.concatenate(MPI.COMM_WORLD.allgather(nf_rank))
ifin_rank = np.insert(np.cumsum(nf_ranks)[:-1], 0, 0)[rank]
ifend_rank = np.cumsum(nf_ranks)[rank]
print(f'Rank: {rank}, nf: {nf_rank}, ifin: {ifin_rank}, ifend: {ifend_rank}')

# Extract part of kernel of interest
G = Gwav_fft[ifin_rank:ifend_rank].astype(cdtype)
print(f'Rank: {rank}, G: {G.shape}')


###############################################################################
# We can finally create the MDC operator using
# :py:class:`pylops_mpi.waveeqprocessing.MPIMDC` so that the most
# demanding computations can be run in parallel.

# Define operator
Fop = pylops_mpi.waveeqprocessing.MPIMDC(
    G, nt=2 * par["nt"] - 1, nv=1, nfreq=nf,
    dt=0.004, dr=1.0, twosided=True)

# Apply forward
md = pylops_mpi.DistributedArray(global_shape=(2 * par["nt"] - 1) * par["nx"] * 1,
                                 partition=pylops_mpi.Partition.BROADCAST,
                                 dtype=dtype)
md[:] = m.astype(dtype).ravel()

dd = Fop @ md
d = dd.asarray().real
d = d.reshape(2 * par["nt"] - 1, par["ny"])

# Apply adjoint
madjd = Fop.H @ dd
madj = madjd.asarray().real
madj = madj.reshape(2 * par["nt"] - 1, par["nx"])

###############################################################################
# Finally let's display input model, data and adjoint model

if rank == 0:
    fig, axs = plt.subplots(1, 3, figsize=(9, 6))
    axs[0].imshow(
        mwav,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-mwav.max(),
        vmax=mwav.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[0].set_title(r"$m$", fontsize=15)
    axs[0].set_xlabel("r")
    axs[0].set_ylabel("t")
    axs[1].imshow(
        d,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-d.max(),
        vmax=d.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[1].set_title(r"$d$", fontsize=15)
    axs[1].set_xlabel("s")
    axs[1].set_ylabel("t")
    axs[2].imshow(
        madj,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-madj.max(),
        vmax=madj.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[2].set_title(r"$m_{adj}$", fontsize=15)
    axs[2].set_xlabel("s")
    axs[2].set_ylabel("t")
    fig.tight_layout()
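Note: like the other pylops_mpi examples, this script is meant to be launched with an MPI runner so that different ranks own different frequency slices of the kernel; the rank count below is only an illustration (any small count works here, as long as each rank ends up with at least two frequencies):

    mpiexec -n 4 python examples/plot_mdc.py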

pylops_mpi/__init__.py

Lines changed: 3 additions & 1 deletion
@@ -5,7 +5,9 @@
 from . import (
     basicoperators,
     optimization,
-    plotting
+    plotting,
+    signalprocessing,
+    waveeqprocessing,
 )
 from .plotting.plotting import *
 from .optimization.basic import *
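With this change the new subpackages become attributes of the top-level package. A minimal sanity check (not part of the commit; it assumes the subpackage ``__init__`` files re-export the new operators, as the docs entries above suggest):

    import pylops_mpi

    # new subpackages reachable from the top-level package
    print(pylops_mpi.signalprocessing.MPIFredholm1)
    print(pylops_mpi.waveeqprocessing.MPIMDC)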

pylops_mpi/basicoperators/__init__.py

Lines changed: 4 additions & 4 deletions
@@ -2,11 +2,11 @@
 Basic Linear Operators using MPI
 ================================
 
-The subpackage basicoperators extends some of the basic linear algebra
-operations provided by numpy providing forward and adjoint functionalities
-using MPI.
+The subpackage basicoperators extends some of the basic operations
+provided by pylops.basicoperators, providing forward and adjoint
+functionalities using MPI.
 
-A list of operators present in pylops_mpi.basicoperators :
+A list of operators present in pylops_mpi.basicoperators:
 MPIBlockDiag Block Diagonal arrangement of PyLops operators
 MPIStackedBlockDiag Block Diagonal arrangement of PyLops-MPI operators
 MPIVStack Vertical Stacking of PyLops operators
Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
import numpy as np

from mpi4py import MPI
from pylops.utils.backend import get_module
from pylops.utils.typing import DTypeLike, NDArray

from pylops_mpi import (
    DistributedArray,
    MPILinearOperator,
    Partition
)

from pylops_mpi.utils.decorators import reshaped


class MPIFredholm1(MPILinearOperator):
    r"""Fredholm integral of first kind.

    Implement a multi-dimensional Fredholm integral of first kind distributed
    across the first dimension.

    Parameters
    ----------
    G : :obj:`numpy.ndarray`
        Multi-dimensional convolution kernel of size
        :math:`[n_{\text{slice}} \times n_x \times n_y]`
    nz : :obj:`int`, optional
        Additional dimension of model
    saveGt : :obj:`bool`, optional
        Save ``G`` and ``G.H`` to speed up the computation of adjoint
        (``True``) or create ``G.H`` on-the-fly (``False``).
        Note that ``saveGt=True`` will double the amount of required memory
    usematmul : :obj:`bool`, optional
        Use :func:`numpy.matmul` (``True``) or for-loop with :func:`numpy.dot`
        (``False``). As it is not possible to define which approach is more
        performant (this is highly dependent on the size of ``G`` and input
        arrays as well as the hardware used in the computation), we advise users
        to time both methods for their specific problem prior to making a
        choice.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
    dtype : :obj:`str`, optional
        Type of elements in input array.

    Attributes
    ----------
    shape : :obj:`tuple`
        Operator shape

    Raises
    ------
    NotImplementedError
        If the size of the first dimension of ``G`` is equal to 1 in any of the ranks

    Notes
    -----
    A multi-dimensional Fredholm integral of first kind can be expressed as

    .. math::

        d(k, x, z) = \int{G(k, x, y) m(k, y, z) \,\mathrm{d}y}
        \quad \forall k=1,\ldots,n_{\text{slice}}

    on the other hand its adjoint is expressed as

    .. math::

        m(k, y, z) = \int{G^*(k, y, x) d(k, x, z) \,\mathrm{d}x}
        \quad \forall k=1,\ldots,n_{\text{slice}}

    This integral is implemented in a distributed fashion, where ``G``
    is split across ranks along its first dimension. The inputs
    of both the forward and adjoint are distributed arrays with broadcast partition:
    each rank takes a portion of such arrays, computes a partial integral, and
    the resulting outputs are then gathered by all ranks to return a
    distributed array with broadcast partition.

    """

    def __init__(
        self,
        G: NDArray,
        nz: int = 1,
        saveGt: bool = False,
        usematmul: bool = True,
        base_comm: MPI.Comm = MPI.COMM_WORLD,
        dtype: DTypeLike = "float64",
    ) -> None:
        self.nz = nz
        self.nsl, self.nx, self.ny = G.shape
        self.nsls = base_comm.allgather(self.nsl)
        if base_comm.Get_rank() == 0 and 1 in self.nsls:
            raise NotImplementedError(f'All ranks must have at least 2 or more '
                                      f'elements in the first dimension: '
                                      f'local split is instead {self.nsls}...')
        nslstot = base_comm.allreduce(self.nsl)
        self.islstart = np.insert(np.cumsum(self.nsls)[:-1], 0, 0)
        self.islend = np.cumsum(self.nsls)
        self.rank = base_comm.Get_rank()
        self.dims = (nslstot, self.ny, self.nz)
        self.dimsd = (nslstot, self.nx, self.nz)
        shape = (np.prod(self.dimsd),
                 np.prod(self.dims))
        super().__init__(shape=shape, dtype=np.dtype(dtype), base_comm=base_comm)

        self.G = G
        if saveGt:
            self.GT = G.transpose((0, 2, 1)).conj()
        self.usematmul = usematmul

    def _matvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        if x.partition is not Partition.BROADCAST:
            raise ValueError(f"x should have partition={Partition.BROADCAST}, {x.partition} != {Partition.BROADCAST}")
        y = DistributedArray(global_shape=self.shape[0], partition=Partition.BROADCAST,
                             engine=x.engine, dtype=self.dtype)
        x = x.local_array.reshape(self.dims).squeeze()
        x = x[self.islstart[self.rank]:self.islend[self.rank]]
        # apply matmul for portion of the rank of interest
        if self.usematmul:
            if self.nz == 1:
                x = x[..., ncp.newaxis]
            y1 = ncp.matmul(self.G, x)
        else:
            y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.nx, self.nz), dtype=self.dtype))
            for isl in range(self.nsls[self.rank]):
                y1[isl] = ncp.dot(self.G[isl], x[isl])
        # gather results
        y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
        return y

    def _rmatvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        if x.partition is not Partition.BROADCAST:
            raise ValueError(f"x should have partition={Partition.BROADCAST}, {x.partition} != {Partition.BROADCAST}")
        y = DistributedArray(global_shape=self.shape[1], partition=Partition.BROADCAST,
                             engine=x.engine, dtype=self.dtype)
        x = x.local_array.reshape(self.dimsd).squeeze()
        x = x[self.islstart[self.rank]:self.islend[self.rank]]
        # apply matmul for portion of the rank of interest
        if self.usematmul:
            if self.nz == 1:
                x = x[..., ncp.newaxis]
            if hasattr(self, "GT"):
                y1 = ncp.matmul(self.GT, x)
            else:
                y1 = (
                    ncp.matmul(x.transpose(0, 2, 1).conj(), self.G)
                    .transpose(0, 2, 1)
                    .conj()
                )
        else:
            y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.ny, self.nz), dtype=self.dtype))
            if hasattr(self, "GT"):
                for isl in range(self.nsls[self.rank]):
                    y1[isl] = ncp.dot(self.GT[isl], x[isl])
            else:
                for isl in range(self.nsl):
                    y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj()

        # gather results
        y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
        return y
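The new operator ships without an inline usage example; the following is a minimal sketch of how it is meant to be driven, pieced together from the docstring above and the MDC example earlier in this commit. Sizes, the random kernel and the fill values are hypothetical, and the script assumes it is launched on few enough ranks that every rank holds at least two slices of ``G`` (e.g. ``mpiexec -n 2``):

    # Hypothetical usage sketch for MPIFredholm1 (sizes chosen for illustration only)
    import numpy as np
    from mpi4py import MPI

    import pylops_mpi
    from pylops_mpi.DistributedArray import local_split, Partition

    rank = MPI.COMM_WORLD.Get_rank()
    nsl, nx, ny, nz = 8, 5, 4, 1  # global kernel/model sizes

    # each rank builds (or loads) only its own slices of the kernel along the first axis
    np.random.seed(42)
    nsl_rank = local_split((nsl, ), MPI.COMM_WORLD, Partition.SCATTER, 0)
    G = np.random.normal(0.0, 1.0, (int(nsl_rank[0]), nx, ny))

    Fop = pylops_mpi.signalprocessing.MPIFredholm1(G, nz=nz, dtype="float64")

    # the model is a broadcast-partitioned distributed array of size nsl * ny * nz;
    # a deterministic fill keeps the local copies identical on every rank
    m = pylops_mpi.DistributedArray(global_shape=nsl * ny * nz,
                                    partition=Partition.BROADCAST,
                                    dtype="float64")
    m[:] = np.ones(nsl * ny * nz)

    d = Fop @ m       # forward: distributed array of size nsl * nx * nz
    madj = Fop.H @ d  # adjoint: back to model space
    print(f"Rank: {rank}, d: {d.asarray().shape}, madj: {madj.asarray().shape}")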

0 commit comments
