Skip to content

Commit 302bd4b

Browse files
authored
Merge branch 'main' into actual-SUMMA
2 parents 0c5cb7e + d1ae9b1 commit 302bd4b

File tree

12 files changed

+371
-7
lines changed

12 files changed

+371
-7
lines changed

Makefile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,10 +53,11 @@ tests:
5353
tests_nccl:
5454
mpiexec -n $(NUM_PROCESSES) pytest tests_nccl/ --with-mpi
5555

56+
# sphinx-build does not work well with NCCL
5657
doc:
5758
cd docs && rm -rf source/api/generated && rm -rf source/gallery &&\
5859
rm -rf source/tutorials && rm -rf build &&\
59-
cd .. && sphinx-build -b html docs/source docs/build
60+
cd .. && NCCL_PYLOPS_MPI=0 sphinx-build -b html docs/source docs/build
6061

6162
doc_cupy:
6263
cp tutorials_cupy/* tutorials/

docs/source/api/index.rst

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -127,10 +127,11 @@ Utils
127127
local_block_split
128128
active_grid_comm
129129

130-
131-
.. currentmodule:: pylops_mpi.utils.dottest
130+
.. currentmodule:: pylops_mpi.utils
132131

133132
.. autosummary::
134133
:toctree: generated/
135134

136-
dottest
135+
dottest
136+
benchmark
137+
mark

docs/source/benchmarking.rst

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
.. _benchmarkutility:
2+
3+
Benchmarking
4+
============
5+
6+
PyLops-MPI users can conveniently benchmark the performance of their code with a simple decorator.
7+
:py:func:`pylops_mpi.utils.benchmark` and :py:func:`pylops_mpi.utils.mark` support various
8+
function calling patterns that may arise when benchmarking distributed code.
9+
10+
- :py:func:`pylops_mpi.utils.benchmark` is a **decorator** used to time the execution of entire functions.
11+
- :py:func:`pylops_mpi.utils.mark` is a **function** used inside decorated functions to insert fine-grained time measurements.
12+
13+
.. note::
14+
This benchmark utility is enabled by default, i.e., if the user decorates a function with :py:func:`@benchmark`, the function will go through
15+
the time measurements, which adds overhead. Users can turn off benchmarking while leaving the decorator in place with
16+
17+
.. code-block:: bash
18+
19+
>> export BENCH_PYLOPS_MPI=0
20+
21+
The usage can be as simple as:
22+
23+
.. code-block:: python
24+
25+
@benchmark
26+
def function_to_time():
27+
# Your computation
28+
29+
The result is printed to the standard output.
30+
For fine-grained time measurements, :py:func:`pylops_mpi.utils.mark` can be inserted in the code region of benchmarked functions:
31+
32+
.. code-block:: python
33+
34+
@benchmark
35+
def function_to_time():
36+
# Your computation that you may want to exclude from the benchmark
37+
mark("Begin Region")
38+
# Your computation
39+
mark("Finish Region")
40+
41+
You can also nest benchmarked functions to track execution times across layers of function calls with the output being correctly formatted.
42+
Additionally, the results can be exported to a text file. For complete, runnable examples, see :ref:`sphx_glr_tutorials_benchmarking.py`

docs/source/index.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ class and implementing the ``_matvec`` and ``_rmatvec``.
7676
self
7777
installation.rst
7878
gpu.rst
79+
benchmarking.rst
7980

8081
.. toctree::
8182
:maxdepth: 2

pylops_mpi/DistributedArray.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -452,7 +452,7 @@ def _check_local_shapes(self, local_shapes):
452452
elif self.partition is Partition.SCATTER:
453453
local_shape = local_shapes[self.rank]
454454
# Check if local shape sum up to global shape and other dimensions align with global shape
455-
if self._allreduce(local_shape[self.axis]) != self.global_shape[self.axis] or \
455+
if self.base_comm.allreduce(local_shape[self.axis]) != self.global_shape[self.axis] or \
456456
not np.array_equal(np.delete(local_shape, self.axis), np.delete(self.global_shape, self.axis)):
457457
raise ValueError(f"Local shapes don't align with the global shape;"
458458
f"{local_shapes} != {self.global_shape}")

pylops_mpi/utils/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
# isort: skip_file
22

3+
from .benchmark import *
34
from .dottest import *
45
from .deps import *

pylops_mpi/utils/_nccl.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
__all__ = [
22
"_prepare_nccl_allgather_inputs",
33
"_unroll_nccl_allgather_recv",
4+
"_nccl_sync",
45
"initialize_nccl_comm",
56
"nccl_split",
67
"nccl_allgather",
@@ -19,7 +20,6 @@
1920
import cupy as cp
2021
import cupy.cuda.nccl as nccl
2122

22-
2323
cupy_to_nccl_dtype = {
2424
"float32": nccl.NCCL_FLOAT32,
2525
"float64": nccl.NCCL_FLOAT64,
@@ -63,6 +63,13 @@ def _nccl_buf_size(buf, count=None):
6363
return count if count else buf.size
6464

6565

66+
def _nccl_sync():
    """Synchronize the CUDA device through CuPy, if any device is present.

    Kept in this module so that callers can import it behind a guarded
    (optional) CuPy/NCCL import. When CuPy reports zero devices the call
    is a no-op.
    """
    runtime = cp.cuda.runtime
    if runtime.getDeviceCount() != 0:
        runtime.deviceSynchronize()
71+
72+
6673
def _prepare_nccl_allgather_inputs(send_buf, send_buf_shapes) -> Tuple[cp.ndarray, cp.ndarray]:
6774
r""" Prepare send_buf and recv_buf for NCCL allgather (nccl_allgather)
6875

pylops_mpi/utils/benchmark.py

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
__all__ = ["benchmark",
2+
"mark",
3+
]
4+
5+
import functools
6+
import logging
7+
import os
8+
import time
9+
from typing import Callable, Optional, List
10+
from mpi4py import MPI
11+
12+
from pylops.utils import deps as pylops_deps # avoid namespace crashes with pylops_mpi.utils
13+
from pylops_mpi.utils import deps
14+
15+
cupy_message = pylops_deps.cupy_import("the benchmark module")
16+
nccl_message = deps.nccl_import("the benchmark module")
17+
18+
if nccl_message is None and cupy_message is None:
19+
from pylops_mpi.utils._nccl import _nccl_sync
20+
else:
21+
def _nccl_sync():
22+
pass
23+
24+
# Benchmark is enabled by default
25+
ENABLE_BENCHMARK = int(os.getenv("BENCH_PYLOPS_MPI", 1)) == 1
26+
27+
# Stack of active mark functions for nested support
28+
_mark_func_stack = []
29+
_markers = []
30+
31+
32+
def _parse_output_tree(markers: List[str]):
33+
"""This function parses the list of strings gathered during the benchmark call and output them
34+
as one properly formatted string. The format of output string follows the hierarchy of function calls
35+
i.e., the nested funtion calls are indented.
36+
37+
Parameters
38+
----------
39+
markers: :obj:`list`, optional
40+
A list of markers/labels generated from the benchmark call
41+
"""
42+
global _markers
43+
output = []
44+
stack = []
45+
i = 0
46+
while i < len(markers):
47+
label, time, level = markers[i]
48+
if label.startswith("[decorator]"):
49+
indent = "\t" * (level - 1)
50+
output.append(f"{indent}{label}: total runtime: {time:6f} s\n")
51+
else:
52+
if stack:
53+
prev_label, prev_time, prev_level = stack[-1]
54+
if prev_level == level:
55+
indent = "\t" * level
56+
output.append(f"{indent}{prev_label}-->{label}: {time - prev_time:6f} s\n")
57+
stack.pop()
58+
59+
# Push to the stack only if it is going deeper or still at the same level
60+
if i + 1 <= len(markers) - 1:
61+
_, _ , next_level = markers[i + 1]
62+
if next_level >= level:
63+
stack.append(markers[i])
64+
i += 1
65+
# reset markers, allowing other benchmarked function to start fresh
66+
_markers = []
67+
return output
68+
69+
70+
def _sync():
    """Synchronize the CUDA device (when available) and all MPI processes.

    ``_nccl_sync`` is invoked first, followed by a barrier on
    ``MPI.COMM_WORLD``.
    """
    _nccl_sync()
    world = MPI.COMM_WORLD
    world.Barrier()
74+
75+
76+
def mark(label: str):
    """Record a time mark at an arbitrary line of a benchmarked function.

    Parameters
    ----------
    label : :obj:`str`
        A label of the mark. This signifies both 1) the end of the
        previous mark 2) the beginning of the new mark

    Raises
    ------
    RuntimeError
        If benchmarking is enabled and the call happens outside of a
        function decorated with ``benchmark``.
    """
    if ENABLE_BENCHMARK:
        if not _mark_func_stack:
            raise RuntimeError("mark() called outside of a benchmarked region")
        # dispatch to the closure of the innermost active benchmarked function
        active_mark = _mark_func_stack[-1]
        active_mark(label)
90+
91+
92+
def benchmark(func: Optional[Callable] = None,
              description: Optional[str] = "",
              logger: Optional[logging.Logger] = None,
              ):
    """A wrapper for code injection for time measurement.

    This wrapper measures the start-to-end time of the wrapped function when
    decorated without any argument.

    It also allows users to put a call to mark() anywhere inside the wrapped function
    for fine-grained time benchmark. This wrapper defines the local_mark() and pushes it
    to the _mark_func_stack for isolation in case of nested call.
    The user-facing mark() will always call the function at the top of the _mark_func_stack.

    Parameters
    ----------
    func : :obj:`callable`, optional
        Function to be decorated. Defaults to ``None``.
    description : :obj:`str`, optional
        Description for the output text. Defaults to ``''``, in which case
        the name of the decorated function is used.
    logger: :obj:`logging.Logger`, optional
        A `logging.Logger` object for logging the benchmark text output. This logger must be setup before
        passing to this function to either writing output to a file or log to stdout. If `logger`
        is not provided, the output is printed to stdout.

    Returns
    -------
    :obj:`callable`
        The wrapped function when ``func`` is given (``@benchmark``), otherwise
        a decorator to be applied to a function (``@benchmark(...)``).

    Notes
    -----
    If the wrapped function raises, the exception is propagated unchanged; the
    entries recorded by that call are discarded and the mark stack is restored
    so that subsequent benchmarked calls are not affected.
    """

    def noop_decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapped

    def decorator(func):
        header = f"[decorator]{description or func.__name__}"

        # wraps() belongs on the wrapper so that the decorated function keeps
        # its own name, docstring and other metadata
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            rank = MPI.COMM_WORLD.Get_rank()

            level = len(_mark_func_stack) + 1
            # The header is needed for later tree parsing. Here it is allocating its spot.
            # the tuple at this index will be replaced after elapsed time is calculated.
            _markers.append((header, None, level))
            header_index = len(_markers) - 1

            def local_mark(label):
                _markers.append((label, time.perf_counter(), level))

            _mark_func_stack.append(local_mark)
            try:
                _sync()
                start_time = time.perf_counter()
                # the mark() called in wrapped function will now call local_mark
                result = func(*args, **kwargs)
                _sync()
                elapsed = time.perf_counter() - start_time
            except BaseException:
                # Drop the header placeholder (whose time is still None) and every
                # entry recorded by this call, so a later report cannot choke on them.
                del _markers[header_index:]
                raise
            else:
                _markers[header_index] = (header, elapsed, level)
            finally:
                # In case of nesting, the wrapped callee must pop its closure from stack so that
                # when the callee returns, the wrapped caller operates on its closure
                # (and its level label), which now becomes the top of the stack.
                _mark_func_stack.pop()

            # all the calls have finished
            if not _mark_func_stack:
                if rank == 0:
                    # _parse_output_tree also resets the collected markers
                    output = _parse_output_tree(_markers)
                    if logger:
                        logger.info("".join(output))
                    else:
                        print("".join(output))
                else:
                    # other ranks do not report, but must still start fresh;
                    # otherwise their marker list grows with every call
                    _markers.clear()
            return result
        return wrapper

    # The code still has to return decorator so that the in-place decorator with arguments
    # like @benchmark(logger=logger) does not throw the error and can be kept untouched.
    if not ENABLE_BENCHMARK:
        return noop_decorator if func is None else noop_decorator(func)

    return decorator if func is None else decorator(func)

0 commit comments

Comments
 (0)