23 changes: 17 additions & 6 deletions pylops_mpi/DistributedArray.py
@@ -694,14 +694,25 @@ def _compute_vector_norm(self, local_array: NDArray,
             recv_buf = self._allreduce_subcomm(ncp.count_nonzero(local_array, axis=axis).astype(ncp.float64))
         elif ord == ncp.inf:
             # Calculate max followed by max reduction
-            recv_buf = self._allreduce_subcomm(ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64),
-                                               recv_buf, op=MPI.MAX)
-            recv_buf = ncp.squeeze(recv_buf, axis=axis)
+            # TODO (tharitt): CuPy + MPI currently do not work well with buffered communication, particularly
+            # with the MAX and MIN operators: copy the array to the CPU, communicate, then copy the result back to the GPU
+            send_buf = ncp.max(ncp.abs(local_array), axis=axis).astype(ncp.float64)
+            if self.engine == "cupy" and self.base_comm_nccl is None:
+                recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MAX)
+                recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
+            else:
+                recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MAX)
+                recv_buf = ncp.squeeze(recv_buf, axis=axis)
         elif ord == -ncp.inf:
             # Calculate min followed by min reduction
-            recv_buf = self._allreduce_subcomm(ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64),
-                                               recv_buf, op=MPI.MIN)
-            recv_buf = ncp.squeeze(recv_buf, axis=axis)
+            # TODO (tharitt): see the comment above for the infinity norm
+            send_buf = ncp.min(ncp.abs(local_array), axis=axis).astype(ncp.float64)
+            if self.engine == "cupy" and self.base_comm_nccl is None:
+                recv_buf = self._allreduce_subcomm(send_buf.get(), recv_buf.get(), op=MPI.MIN)
+                recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))
+            else:
+                recv_buf = self._allreduce_subcomm(send_buf, recv_buf, op=MPI.MIN)
+                recv_buf = ncp.asarray(ncp.squeeze(recv_buf, axis=axis))

         else:
             recv_buf = self._allreduce_subcomm(ncp.sum(ncp.abs(ncp.float_power(local_array, ord)), axis=axis))
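The host-staging workaround above can be reproduced in isolation. A minimal sketch of the same pattern, assuming mpi4py and CuPy are installed and the script is launched with mpiexec; every name below is illustrative rather than part of the diff:

import numpy
import cupy as cp
from mpi4py import MPI

comm = MPI.COMM_WORLD
local_vals = cp.abs(cp.random.rand(5))  # per-rank data living on the GPU

# Buffered MPI reductions with MPI.MAX/MPI.MIN can misbehave on device
# buffers, so stage through the host: device -> host, reduce, host -> device.
send_buf = cp.max(local_vals).astype(cp.float64).reshape(1)  # GPU scalar as 1-element array
recv_buf = numpy.empty(1, dtype=numpy.float64)               # host-side receive buffer
comm.Allreduce(send_buf.get(), recv_buf, op=MPI.MAX)         # .get() copies to the host
global_max = cp.asarray(recv_buf)                            # move the result back to the GPU

Note that the NCCL path (base_comm_nccl set) is deliberately excluded from the workaround, since NCCL reduces device buffers natively.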
6 changes: 4 additions & 2 deletions pylops_mpi/basicoperators/MatrixMult.py
@@ -232,7 +232,8 @@ def _matvec(self, x: DistributedArray) -> DistributedArray:
                               mask=x.mask,
                               partition=Partition.SCATTER,
                               dtype=self.dtype,
-                              base_comm=self.base_comm
+                              base_comm=self.base_comm,
+                              engine=x.engine
                               )

         my_own_cols = self._rank_col_lens[self.rank]
@@ -257,7 +258,8 @@ def _rmatvec(self, x: DistributedArray) -> DistributedArray:
                               mask=x.mask,
                               partition=Partition.SCATTER,
                               dtype=self.dtype,
-                              base_comm=self.base_comm
+                              base_comm=self.base_comm,
+                              engine=x.engine
                               )

         x_arr = x.local_array.reshape((self.N, self._local_ncols)).astype(self.dtype)
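Both hunks above forward the input's engine when allocating the output DistributedArray, so a CuPy input no longer silently produces a NumPy output. A minimal sketch of the pattern, assuming only the keyword arguments visible in the diff; the helper name is hypothetical:

from pylops_mpi import DistributedArray, Partition

def allocate_output_like(x: DistributedArray, global_shape: int) -> DistributedArray:
    # Hypothetical helper: mirror the input's backend (and communicator)
    # so that GPU arrays stay on the GPU through matvec/rmatvec.
    return DistributedArray(global_shape=global_shape,
                            mask=x.mask,
                            partition=Partition.SCATTER,
                            dtype=x.dtype,
                            base_comm=x.base_comm,
                            engine=x.engine)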
34 changes: 26 additions & 8 deletions tests/test_blockdiag.py
@@ -2,9 +2,19 @@
 Designed to run with n processes
 $ mpiexec -n 10 pytest test_blockdiag.py --with-mpi
 """
+import os
+
+if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
+    import cupy as np
+    from cupy.testing import assert_allclose
+
+    backend = "cupy"
+else:
+    import numpy as np
+    from numpy.testing import assert_allclose
+
+    backend = "numpy"
 from mpi4py import MPI
-import numpy as np
-from numpy.testing import assert_allclose
 import pytest

 import pylops
@@ -17,6 +27,14 @@
 par2j = {'ny': 301, 'nx': 101, 'dtype': np.complex128}

 np.random.seed(42)
+rank = MPI.COMM_WORLD.Get_rank()
+if backend == "cupy":
+    device_count = np.cuda.runtime.getDeviceCount()
+    device_id = int(
+        os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK")
+        or rank % device_count
+    )
+    np.cuda.Device(device_id).use()


 @pytest.mark.mpi(min_size=2)
@@ -27,11 +45,11 @@ def test_blockdiag(par):
     Op = pylops.MatrixMult(A=((rank + 1) * np.ones(shape=(par['ny'], par['nx']))).astype(par['dtype']))
     BDiag_MPI = pylops_mpi.MPIBlockDiag(ops=[Op, ])

-    x = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'])
+    x = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'], engine=backend)
     x[:] = np.ones(shape=par['nx'], dtype=par['dtype'])
     x_global = x.asarray()

-    y = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'])
+    y = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'], engine=backend)
     y[:] = np.ones(shape=par['ny'], dtype=par['dtype'])
     y_global = y.asarray()
@@ -68,16 +86,16 @@ def test_stacked_blockdiag(par):
     FirstDeriv_MPI = pylops_mpi.MPIFirstDerivative(dims=(par['ny'], par['nx']), dtype=par['dtype'])
     StackedBDiag_MPI = pylops_mpi.MPIStackedBlockDiag(ops=[BDiag_MPI, FirstDeriv_MPI])

-    dist1 = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'])
+    dist1 = pylops_mpi.DistributedArray(global_shape=size * par['nx'], dtype=par['dtype'], engine=backend)
     dist1[:] = np.ones(dist1.local_shape, dtype=par['dtype'])
-    dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'])
+    dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'], engine=backend)
     dist2[:] = np.ones(dist2.local_shape, dtype=par['dtype'])
     x = pylops_mpi.StackedDistributedArray(distarrays=[dist1, dist2])
     x_global = x.asarray()

-    dist1 = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'])
+    dist1 = pylops_mpi.DistributedArray(global_shape=size * par['ny'], dtype=par['dtype'], engine=backend)
     dist1[:] = np.ones(dist1.local_shape, dtype=par['dtype'])
-    dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'])
+    dist2 = pylops_mpi.DistributedArray(global_shape=par['nx'] * par['ny'], dtype=par['dtype'], engine=backend)
     dist2[:] = np.ones(dist2.local_shape, dtype=par['dtype'])
     y = pylops_mpi.StackedDistributedArray(distarrays=[dist1, dist2])
     y_global = y.asarray()
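With the import-time gate above, the same module runs against either backend, and each rank pins a GPU before any kernel launches. A minimal sketch of the gate as a reusable helper, assuming only the TEST_CUPY_PYLOPS convention from this diff; the helper name is illustrative:

import os

def select_backend():
    # TEST_CUPY_PYLOPS=1 selects CuPy; anything else falls back to NumPy.
    if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
        import cupy as xp
        from cupy.testing import assert_allclose
        return xp, assert_allclose, "cupy"
    import numpy as xp
    from numpy.testing import assert_allclose
    return xp, assert_allclose, "numpy"

xp, assert_allclose, backend = select_backend()

Following the module docstring, a GPU run would then presumably look like TEST_CUPY_PYLOPS=1 mpiexec -n 10 pytest test_blockdiag.py --with-mpi.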
73 changes: 46 additions & 27 deletions tests/test_derivative.py
@@ -2,9 +2,20 @@
 Designed to run with n processes
 $ mpiexec -n 10 pytest test_derivative.py --with-mpi
 """
-import numpy as np
+import os
+
+if int(os.environ.get("TEST_CUPY_PYLOPS", 0)):
+    import cupy as np
+    from cupy.testing import assert_allclose
+
+    backend = "cupy"
+else:
+    import numpy as np
+    from numpy.testing import assert_allclose
+
+    backend = "numpy"
+import numpy as npp
 from mpi4py import MPI
-from numpy.testing import assert_allclose
 import pytest

 import pylops
@@ -14,6 +25,14 @@
 np.random.seed(42)
 rank = MPI.COMM_WORLD.Get_rank()
 size = MPI.COMM_WORLD.Get_size()
+if backend == "cupy":
+    device_count = np.cuda.runtime.getDeviceCount()
+    device_id = int(
+        os.environ.get("OMPI_COMM_WORLD_LOCAL_RANK")
+        or rank % device_count
+    )
+    np.cuda.Device(device_id).use()
+

 par1 = {
     "nz": 600,
@@ -189,8 +208,8 @@ def test_first_derivative_forward(par):
     Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
                                             kind="forward", edge=par['edge'],
                                             dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -200,7 +219,7 @@
     y_adj_dist = Fop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -223,8 +242,8 @@ def test_first_derivative_backward(par):
     Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
                                             kind="backward", edge=par['edge'],
                                             dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -234,7 +253,7 @@
     y_adj_dist = Fop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -258,8 +277,8 @@ def test_first_derivative_centered(par):
     Fop_MPI = pylops_mpi.MPIFirstDerivative(dims=par['nz'], sampling=par['dz'],
                                             kind="centered", edge=par['edge'],
                                             order=order, dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -269,7 +288,7 @@
     y_adj_dist = Fop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Fop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Fop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Fop = pylops.FirstDerivative(dims=par['nz'], axis=0,
@@ -292,8 +311,8 @@ def test_second_derivative_forward(par):
     Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
                                                             kind="forward", edge=par['edge'],
                                                             dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -303,7 +322,7 @@
     y_adj_dist = Sop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -326,8 +345,8 @@ def test_second_derivative_backward(par):
     Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
                                                             kind="backward", edge=par['edge'],
                                                             dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -337,7 +356,7 @@
     y_adj_dist = Sop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -360,8 +379,8 @@ def test_second_derivative_centered(par):
     Sop_MPI = pylops_mpi.basicoperators.MPISecondDerivative(dims=par['nz'], sampling=par['dz'],
                                                             kind="centered", edge=par['edge'],
                                                             dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['nz']), dtype=par['dtype'],
-                                    partition=par['partition'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['nz']), dtype=par['dtype'],
+                                    partition=par['partition'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -371,7 +390,7 @@
     y_adj_dist = Sop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Sop_MPI, x, y_dist, np.prod(par['nz']), np.prod(par['nz']))
+    dottest(Sop_MPI, x, y_dist, npp.prod(par['nz']), npp.prod(par['nz']))

     if rank == 0:
         Sop = pylops.SecondDerivative(dims=par['nz'], axis=0,
@@ -394,7 +413,7 @@ def test_laplacian(par):
                                                  weights=par['weights'], sampling=par['sampling'],
                                                  kind=kind, edge=par['edge'],
                                                  dtype=par['dtype'])
-    x = pylops_mpi.DistributedArray(global_shape=np.prod(par['n']), dtype=par['dtype'])
+    x = pylops_mpi.DistributedArray(global_shape=npp.prod(par['n']), dtype=par['dtype'], engine=backend)
     x[:] = np.random.normal(rank, 10, x.local_shape)
     x_global = x.asarray()
     # Forward
@@ -404,7 +423,7 @@
     y_adj_dist = Lop_MPI.H @ x
     y_adj = y_adj_dist.asarray()
     # Dot test
-    dottest(Lop_MPI, x, y_dist, np.prod(par['n']), np.prod(par['n']))
+    dottest(Lop_MPI, x, y_dist, npp.prod(par['n']), npp.prod(par['n']))

     if rank == 0:
         Lop = pylops.Laplacian(dims=par['n'], axes=par['axes'],
@@ -426,7 +445,7 @@ def test_gradient(par):
     Gop_MPI = pylops_mpi.basicoperators.MPIGradient(dims=par['n'], sampling=par['sampling'],
                                                     kind=kind, edge=par['edge'],
                                                     dtype=par['dtype'])
-    x_fwd = pylops_mpi.DistributedArray(global_shape=np.prod(par['n']), dtype=par['dtype'])
+    x_fwd = pylops_mpi.DistributedArray(global_shape=npp.prod(par['n']), dtype=par['dtype'], engine=backend)
     x_fwd[:] = np.random.normal(rank, 10, x_fwd.local_shape)
     x_global = x_fwd.asarray()

@@ -436,11 +455,11 @@
     y = y_dist.asarray()

     # Adjoint
-    x_adj_dist1 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
+    x_adj_dist1 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
     x_adj_dist1[:] = np.random.normal(rank, 10, x_adj_dist1.local_shape)
-    x_adj_dist2 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
+    x_adj_dist2 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
     x_adj_dist2[:] = np.random.normal(rank, 20, x_adj_dist2.local_shape)
-    x_adj_dist3 = pylops_mpi.DistributedArray(global_shape=int(np.prod(par['n'])), dtype=par['dtype'])
+    x_adj_dist3 = pylops_mpi.DistributedArray(global_shape=int(npp.prod(par['n'])), dtype=par['dtype'], engine=backend)
     x_adj_dist3[:] = np.random.normal(rank, 30, x_adj_dist3.local_shape)
     x_adj = pylops_mpi.StackedDistributedArray(distarrays=[x_adj_dist1, x_adj_dist2, x_adj_dist3])
     x_adj_global = x_adj.asarray()
@@ -449,7 +468,7 @@
     y_adj_dist = Gop_MPI.H @ x_adj
     y_adj = y_adj_dist.asarray()

     # Dot test
-    dottest(Gop_MPI, x_fwd, y_dist, len(par['n']) * np.prod(par['n']), np.prod(par['n']))
+    dottest(Gop_MPI, x_fwd, y_dist, len(par['n']) * npp.prod(par['n']), npp.prod(par['n']))

     if rank == 0:
         Gop = pylops.Gradient(dims=par['n'], sampling=par['sampling'],
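A note on the npp alias introduced in this file: when TEST_CUPY_PYLOPS=1, np is bound to CuPy, so np.prod(...) would return a 0-d device array, whereas global shapes must stay host-side integers. Shape arithmetic therefore goes through host NumPy as npp. A minimal sketch of the distinction, assuming CuPy is available:

import numpy as npp   # always host NumPy, as in the diff
import cupy as cp     # what "np" resolves to when TEST_CUPY_PYLOPS=1

dims = (600, 1)
n_host = int(npp.prod(dims))        # plain Python int, safe as a global_shape
n_dev = cp.prod(cp.asarray(dims))   # 0-d CuPy array living on the GPU
assert isinstance(n_host, int)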