Commit ea2c387

Merge pull request #103 from PyLops/fredholm
feat: added MPIFredholm1 and MPIMDC
2 parents e6b49d6 + 31efeb2

File tree

10 files changed: +729 -10 lines

docs/source/api/index.rst

Lines changed: 22 additions & 0 deletions

@@ -61,6 +61,28 @@ Derivatives
    MPILaplacian
    MPIGradient
 
+Signal Processing
+~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pylops_mpi.signalprocessing
+
+.. autosummary::
+   :toctree: generated/
+
+   MPIFredholm1
+
+
+Wave-Equation processing
+~~~~~~~~~~~~~~~~~~~~~~~~
+
+.. currentmodule:: pylops_mpi.waveeqprocessing
+
+.. autosummary::
+   :toctree: generated/
+
+   MPIMDC
+
+
 Solvers
 -------
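For reference, based on the ``currentmodule`` directives above, the two new operators are documented under ``pylops_mpi.signalprocessing`` and ``pylops_mpi.waveeqprocessing`` (a minimal import sketch; both subpackages are also added to the top-level imports in ``pylops_mpi/__init__.py`` later in this diff):

    from pylops_mpi.signalprocessing import MPIFredholm1   # Fredholm integral of first kind
    from pylops_mpi.waveeqprocessing import MPIMDC         # multi-dimensional convolution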

examples/plot_mdc.py

Lines changed: 172 additions & 0 deletions (new file)

"""
Multi-Dimensional Convolution
=============================
This example shows how to use the :py:class:`pylops_mpi.waveeqprocessing.MPIMDC` operator
to convolve a 3D kernel with input seismic data in a distributed fashion (where
parallelism is harnessed over the frequency axis when performing repeated matrix-vector
or matrix-matrix multiplications).

"""
from matplotlib import pyplot as plt
import numpy as np
from mpi4py import MPI
from pylops.utils.seismicevents import hyperbolic2d, makeaxis
from pylops.utils.tapers import taper3d
from pylops.utils.wavelets import ricker

from pylops_mpi.DistributedArray import local_split, Partition
import pylops_mpi

plt.close("all")
np.random.seed(42)

rank = MPI.COMM_WORLD.Get_rank()
size = MPI.COMM_WORLD.Get_size()
dtype = np.float32
cdtype = np.complex64

###############################################################################
# Let's start by creating a set of hyperbolic events to be used as our MDC kernel

# Input parameters
par = {
    "ox": -300,
    "dx": 10,
    "nx": 61,
    "oy": -500,
    "dy": 10,
    "ny": 101,
    "ot": 0,
    "dt": 0.004,
    "nt": 400,
    "f0": 20,
    "nfmax": 200,
}

t0_m = 0.2
vrms_m = 1100.0
amp_m = 1.0

t0_G = (0.2, 0.5, 0.7)
vrms_G = (1200.0, 1500.0, 2000.0)
amp_G = (1.0, 0.6, 0.5)

# Taper
tap = taper3d(par["nt"], (par["ny"], par["nx"]), (5, 5), tapertype="hanning")

# Create axis
t, t2, x, y = makeaxis(par)

# Create wavelet
wav = ricker(t[:41], f0=par["f0"])[0]

# Generate model
m, mwav = hyperbolic2d(x, t, t0_m, vrms_m, amp_m, wav)

# Generate operator
G, Gwav = np.zeros((par["ny"], par["nx"], par["nt"])), np.zeros(
    (par["ny"], par["nx"], par["nt"])
)
for iy, y0 in enumerate(y):
    G[iy], Gwav[iy] = hyperbolic2d(x - y0, t, t0_G, vrms_G, amp_G, wav)
G, Gwav = G * tap, Gwav * tap

# Add negative part to data and model
m = np.concatenate((np.zeros((par["nx"], par["nt"] - 1)), m), axis=-1)
mwav = np.concatenate((np.zeros((par["nx"], par["nt"] - 1)), mwav), axis=-1)
Gwav2 = np.concatenate((np.zeros((par["ny"], par["nx"], par["nt"] - 1)), Gwav), axis=-1)

# Move to frequency
Gwav_fft = np.fft.rfft(Gwav2, 2 * par["nt"] - 1, axis=-1)
Gwav_fft = Gwav_fft[..., : par["nfmax"]]

# Move frequency/time to first axis
m, mwav = m.T, mwav.T
Gwav_fft = Gwav_fft.transpose(2, 0, 1)

###############################################################################
# Now that we have created the kernel of our MDC operator in ``Gwav_fft``, we
# are ready to define a strategy on how to split it along the first
# (i.e., frequency) axis over different ranks. In practical applications, one
# would of course pre-compute the kernel and just load the relevant part in
# each rank from file.

# Choose how to split frequencies across ranks
nf = par["nfmax"]
nf_rank = local_split((nf, ), MPI.COMM_WORLD, Partition.SCATTER, 0)
nf_ranks = np.concatenate(MPI.COMM_WORLD.allgather(nf_rank))
ifin_rank = np.insert(np.cumsum(nf_ranks)[:-1], 0, 0)[rank]
ifend_rank = np.cumsum(nf_ranks)[rank]
print(f'Rank: {rank}, nf: {nf_rank}, ifin: {ifin_rank}, ifend: {ifend_rank}')

# Extract part of kernel of interest
G = Gwav_fft[ifin_rank:ifend_rank].astype(cdtype)
print(f'Rank: {rank}, G: {G.shape}')

###############################################################################
# We can finally create the MDC operator using
# :py:class:`pylops_mpi.waveeqprocessing.MPIMDC` so that the most
# demanding computations can be run in parallel.

# Define operator
Fop = pylops_mpi.waveeqprocessing.MPIMDC(
    G, nt=2 * par["nt"] - 1, nv=1, nfreq=nf,
    dt=0.004, dr=1.0, twosided=True)

# Apply forward
md = pylops_mpi.DistributedArray(global_shape=(2 * par["nt"] - 1) * par["nx"] * 1,
                                 partition=pylops_mpi.Partition.BROADCAST,
                                 dtype=dtype)
md[:] = m.astype(dtype).ravel()

dd = Fop @ md
d = dd.asarray().real
d = d.reshape(2 * par["nt"] - 1, par["ny"])

# Apply adjoint
madjd = Fop.H @ dd
madj = madjd.asarray().real
madj = madj.reshape(2 * par["nt"] - 1, par["nx"])

###############################################################################
# Finally, let's display the input model, data and adjoint model

if rank == 0:
    fig, axs = plt.subplots(1, 3, figsize=(9, 6))
    axs[0].imshow(
        mwav,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-mwav.max(),
        vmax=mwav.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[0].set_title(r"$m$", fontsize=15)
    axs[0].set_xlabel("r")
    axs[0].set_ylabel("t")
    axs[1].imshow(
        d,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-d.max(),
        vmax=d.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[1].set_title(r"$d$", fontsize=15)
    axs[1].set_xlabel("s")
    axs[1].set_ylabel("t")
    axs[2].imshow(
        madj,
        aspect="auto",
        interpolation="nearest",
        cmap="gray",
        vmin=-madj.max(),
        vmax=madj.max(),
        extent=(x.min(), x.max(), t2.max(), t2.min()),
    )
    axs[2].set_title(r"$m_{adj}$", fontsize=15)
    axs[2].set_xlabel("s")
    axs[2].set_ylabel("t")
    fig.tight_layout()
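Being an MPI example, this script is meant to be launched with an MPI runner, e.g. ``mpiexec -n 4 python examples/plot_mdc.py``, so that the ``nfmax = 200`` frequencies of the kernel are actually split across ranks (50 per rank with 4 ranks). Each rank prints its own frequency range (the ``ifin``/``ifend`` values) and the shape of its local portion of ``G``, and only rank 0 draws the final figure.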

pylops_mpi/__init__.py

Lines changed: 3 additions & 1 deletion
@@ -5,7 +5,9 @@
 from . import (
     basicoperators,
     optimization,
-    plotting
+    plotting,
+    signalprocessing,
+    waveeqprocessing,
 )
 from .plotting.plotting import *
 from .optimization.basic import *

pylops_mpi/basicoperators/__init__.py

Lines changed: 4 additions & 4 deletions
@@ -2,11 +2,11 @@
 Basic Linear Operators using MPI
 ================================
 
-The subpackage basicoperators extends some of the basic linear algebra
-operations provided by numpy providing forward and adjoint functionalities
-using MPI.
+The subpackage basicoperators extends some of the basic operations
+provided by pylops.basicoperators, providing forward and adjoint
+functionalities using MPI.
 
-A list of operators present in pylops_mpi.basicoperators :
+A list of operators present in pylops_mpi.basicoperators:
 MPIBlockDiag Block Diagonal arrangement of PyLops operators
 MPIStackedBlockDiag Block Diagonal arrangement of PyLops-MPI operators
 MPIVStack Vertical Stacking of PyLops operators

Lines changed: 161 additions & 0 deletions (new file)

import numpy as np

from mpi4py import MPI
from pylops.utils.backend import get_module
from pylops.utils.typing import DTypeLike, NDArray

from pylops_mpi import (
    DistributedArray,
    MPILinearOperator,
    Partition
)


class MPIFredholm1(MPILinearOperator):
    r"""Fredholm integral of first kind.

    Implement a multi-dimensional Fredholm integral of first kind distributed
    across the first dimension.

    Parameters
    ----------
    G : :obj:`numpy.ndarray`
        Multi-dimensional convolution kernel of size
        :math:`[n_{\text{slice}} \times n_x \times n_y]`
    nz : :obj:`int`, optional
        Additional dimension of model
    saveGt : :obj:`bool`, optional
        Save ``G`` and ``G.H`` to speed up the computation of adjoint
        (``True``) or create ``G.H`` on-the-fly (``False``).
        Note that ``saveGt=True`` will double the amount of required memory
    usematmul : :obj:`bool`, optional
        Use :func:`numpy.matmul` (``True``) or for-loop with :func:`numpy.dot`
        (``False``). As it is not possible to define which approach is more
        performant (this is highly dependent on the size of ``G`` and input
        arrays as well as the hardware used in the computation), we advise users
        to time both methods for their specific problem prior to making a
        choice.
    base_comm : :obj:`mpi4py.MPI.Comm`, optional
        MPI Base Communicator. Defaults to ``mpi4py.MPI.COMM_WORLD``.
    dtype : :obj:`str`, optional
        Type of elements in input array.

    Attributes
    ----------
    shape : :obj:`tuple`
        Operator shape

    Raises
    ------
    NotImplementedError
        If the size of the first dimension of ``G`` is equal to 1 in any of the ranks

    Notes
    -----
    A multi-dimensional Fredholm integral of first kind can be expressed as

    .. math::

        d(k, x, z) = \int{G(k, x, y) m(k, y, z) \,\mathrm{d}y}
        \quad \forall k=1,\ldots,n_{\text{slice}}

    whilst its adjoint is expressed as

    .. math::

        m(k, y, z) = \int{G^*(k, y, x) d(k, x, z) \,\mathrm{d}x}
        \quad \forall k=1,\ldots,n_{\text{slice}}

    This integral is implemented in a distributed fashion, where ``G``
    is split across ranks along its first dimension. The inputs
    of both the forward and adjoint are distributed arrays with broadcast partition:
    each rank takes a portion of such arrays, computes a partial integral, and
    the resulting outputs are then gathered by all ranks to return a
    distributed array with broadcast partition.

    """

    def __init__(
            self,
            G: NDArray,
            nz: int = 1,
            saveGt: bool = False,
            usematmul: bool = True,
            base_comm: MPI.Comm = MPI.COMM_WORLD,
            dtype: DTypeLike = "float64",
    ) -> None:
        self.nz = nz
        self.nsl, self.nx, self.ny = G.shape
        self.nsls = base_comm.allgather(self.nsl)
        if base_comm.Get_rank() == 0 and 1 in self.nsls:
            raise NotImplementedError(f'All ranks must have at least 2 or more '
                                      f'elements in the first dimension: '
                                      f'local split is instead {self.nsls}...')
        nslstot = base_comm.allreduce(self.nsl)
        self.islstart = np.insert(np.cumsum(self.nsls)[:-1], 0, 0)
        self.islend = np.cumsum(self.nsls)
        self.rank = base_comm.Get_rank()
        self.dims = (nslstot, self.ny, self.nz)
        self.dimsd = (nslstot, self.nx, self.nz)
        shape = (np.prod(self.dimsd),
                 np.prod(self.dims))
        super().__init__(shape=shape, dtype=np.dtype(dtype), base_comm=base_comm)

        self.G = G
        if saveGt:
            self.GT = G.transpose((0, 2, 1)).conj()
        self.usematmul = usematmul

    def _matvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        if x.partition is not Partition.BROADCAST:
            raise ValueError(f"x should have partition={Partition.BROADCAST}, {x.partition} != {Partition.BROADCAST}")
        y = DistributedArray(global_shape=self.shape[0], partition=Partition.BROADCAST,
                             engine=x.engine, dtype=self.dtype)
        x = x.local_array.reshape(self.dims).squeeze()
        x = x[self.islstart[self.rank]:self.islend[self.rank]]
        # apply matmul for portion of the rank of interest
        if self.usematmul:
            if self.nz == 1:
                x = x[..., ncp.newaxis]
            y1 = ncp.matmul(self.G, x)
        else:
            y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.nx, self.nz), dtype=self.dtype))
            for isl in range(self.nsls[self.rank]):
                y1[isl] = ncp.dot(self.G[isl], x[isl])
        # gather results
        y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
        return y

    def _rmatvec(self, x: DistributedArray) -> DistributedArray:
        ncp = get_module(x.engine)
        if x.partition is not Partition.BROADCAST:
            raise ValueError(f"x should have partition={Partition.BROADCAST}, {x.partition} != {Partition.BROADCAST}")
        y = DistributedArray(global_shape=self.shape[1], partition=Partition.BROADCAST,
                             engine=x.engine, dtype=self.dtype)
        x = x.local_array.reshape(self.dimsd).squeeze()
        x = x[self.islstart[self.rank]:self.islend[self.rank]]
        # apply matmul for portion of the rank of interest
        if self.usematmul:
            if self.nz == 1:
                x = x[..., ncp.newaxis]
            if hasattr(self, "GT"):
                y1 = ncp.matmul(self.GT, x)
            else:
                y1 = (
                    ncp.matmul(x.transpose(0, 2, 1).conj(), self.G)
                    .transpose(0, 2, 1)
                    .conj()
                )
        else:
            y1 = ncp.squeeze(ncp.zeros((self.nsls[self.rank], self.ny, self.nz), dtype=self.dtype))
            if hasattr(self, "GT"):
                for isl in range(self.nsls[self.rank]):
                    y1[isl] = ncp.dot(self.GT[isl], x[isl])
            else:
                for isl in range(self.nsl):
                    y1[isl] = ncp.dot(x[isl].T.conj(), self.G[isl]).T.conj()

        # gather results
        y[:] = np.vstack(self.base_comm.allgather(y1)).ravel()
        return y
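For reference, the following is a minimal, hypothetical usage sketch of ``MPIFredholm1`` modelled on ``examples/plot_mdc.py`` above (sizes, variable names and the script name are illustrative; it assumes the operator is exposed as ``pylops_mpi.signalprocessing.MPIFredholm1``, as listed in the API index):

    # run with an MPI launcher, e.g. `mpiexec -n 2 python fredholm_sketch.py` (hypothetical script name);
    # with these sizes every rank ends up with at least two kernel slices, as required by the operator
    import numpy as np
    from mpi4py import MPI

    import pylops_mpi
    from pylops_mpi import DistributedArray, Partition
    from pylops_mpi.DistributedArray import local_split

    comm = MPI.COMM_WORLD
    rank = comm.Get_rank()

    nsl, nx, ny, nz = 6, 5, 4, 1  # global kernel of size (nsl, nx, ny), model with extra dimension nz

    # split the first (slice) axis across ranks, as done for frequencies in examples/plot_mdc.py
    nsl_rank = local_split((nsl,), comm, Partition.SCATTER, 0)
    nsl_ranks = np.concatenate(comm.allgather(nsl_rank))
    isl_in = np.insert(np.cumsum(nsl_ranks)[:-1], 0, 0)[rank]
    isl_end = np.cumsum(nsl_ranks)[rank]

    # each rank holds only its own slices of the (deterministically built) global kernel
    Gglobal = np.arange(nsl * nx * ny, dtype=np.float64).reshape(nsl, nx, ny)
    G = Gglobal[isl_in:isl_end]

    Fop = pylops_mpi.signalprocessing.MPIFredholm1(G, nz=nz, saveGt=True, dtype="float64")

    # model and data are broadcast-partitioned distributed arrays (every rank holds the full vector)
    m = DistributedArray(global_shape=nsl * ny * nz,
                         partition=Partition.BROADCAST, dtype=np.float64)
    m[:] = np.ones(nsl * ny * nz)

    d = Fop @ m       # forward: each rank computes its partial integral, results are allgathered
    madj = Fop.H @ d  # adjoint
    if rank == 0:
        print(d.asarray().shape, madj.asarray().shape)  # (nsl * nx * nz,), (nsl * ny * nz,)

The ``MPIMDC`` operator exercised in ``examples/plot_mdc.py`` follows the same pattern, with the kernel slices taken along the frequency axis.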
