Skip to content

Commit 961e956

Browse files
committed
ENH(Net,TCs): Nest pipelines with NestArgs to hooks
+ doc(terms): split mest/mege terms
1 parent a9c5e47 commit 961e956

File tree

5 files changed

+131
-81
lines changed

5 files changed

+131
-81
lines changed

docs/source/arch.rst

Lines changed: 22 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -61,28 +61,33 @@ Architecture
6161
- Use :func:`.compose()` factory to build :class:`.NetworkOperation`
6262
instances (a.k.a. pipelines).
6363

64-
operation merging
65-
operation nesting
64+
combine pipelines
6665
When `operation`\s and/or `pipeline`\s are `compose`\d together, there are
67-
two ways to combine the operations contained into the new pipeline,
68-
controlled by the ``nest`` parameter of :func:`.compose()` factory
69-
:small:`(read its docstring for details on the accepted values)`:
66+
two ways to combine the operations contained into the new pipeline:
67+
`operation merging` (default) and `operation nesting`.
7068

71-
merging
72-
(default) any identically-named operations override each other,
73-
with the operations added earlier in the ``.compose()`` call
74-
(further to the left) winning over those added later (further to the right).
69+
They are selected by the ``nest`` functional parameter of :func:`.compose()`
70+
factory.
71+
72+
operation merging
73+
The default method to `combine pipelines`, also applied when simply merging `operation`\s.
7574

76-
:seealso: :ref:`operation-merging`
75+
Any identically-named operations override each other,
76+
with the operations added earlier in the ``.compose()`` call
77+
(further to the left) winning over those added later (further to the right).
78+
79+
:seealso: :ref:`operation-merging`
80+
81+
operation nesting
82+
The elaborate method to `combine pipelines` forming *clusters*.
7783

78-
nesting
79-
the original pipelines are preserved intact in "isolated" clusters,
80-
by prefixing the names of their operations (and optionally data)
81-
by the name of the respective original pipeline that contained them
82-
(or the user defines the renames).
84+
The original pipelines are preserved intact in "isolated" clusters,
85+
by prefixing the names of their operations (and optionally data)
86+
by the name of the respective original pipeline that contained them
87+
(or the user defines the renames).
8388

84-
:seealso: :ref:`operation-nesting`, :func:`.nest_any_node()`,
85-
:func:`.dep_renamed()`, :attr:`.PlotArgs.clusters`
89+
:seealso: :ref:`operation-nesting`, :func:`.compose`, :class:`.NestArgs`,
90+
:func:`.nest_any_node()`, :func:`.dep_renamed()`, :attr:`.PlotArgs.clusters`
8691

8792
compile
8893
compilation

docs/source/pipelines.rst

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -129,10 +129,9 @@ Extending pipelines
129129
Sometimes we begin with existing computation graph(s) to which we want to extend
130130
with other operations and/or pipelines.
131131

132-
There are 2 ways to combine operations together, :term:`merging
132+
There are 2 ways to :term:`combine pipelines` together, :term:`merging
133133
<operation merging>` (the default) and :term:`nesting <operation nesting>`.
134134

135-
136135
.. _operation-merging:
137136

138137
Merging
@@ -202,12 +201,12 @@ Let's suppose we want to break the isolation, and have all sub-pipelines
202201
consume & produce from a common "backlog" (n.b. in real life, we would have
203202
a "feeder" & "collector" operations).
204203

205-
We do that by passing a :func:`.callable` as the ``nest`` parameter,
206-
which will decide which of the nodes of the original pipeline, both operations & data,
207-
should be prefixed (see :func:`.compose` for the exact specs of that param):
204+
We do that by passing as the ``nest`` parameter a :func:`.callable` which will decide
205+
which names of the original pipeline (operations & dependencies) should be prefixed
206+
(see also :func:`.compose` & :class:`.NestArgs` for how to use that param):
208207

209-
>>> def rename_predicate(p, node, parent):
210-
... if node not in ("backlog", "tasks done", "todos"):
208+
>>> def rename_predicate(nest_args):
209+
... if nest_args.name not in ("backlog", "tasks done", "todos"):
211210
... return True
212211

213212
>>> week = compose("week", *weekdays, nest=rename_predicate)

graphtik/network.py

Lines changed: 68 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,17 @@
66
import logging
77
from collections import abc, defaultdict
88
from itertools import count
9-
from typing import Any, Callable, Collection, List, Mapping, Optional, Tuple, Union
9+
from typing import (
10+
Any,
11+
Callable,
12+
Collection,
13+
List,
14+
Mapping,
15+
NamedTuple,
16+
Optional,
17+
Tuple,
18+
Union,
19+
)
1020

1121
import networkx as nx
1222
from boltons.setutils import IndexedSet as iset
@@ -599,6 +609,20 @@ def compile(
599609
return plan
600610

601611

612+
class NestArgs(NamedTuple):
613+
""":term:`operation nesting` callables receive instances of this class."""
614+
615+
#: the parent :class:`.NetworkOperation` of the operation currently being processed
616+
parent: "NetworkOperation"
617+
#: what is currently being renamed,
618+
#: one of the string: ``(op | needs | provides)``
619+
typ: str
620+
#: the operation currently being processed
621+
op: Operation
622+
# the name of the item to be renamed/nested
623+
name: str
624+
625+
602626
def build_network(
603627
operations,
604628
rescheduled=None,
@@ -608,7 +632,12 @@ def build_network(
608632
nest=None,
609633
node_props=None,
610634
):
611-
"""The :term:`network` factory that does :term:`operation merging` before constructing it. """
635+
"""
636+
The :term:`network` factory that does :term:`operation merging` before constructing it.
637+
638+
:param nest:
639+
see same-named param in :func:`.compose`
640+
"""
612641
from boltons.setutils import IndexedSet as iset
613642
from .op import NULL_OP
614643
from .pipeline import NetworkOperation
@@ -622,7 +651,7 @@ def proc_op(op, parent=None):
622651
#
623652
if (
624653
node_props
625-
or (nest and parent)
654+
or nest
626655
or rescheduled is not None
627656
or endured is not None
628657
or parallel is not None
@@ -646,43 +675,51 @@ def proc_op(op, parent=None):
646675
## If `nest`, rename the op & data (predicated by `nest`)
647676
# by prefixing them with their parent netop.
648677
#
678+
nest_args = NestArgs(parent, op, None, None)
649679
if nest:
650-
kw["name"] = node_renamed("operation", op, parent)
651-
kw["needs"] = [node_renamed("needs", n, parent) for n in op.needs]
680+
kw["name"] = rename(nest_args._replace(typ="op", name=op.name))
681+
kw["needs"] = [
682+
rename(nest_args._replace(typ="needs", name=n)) for n in op.needs
683+
]
652684
kw["provides"] = [
653-
node_renamed("provides", n, parent) for n in op.provides
685+
rename(nest_args._replace(typ="provides", name=n))
686+
for n in op.provides
654687
]
655688

656689
op = op.withset(**kw)
657690

658691
return op
659692

660-
def node_renamed(typ, node, parent) -> str:
693+
def rename(nest_args: NestArgs) -> str:
661694
"""Handle user's or default `nest` callable's results."""
662-
assert callable(nest), (nest, operations)
663-
664-
ret = nest(typ, node, parent)
695+
assert callable(nest), (nest, nest_args)
696+
697+
ok = False
698+
try:
699+
ret = nest(nest_args)
700+
if not ret:
701+
# A falsy means don't touch the node.
702+
ret = nest_args.name
703+
elif not isinstance(ret, str):
704+
# Truthy but not str values mean apply default nesting.
705+
ret = nest_any_node(nest_args)
706+
707+
ok = True
708+
return ret
709+
finally:
710+
if not ok:
711+
log.warning("Failed to nest-rename %s", nest_args)
665712

666-
if not ret:
667-
# A falsy means don't touch the node.
668-
return node.name if isinstance(node, Operation) else node
669-
670-
if not isinstance(ret, str):
671-
# Truthy but not str values mean nest!
672-
ret = nest_any_node(typ, node, parent)
673-
674-
return ret
675-
676-
## Set default nesting if requested by user.
677-
#
678713
if nest:
714+
## Set default nesting if not one provided by user.
715+
#
679716
if not callable(nest):
680717
nest = nest_any_node
681718

682719
merge_set = iset() # Preseve given node order.
683720
for op in operations:
684721
if isinstance(op, NetworkOperation):
685-
merge_set.update(proc_op(s, op.name) for s in yield_ops(op.net.graph))
722+
merge_set.update(proc_op(s, op) for s in yield_ops(op.net.graph))
686723
else:
687724
merge_set.add(proc_op(op))
688725
merge_set = iset(i for i in merge_set if not isinstance(i, NULL_OP))
@@ -693,22 +730,18 @@ def node_renamed(typ, node, parent) -> str:
693730
return net
694731

695732

696-
def nest_any_node(
697-
node_type: str, node: Union[Operation, str], parent: "NetworkOperation"
698-
) -> str:
699-
"""Nest both operation & data `node` under `parent` (if given).
733+
def nest_any_node(nest_args: NestArgs) -> str:
734+
"""Nest both operation & data under `parent`'s name (if given).
700735
701736
:return:
702737
the nested name of the operation or data
703738
"""
704739

705740
def prefixed(name):
706-
return f"{parent}.{name}" if parent else name
741+
return f"{nest_args.parent.name}.{name}" if nest_args.parent else name
707742

708-
if isinstance(node, Operation):
709-
new_name = prefixed(node.name)
710-
else:
711-
assert isinstance(node, str), locals()
712-
new_name = dep_renamed(node, prefixed)
713-
714-
return new_name
743+
return (
744+
prefixed(nest_args.name)
745+
if nest_args.typ == "op"
746+
else dep_renamed(nest_args.name, prefixed)
747+
)

graphtik/pipeline.py

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ def compose(
331331
endured=None,
332332
parallel=None,
333333
marshalled=None,
334-
nest: Union[Callable[[str], str], Union[bool, str]] = None,
334+
nest: Union[Callable[["NestArgs"], str], Union[bool, str]] = None,
335335
node_props=None,
336336
) -> NetworkOperation:
337337
"""
@@ -350,29 +350,26 @@ def compose(
350350
``operation``.
351351
:param nest:
352352
- If false (default), applies :term:`operation merging`, not *nesting*.
353-
- if true, applies :term:`operation nesting` to :class:`.FunctionalOperation`\\s
354-
only (performed by :func:`.op.nest_any_node()`;
355-
- if it is :func:`.callable`, it is given each node and the parent pipeline
356-
(if combining pipeline(s)) to decide the node's name.
353+
- if true, applies :term:`operation nesting` to :all types of nodes
354+
(performed by :func:`.nest_any_node()`;
355+
- if it is a :func:`.callable`, it is given a :class:`.NestArgs` instance
356+
to decide the node's name.
357357
358-
The callable may return the new-name (str), or a true/false to apply
359-
the default renaming which is to rename all nodes (as performed by
360-
:func:`.op.nest_any_node()`).
358+
The callable may return a *str* for the new-name, or any other true/false
359+
to apply default nesting which is to rename all nodes, as performed
360+
by :func:`.nest_any_node()`.
361361
362-
For example, to nest just the operations, call::
362+
For example, to nest just operation's names (but not their dependencies),
363+
call::
363364
364365
compose(
365366
...,
366-
nest=lambda typ, node, parent: isinstance(n, Operation)
367+
nest=lambda nest_args: nest_args.typ == "op"
367368
)
368369
369-
...where `typ` is one of ``(operation | needs | provides)`` strings.
370-
371-
Of course the callable can apply a renaming irrelevant from the `parent`.
372-
373370
.. Attention::
374-
The callable must preserve any :term:`modifier` on data nodes,
375-
so :func:`.dep_renamed` should be used.
371+
The callable SHOULD wish to preserve any :term:`modifier` on dependencies,
372+
and use :func:`.dep_renamed`.
376373
377374
:seealso: :ref:`operation-nesting` for examples
378375

test/test_graphtik.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -280,9 +280,25 @@ def powers_in_range(a, exponent):
280280
assert sol == exp
281281

282282

283+
def test_network_nest_errors(caplog):
284+
def screamy_nester(nest_args):
285+
raise RuntimeError("Bluff")
286+
287+
with pytest.raises(RuntimeError, match="Bluff"):
288+
compose(
289+
"test_nest_err",
290+
operation(str, "op1"),
291+
operation(str, "op2"),
292+
nest=screamy_nester,
293+
)
294+
for record in caplog.records:
295+
if record.levelname == "WARNING":
296+
assert "(parent=None, typ='op', op=None, name='op1')" in record.message
297+
298+
283299
def test_network_nest_ops_only():
284-
def ops_only(t, n, p):
285-
return isinstance(n, Operation)
300+
def ops_only(nest_args):
301+
return nest_args.typ == "op"
286302

287303
sum_op1 = operation(name="sum_op1", needs=["a", "b"], provides="sum1")(add)
288304
sum_op2 = operation(name="sum_op2", needs=["a", "b"], provides="sum2")(add)
@@ -397,11 +413,11 @@ def test_network_merge_in_doctests():
397413
week = compose("week", *weekdays, nest=True)
398414
assert len(week.ops) == 6
399415

400-
def rename_predicate(typ, node, parent):
401-
if node not in ("backlog", "tasks done", "todos"):
416+
def nester(nest_args):
417+
if nest_args.name not in ("backlog", "tasks done", "todos"):
402418
return True
403419

404-
week = compose("week", *weekdays, nest=rename_predicate)
420+
week = compose("week", *weekdays, nest=nester)
405421
assert len(week.ops) == 6
406422
sol = week.compute({"backlog": "a lot!"})
407423
assert sol == {
@@ -1623,7 +1639,7 @@ def test_combine_networks(exemethod, bools):
16231639
name="sub2", needs=["a_minus_ab", "c"], provides="a_minus_ab_minus_c"
16241640
)(sub),
16251641
parallel=parallel2,
1626-
nest=lambda t, n, p: isinstance(n, Operation),
1642+
nest=lambda nest_args: nest_args.typ == "op",
16271643
)
16281644
## Ensure all old-nodes were prefixed.
16291645
#

0 commit comments

Comments
 (0)