79 changes: 77 additions & 2 deletions doc/source/analyzing/parallel_computation.rst
@@ -24,6 +24,7 @@ Currently, yt is able to perform the following actions in parallel:
* Halo analysis (:ref:`halo-analysis`)
* Volume rendering (:ref:`volume_rendering`)
* Isocontours & flux calculations (:ref:`extracting-isocontour-information`)
* Parallelization over data containers (:ref:`Data-objects`)

This list covers just about every action yt can take! Additionally, almost all
scripts will benefit from parallelization with minimal modification. The goal
@@ -219,6 +220,7 @@ The following operations use chunk decomposition:
* Derived Quantities (see :ref:`derived-quantities`)
* 1-, 2-, and 3-D profiles (see :ref:`generating-profiles-and-histograms`)
* Isocontours & flux calculations (see :ref:`surfaces`)
* Parallelization over data containers using ``piter`` (see :ref:`Data-objects`)

Parallelization over Multiple Objects and Datasets
++++++++++++++++++++++++++++++++++++++++++++++++++
@@ -229,8 +231,81 @@ independently to several different objects or datasets, a so-called
task, yt can do that easily. See the sections below on
:ref:`parallelizing-your-analysis` and :ref:`parallel-time-series-analysis`.

Use of ``piter()``
^^^^^^^^^^^^^^^^^^
Use of ``piter()`` on a data container
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

You can parallelize I/O on any data container using the
:func:`~yt.data_objects.selection_objects.data_selection_objects.YTSelectionContainer.piter` method.
It dispatches the chunks that make up the data container to different
processors and lets you store a result for each chunk.

Take the following example:

.. code-block:: python

    import yt

    yt.enable_parallelism()

    ds = yt.load("output_00080")
    ad = ds.all_data()

    # Each processor reads a different chunk
    my_storage = {}
    for sto, chunk in ad.piter(storage=my_storage):
        sto.result = {}
        sto.result["gas", "density"] = chunk["gas", "density"]

    my_storage  # now contains one entry per chunk
    my_storage[0]["gas", "density"]  # contains the density for chunk 0
    ...
    my_storage[15]["gas", "density"]  # contains the density for the last chunk (here, chunk 15)


Sometimes it is also useful to combine the results. This is implemented
through an optional ``reduction`` keyword argument to the
:func:`~yt.data_objects.selection_objects.data_selection_objects.YTSelectionContainer.piter` function.
The following reductions are implemented:

- ``None`` (default): no reduction; ``my_storage`` will contain one entry per chunk
- ``"cat"``: concatenate the results from all chunks; ``my_storage`` will contain a flattened list of all results
- ``"cat_on_root"``: concatenate the results from all chunks, but only on the root process; the other processes will have ``None`` in ``my_storage``
- ``"sum"``, ``"min"``, or ``"max"``: reduce the results from all chunks using the specified operation; ``my_storage`` will contain the reduced result on all processes

.. code-block:: python

    # Gathering everything on root
    my_storage = {}
    for sto, chunk in ad.piter(storage=my_storage, reduction="cat_on_root"):
        sto.result = {}
        sto.result["gas", "density"] = chunk["gas", "density"]

    # On root:
    if yt.is_root():
        my_storage["gas", "density"] == ad["gas", "density"]  # True for all entries
    else:
        my_storage["gas", "density"] is None

    # Gather everything on all processes
    my_storage = {}
    for sto, chunk in ad.piter(storage=my_storage, reduction="cat"):
        sto.result = {}
        sto.result["gas", "density"] = chunk["gas", "density"]

    my_storage["gas", "density"] == ad["gas", "density"]  # True for all entries on all processes

    # Summing over all chunks
    my_storage = {}
    for sto, chunk in ad.piter(storage=my_storage, reduction="sum"):
        sto.result = {}
        sto.result["gas", "total_mass"] = chunk["gas", "cell_mass"].sum()
        sto.result["gas", "total_volume"] = chunk["gas", "cell_volume"].sum()

    my_storage["gas", "total_mass"]  # contains the total mass
    my_storage["gas", "total_volume"]  # contains the total volume
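
As a plain-Python sketch of what these reductions do (no MPI involved; the names and values below are illustrative, not yt API), ``"sum"`` folds each key with the given operation across chunks, while ``"cat"`` concatenates the per-chunk arrays:

```python
import numpy as np

# Hypothetical per-chunk results, as each piter iteration would store them
chunk_results = [
    {"total_mass": 1.0, "density": np.array([0.1, 0.2])},
    {"total_mass": 2.5, "density": np.array([0.3])},
]

# reduction="sum": fold each key across chunks with the operation
total_mass = sum(r["total_mass"] for r in chunk_results)

# reduction="cat": concatenate the per-chunk arrays into one flat array
density = np.concatenate([r["density"] for r in chunk_results])

print(total_mass)  # 3.5
print(density)  # [0.1 0.2 0.3]
```

With ``"cat_on_root"``, only the root process would hold the concatenated array; the others would see ``None``.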


Use of ``piter()`` over objects and datasets
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

If you use parallelism over objects or datasets, you will encounter
the :func:`~yt.data_objects.time_series.DatasetSeries.piter` function.
103 changes: 103 additions & 0 deletions yt/data_objects/selection_objects/data_selection_objects.py
@@ -33,6 +33,7 @@
from yt.utilities.logger import ytLogger as mylog
from yt.utilities.parallel_tools.parallel_analysis_interface import (
    ParallelAnalysisInterface,
    parallel_objects,
)

if sys.version_info >= (3, 11):
@@ -535,6 +536,108 @@ def min_level(self, value):
        self._current_chunk = None
        self._min_level = value

    def piter(
        self,
        njobs: int = 0,
        storage: dict | None = None,
        barrier: bool = True,
        dynamic: bool = False,
        reduction: Literal[None, "sum", "max", "min", "cat", "cat_on_root"] = None,
    ):

[Review comment — chrishavlin (Contributor), Dec 19, 2025, on the ``reduction`` parameter]
ya know, with these changes, i don't think it'd be too hard to allow dynamic MPI reduction operations (i.e., https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Op.html#mpi4py-mpi-op) to allow arbitrary reductions in parallel. that'd be neat! (obviously not something to do in this PR!)

[Author reply — Member]
This is a good point; I actually thought about doing exactly this and then gave up for the sake of converging!

        """Iterate in parallel over data in the container.

        Parameters
        ----------
        njobs : int
            How many jobs to spawn. By default, one job will be dispatched
            for each available processor.
        storage : dict
            This is a dictionary, which will be filled with results during
            the course of the iteration. The keys will be the dataset
            indices and the values will be whatever is assigned to the
            *result* attribute on the storage during iteration.
        barrier : bool
            Should a barrier be placed at the end of iteration?
        dynamic : bool
            This governs whether or not dynamic load balancing will be
            enabled. This requires one dedicated processor; if this is
            enabled with a set of 128 processors available, only 127 will
            be available to iterate over objects, as one will be load
            balancing the rest.
        reduction : Literal[None, "sum", "max", "min", "cat", "cat_on_root"]
            This specifies the reduction operation to be applied to the
            results from each processor.

            - None: no reduction will be applied and the storage object
              will contain one result per chunk in the container.
            - cat: the storage object will contain a flattened list of
              each result.
            - cat_on_root: same as cat, but only the root processor will
              contain anything.
            - sum, min, max: the storage object will contain the result of
              applying the operation until a single value is obtained.

        Important limitation
        --------------------
        When using *storage*, the result *must* be a dictionary. See the
        examples below.

        Examples
        --------
        Here is an example of how to gather all data on root, reading in
        parallel. Other MPI tasks will have nothing in *my_storage*.

        >>> import yt
        >>> ds = yt.load("output_00080")
        >>> ad = ds.all_data()
        >>> my_storage = {}
        >>> for sto, chunk in ad.piter(storage=my_storage, reduction="cat_on_root"):
        ...     sto.result = {
        ...         ("gas", "density"): chunk["gas", "density"],
        ...         ("gas", "temperature"): chunk["gas", "temperature"],
        ...     }
        >>> if yt.is_root():
        ...     # Contains *all* the gas densities
        ...     my_storage["gas", "density"]
        ...     # Contains *all* the gas temperatures
        ...     my_storage["gas", "temperature"]

        Here is an example of how to sum the total mass of all gas cells
        in the dataset, storing the result in *my_storage* on all
        processors.

        >>> my_storage = {}
        >>> for sto, chunk in ad.piter(storage=my_storage, reduction="sum"):
        ...     sto.result = {
        ...         "total_mass": chunk["gas", "cell_mass"].sum(),
        ...     }
        >>> print("Total mass: ", my_storage["total_mass"])

        Here is an example of how to read all data in parallel and have
        the results available on all processors.

        >>> my_storage = {}
        >>> for sto, chunk in ad.piter(storage=my_storage, reduction="cat"):
        ...     sto.result = {("gas", "density"): chunk["gas", "density"]}
        >>> print(my_storage["gas", "density"])

        This is equivalent (but faster, since reading is parallelized) to
        the following:

        >>> my_storage = {("gas", "density"): ad["gas", "density"]}
        """
        if reduction is not None and storage is None:
            raise ValueError(
                "If reduction is specified, you must pass in a storage dictionary."
            )

        yield from parallel_objects(
            self.chunks([], "io"),
            njobs=njobs,
            storage=storage,
            barrier=barrier,
            dynamic=dynamic,
            reduction=reduction,
        )


class YTSelectionContainer0D(YTSelectionContainer):
    _spatial = False
Expand Down
11 changes: 11 additions & 0 deletions yt/frontends/ramses/data_structures.py
@@ -678,6 +678,17 @@ def _identify_base_chunk(self, dobj):
        mylog.info("Identified %s intersecting domains", len(domains))
        base_region = getattr(dobj, "base_region", dobj)

        # In case of MPI parallelism, we need to keep the intersection
        # of all domains across all ranks
        if self.comm.size > 1:
            idomains = {dom.domain_id for dom in domains}
            idomains = self.comm.comm.allreduce(set(idomains), op=lambda a, b: a & b)

            # Keep domains that every rank has
            domains = [dom for dom in domains if dom.domain_id in idomains]

        subsets = [
            RAMSESDomainSubset(
                base_region,
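
The set-intersection ``allreduce`` in the RAMSES hunk above can be sketched without MPI: with ``op=lambda a, b: a & b``, every rank ends up with the domain ids present on all ranks, equivalent to folding set intersection over the per-rank sets (the sets below are hypothetical, for illustration only):

```python
from functools import reduce

# Hypothetical per-rank sets of intersecting domain ids (illustrative only)
rank_domains = [
    {1, 2, 3, 5},  # rank 0
    {2, 3, 4, 5},  # rank 1
    {2, 3, 5, 7},  # rank 2
]

# allreduce(idomains, op=lambda a, b: a & b) leaves every rank with the
# same result as folding the intersection across all ranks:
common = reduce(lambda a, b: a & b, rank_domains)
print(common)  # {2, 3, 5}
```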