Skip to content

Commit 77cbe4a

Browse files
authored
Add more unit tests in the get_app function, document chaining (#4014)
# Description I wanted to make sure that Parsl futures created with ParslPoolExecutor work with Parsl function chaining. As expected, they do. These changes make the compatibility between PPE and other Parsl routes clear in the documentation too # Changed Behaviour No changes in functionality. I did make a method public so the unit tests were clearer and open that route as an acceptable option in a workflow I'm tinkering with. # Fixes N/A ## Type of change - Update to human readable text: Documentation/error messages/comments
1 parent 165fdc5 commit 77cbe4a

File tree

4 files changed

+81
-7
lines changed

4 files changed

+81
-7
lines changed

docs/index.rst

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ Parsl lets you chain functions together and will launch each function as inputs
3838
3939
Start with the `configuration quickstart <quickstart.html#getting-started>`_ to learn how to tell Parsl how to use your computing resource,
4040
see if `a template configuration for your supercomputer <userguide/configuration/examples.html>`_ is already available,
41-
then explore the `parallel computing patterns <userguide/workflows/workflows.html>`_ to determine how to use parallelism best in your application.
41+
then explore the `parallel computing patterns <userguide/workflows/workflow.html>`_ to determine how to use parallelism best in your application.
4242

4343
Parsl is an open-source code, and available on GitHub: https://github.com/parsl/parsl/
4444

docs/reference.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ Core
1313
parsl.app.app.join_app
1414
parsl.dataflow.futures.AppFuture
1515
parsl.dataflow.dflow.DataFlowKernelLoader
16+
parsl.concurrent.ParslPoolExecutor
1617
parsl.dataflow.dependency_resolvers.DependencyResolver
1718
parsl.dataflow.dependency_resolvers.DEEP_DEPENDENCY_RESOLVER
1819
parsl.dataflow.dependency_resolvers.SHALLOW_DEPENDENCY_RESOLVER

parsl/concurrent/__init__.py

Lines changed: 52 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,26 @@ class ParslPoolExecutor(Executor, AbstractContextManager):
2323
2. Supplying an already-started Parsl :class:`~parsl.DataFlowKernel` (DFK).
2424
The executor assumes you will start and stop the Parsl DFK outside the Executor.
2525
26-
Note: Parsl does not support canceling tasks. The :meth:`map` method does not cancel work
26+
The futures returned by :meth:`submit` and :meth:`map` are Parsl futures and will work
27+
with the same function chaining mechanisms as when using Parsl with decorators.
28+
29+
.. code-block:: python
30+
31+
def f(x):
32+
return x + 1
33+
34+
@python_app
35+
def parity(x):
36+
return 'odd' if x % 2 == 1 else 'even'
37+
38+
with ParslPoolExecutor(config=my_parsl_config) as executor:
39+
future_1 = executor.submit(f, 1)
40+
assert parity(future_1) == 'even' # Function chaining, as expected
41+
42+
future_2 = executor.submit(f, future_1)
43+
assert future_2.result() == 3 # Chaining works with `submit` too
44+
45+
Parsl does not support canceling tasks. The :meth:`map` method does not cancel work
2746
when one member of the run fails or a timeout is reached
2847
and :meth:`shutdown` does not cancel work on completion.
2948
"""
@@ -63,7 +82,7 @@ def app_count(self):
6382
"""Number of functions currently registered with the executor"""
6483
return len(self._app_cache)
6584

66-
def _get_app(self, fn: Callable) -> PythonApp:
85+
def get_app(self, fn: Callable) -> PythonApp:
6786
"""Create a PythonApp for a function
6887
6988
Args:
@@ -78,19 +97,48 @@ def _get_app(self, fn: Callable) -> PythonApp:
7897
return app
7998

8099
def submit(self, fn, *args, **kwargs):
100+
"""Submits a callable to be executed with the given arguments.
101+
102+
Schedules the callable to be executed as ``fn(*args, **kwargs)`` and returns
103+
a Future instance representing the execution of the callable.
104+
105+
Returns:
106+
A Future representing the given call.
107+
"""
108+
81109
if self._dfk is None:
82110
raise RuntimeError('Executor has been shut down.')
83-
app = self._get_app(fn)
111+
app = self.get_app(fn)
84112
return app(*args, **kwargs)
85113

86114
# TODO (wardlt): This override can go away when Parsl supports cancel
87115
def map(self, fn: Callable, *iterables: Iterable, timeout: Optional[float] = None, chunksize: int = 1) -> Iterator:
116+
"""Returns an iterator equivalent to map(fn, iter).
117+
118+
Args:
119+
fn: A callable that will take as many arguments as there are
120+
passed iterables.
121+
timeout: The maximum number of seconds to wait. If None, then there
122+
is no limit on the wait time.
123+
chunksize: If greater than one, the iterables will be chopped into
124+
chunks of size chunksize and submitted to the process pool.
125+
If set to one, the items in the list will be sent one at a time.
126+
127+
Returns:
128+
An iterator equivalent to: map(func, ``*iterables``) but the calls may
129+
be evaluated out-of-order.
130+
131+
Raises:
132+
TimeoutError: If the entire result iterator could not be generated
133+
before the given timeout.
134+
Exception: If ``fn(*args)`` raises for any values.
135+
"""
88136
# This is a version of the CPython 3.9 `.map` implementation modified to not use `cancel`
89137
if timeout is not None:
90138
end_time = timeout + time.monotonic()
91139

92140
# Submit the applications
93-
app = self._get_app(fn)
141+
app = self.get_app(fn)
94142
fs = [app(*args) for args in zip(*iterables)]
95143

96144
# Yield the futures as completed

parsl/tests/sites/test_concurrent.py

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,26 @@
11
"""Tests of the interfaces to Python's concurrent library"""
22
from pytest import mark, raises, warns
33

4-
from parsl import Config, HighThroughputExecutor, load
4+
from parsl import Config, HighThroughputExecutor, load, python_app
55
from parsl.concurrent import ParslPoolExecutor
66

77

88
def f(x):
99
return x + 1
1010

1111

12+
def g(x):
13+
return 2 * x
14+
15+
16+
@python_app
17+
def is_odd(x):
18+
if x % 2 == 1:
19+
return 1
20+
else:
21+
return 0
22+
23+
1224
def make_config():
1325
return Config(
1426
executors=[
@@ -68,4 +80,17 @@ def test_with_dfk():
6880
with load(config) as dfk, ParslPoolExecutor(dfk=dfk, executors=['test_executor']) as exc:
6981
future = exc.submit(f, 1)
7082
assert future.result() == 2
71-
assert next(iter(exc._app_cache.values())).executors == ['test_executor'], 'Executors were not passed through'
83+
assert exc.get_app(f).executors == ['test_executor']
84+
85+
86+
@mark.local
87+
def test_chaining():
88+
"""Make sure the executor functions can be chained together"""
89+
config = make_config()
90+
91+
with ParslPoolExecutor(config) as exc:
92+
future_odd = exc.submit(f, 10)
93+
assert is_odd(future_odd).result()
94+
95+
future_even = exc.submit(g, future_odd)
96+
assert not is_odd(future_even).result()

0 commit comments

Comments
 (0)