Skip to content

Commit c35cec6

Browse files
committed
FEAT(exec,.TC): +TASK_CONTEXT (not-nested), XFail ...
parallel+marshal. Tried but din't fixed it: self.result = copy_context().run(self.op.compute, self.sol) Neglible burden on TCs executions ~2.5s: pytest -k 'not site and not directive and test_graphtik'
1 parent 5fa6357 commit c35cec6

File tree

3 files changed

+42
-7
lines changed

3 files changed

+42
-7
lines changed

docs/source/operations.rst

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -50,10 +50,10 @@ positional parameters are not supported:
5050
>>> add_op(a=3, b=4)
5151
{'a_plus_b': 7}
5252

53-
Calling an operation, it delegates to :meth:`.Operation.compute()` method,
54-
which checks the inputs, match the *needs*/*provides* to function arguments,
55-
calls the function, and finally "zip" the function results with the operation's
56-
*provides*. (read more on :ref:`graph-computations`).
53+
.. tip::
54+
In case your function needs to access the :mod:`.execution` machinery
55+
or its wrapping operation, it can do that through the :data:`.task_context`
56+
(unstable API).
5757

5858

5959
Builder pattern

graphtik/execution.py

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import random
66
import time
77
from collections import ChainMap, abc, defaultdict, namedtuple
8+
from contextvars import ContextVar
89
from functools import partial
910
from itertools import chain
1011
from typing import Any, Collection, List, Mapping, Optional, Tuple, Union
@@ -17,6 +18,9 @@
1718
AbortedException,
1819
IncompleteExecutionError,
1920
Items,
21+
Operation,
22+
PlotArgs,
23+
Plottable,
2024
aslist,
2125
astuple,
2226
first_solid,
@@ -39,7 +43,6 @@
3943
yield_node_names,
4044
yield_ops,
4145
)
42-
from .base import Operation, PlotArgs, Plottable
4346

4447
#: If this logger is *eventually* DEBUG-enabled,
4548
#: the string-representation of network-objects (network, plan, solution)
@@ -302,7 +305,13 @@ def __call__(self):
302305
self.result = None
303306
log = logging.getLogger(self.logname)
304307
log.debug("+++ (%s) Executing %s...", self.solid, self)
305-
self.result = self.op.compute(self.sol)
308+
token = task_context.set(self)
309+
try:
310+
## Not really needed ...
311+
# self.result = copy_context().run(self.op.compute, self.sol)
312+
self.result = self.op.compute(self.sol)
313+
finally:
314+
task_context.reset(token)
306315

307316
return self.result
308317

@@ -316,6 +325,11 @@ def __repr__(self):
316325
return f"OpTask({self.op}, sol_keys={sol_items!r})"
317326

318327

328+
#: Populated with the :class:`_OpTask` for the currently executing operation.
329+
#: It does not work for :term:`parallel execution`.
330+
task_context: ContextVar[_OpTask] = ContextVar("task_context")
331+
332+
319333
def _do_task(task):
320334
"""
321335
Un-dill the *simpler* :class:`_OpTask` & Dill the results, to pass through pool-processes.

test/test_graphtik.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
operations_reschedullled,
3535
tasks_marshalled,
3636
)
37-
from graphtik.execution import Solution
37+
from graphtik.execution import Solution, task_context
3838
from graphtik.base import Operation
3939
from graphtik.modifiers import dep_renamed, optional, sfx, sfxed, vararg
4040
from graphtik.op import NO_RESULT, NO_RESULT_BUT_SFX, operation
@@ -285,6 +285,27 @@ def powers_in_range(a, exponent):
285285
assert sol == exp
286286

287287

288+
def test_task_context(exemethod):
289+
def check_task_context():
290+
assert task_context.get().op == next(iop)
291+
292+
n_ops = 30
293+
pipe = compose(
294+
"t",
295+
*(operation(check_task_context, f"op{i}", provides="a") for i in range(n_ops)),
296+
parallel=exemethod,
297+
)
298+
iop = iter(pipe.ops)
299+
if exemethod and is_marshal_tasks():
300+
with pytest.raises(AssertionError, match="^assert FunctionalOperation"):
301+
pipe.compute()
302+
raise pytest.xfail("Cannot marshal `task_context` :-(.")
303+
else:
304+
pipe.compute()
305+
with pytest.raises(StopIteration):
306+
next(iop)
307+
308+
288309
def test_compose_rename_dict(caplog):
289310
pip = compose(
290311
"t",

0 commit comments

Comments
 (0)