Skip to content

Commit 9dc00e7

Browse files
committed
FEAT(OP,.TC): +support node-kws (for predicate narrowing) ...
+ feat(op): +withset() method. + refact(netop): merge reworked. + test(op, netop): check also node_props reach network also on merges.
1 parent de146e3 commit 9dc00e7

File tree

5 files changed

+151
-34
lines changed

5 files changed

+151
-34
lines changed

graphtik/netop.py

Lines changed: 30 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from .base import Plotter, aslist, astuple, jetsam
1414
from .modifiers import optional, sideffect
1515
from .network import Network, yield_operations
16-
from .op import Operation, reparse_operation_data
16+
from .op import FunctionalOperation, Operation, reparse_operation_data
1717

1818
log = logging.getLogger(__name__)
1919

@@ -67,6 +67,7 @@ def __init__(
6767
self.name = name
6868
self.inputs = inputs
6969
self.provides = outputs
70+
# Prune network
7071
self.net = net.pruned(inputs, outputs)
7172
## Set data asap, for debugging, although `prune()` will reset them.
7273
self.set_execution_method(method)
@@ -256,6 +257,7 @@ def compose(
256257
needs=None,
257258
provides=None,
258259
merge=False,
260+
node_props=None,
259261
method=None,
260262
overwrites_collector=None,
261263
) -> NetworkOperation:
@@ -283,7 +285,9 @@ def compose(
283285
this ``compose`` instance). If any two operations are the same
284286
(based on name), then that operation is computed only once, instead
285287
of multiple times (one for each time the operation appears).
286-
288+
:param node_props:
289+
added as-is into NetworkX graph, to provide for filtering
290+
by :meth:`.NetworkOperation.narrow()`.
287291
:param method:
288292
either `parallel` or None (default);
289293
if ``"parallel"``, launches multi-threading.
@@ -305,21 +309,36 @@ def compose(
305309
if not all(isinstance(op, Operation) for op in operations):
306310
raise ValueError(f"Non-Operation instances given: {operations}")
307311

312+
def proc_op(op, parent=None):
313+
"""clone FuncOperation with certain props changed"""
314+
assert isinstance(op, FunctionalOperation), op
315+
316+
## Convey any node-props specified in the netop here
317+
# to all sub-operations.
318+
#
319+
if node_props or parent:
320+
kw = {}
321+
if node_props:
322+
op_node_props = op.node_props.copy()
323+
op_node_props.update(node_props)
324+
kw["node_props"] = op_node_props
325+
## If `merge` asked, leave original `name` to deduplicate operations,
326+
# otherwise rename the op by prefixing them with their parent netop.
327+
#
328+
if not merge and parent:
329+
kw["parents"] = (parent,) + (op.parents or ())
330+
op = op.withset(**kw)
331+
332+
return op
333+
308334
merge_set = iset() # Preseve given node order.
309335
for op in operations:
310336
if isinstance(op, NetworkOperation):
311-
# TODO: do we really need sorting ops when merging?
312-
netop_nodes = op.net.graph
313337
merge_set.update(
314-
# If merge is desired, set will deduplicate operations
315-
s if merge else
316-
# else rename added ops by prefixing them with their parent netop.
317-
s._adopted_by(op.name)
318-
for s in netop_nodes
319-
if isinstance(s, Operation)
338+
proc_op(s, op.name) for s in op.net.graph if isinstance(s, Operation)
320339
)
321340
else:
322-
merge_set.add(op)
341+
merge_set.add(proc_op(op))
323342
operations = merge_set
324343

325344
net = Network(*operations)

graphtik/network.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -565,7 +565,7 @@ def _append_operation(self, graph, operation: Operation):
565565
"""
566566
Adds the given operation and its data requirements to the network graph.
567567
568-
- Invoked dyuring constructor only (immutability).
568+
- Invoked during constructor only (immutability).
569569
- Identities are based on the name of the operation, the names of the operation's needs,
570570
and the names of the data it provides.
571571
@@ -584,6 +584,8 @@ def _append_operation(self, graph, operation: Operation):
584584
graph.add_node(_DataNode(n), sideffect=True)
585585
graph.add_edge(_DataNode(n), operation, **kw)
586586

587+
graph.add_node(operation, **operation.node_props)
588+
587589
# add nodes and edges to graph describing what this layer provides
588590
for n in operation.provides:
589591
kw = {}

graphtik/op.py

Lines changed: 54 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import abc
66
import logging
77
from collections import abc as cabc
8-
from typing import Callable, Collection, Tuple, Union
8+
from typing import Callable, Collection, Mapping, Tuple, Union
99

1010
from boltons.setutils import IndexedSet as iset
1111

@@ -41,7 +41,7 @@ def reparse_operation_data(name, needs, provides):
4141

4242
# TODO: immutable `Operation` by inheriting from `namedtuple`.
4343
class Operation(abc.ABC):
44-
"""An abstract class representing a data transformation by :meth:`.compute()`."""
44+
"""An abstract class representing an action with :meth:`.compute()`."""
4545

4646
@abc.abstractmethod
4747
def compute(self, named_inputs, outputs=None):
@@ -75,6 +75,7 @@ def __init__(
7575
provides: Union[Collection, str] = None,
7676
*,
7777
parents: Tuple = None,
78+
node_props: Mapping = None,
7879
returns_dict=None,
7980
):
8081
"""
@@ -87,18 +88,21 @@ def __init__(
8788
Names of input data objects this operation requires.
8889
:param provides:
8990
Names of output data objects this provides.
90-
:param parent:
91+
:param parents:
9192
a tuple wth the names of the parents, prefixing `name`,
9293
but also kept for equality/hash check.
93-
94+
:param node_props:
95+
added as-is into NetworkX graph
9496
"""
9597
## Set op-data early, for repr() to work on errors.
9698
self.fn = fn
9799
self.name = name
98100
self.needs = needs
99101
self.provides = provides
100102
self.parents = parents
103+
self.node_props = node_props = node_props if node_props else {}
101104
self.returns_dict = returns_dict
105+
102106
if not fn or not callable(fn):
103107
raise ValueError(
104108
f"Operation was not provided with a callable: {fn}\n {self}"
@@ -107,6 +111,9 @@ def __init__(
107111
raise ValueError(
108112
f"Operation `parents` must be tuple, was {parents}\n {self}"
109113
)
114+
if node_props is not None and not isinstance(node_props, cabc.Mapping):
115+
raise ValueError(f"`node_props` must be a mapping, got: {node_props!r}")
116+
110117
## Overwrite reparsed op-data.
111118
name = ".".join(str(pop) for pop in ((parents or ()) + (name,)))
112119
self.name, self.needs, self.provides = reparse_operation_data(
@@ -133,21 +140,20 @@ def __repr__(self):
133140
provides = aslist(self.provides, "provides")
134141
fn_name = self.fn and getattr(self.fn, "__name__", str(self.fn))
135142
returns_dict_marker = self.returns_dict and "{}" or ""
143+
nprops = f", x{len(self.node_props)}props" if self.node_props else ""
136144
return (
137145
f"FunctionalOperation(name={self.name!r}, needs={needs!r}, "
138-
f"provides={provides!r}, fn{returns_dict_marker}={fn_name!r})"
146+
f"provides={provides!r}, fn{returns_dict_marker}={fn_name!r}{nprops})"
139147
)
140148

141-
def _adopted_by(self, parent):
142-
"""Make a clone with the given parrent set."""
143-
return FunctionalOperation(
144-
self.fn,
145-
self.name,
146-
self.needs,
147-
self.provides,
148-
parents=(parent,) + (self.parents or ()),
149-
returns_dict=self.returns_dict,
150-
)
149+
def withset(self, **kw) -> "FunctionalOperation":
150+
"""Make a clone with the some values replaced."""
151+
fn = kw["fn"] if "fn" in kw else self.fn
152+
name = kw["name"] if "name" in kw else self.name
153+
needs = kw["needs"] if "needs" in kw else self.needs
154+
provides = kw["provides"] if "provides" in kw else self.provides
155+
156+
return FunctionalOperation(fn, name, needs, provides, **kw)
151157

152158
def _zip_results_with_provides(self, results, real_provides: iset) -> dict:
153159
"""Zip results with expected "real" (without sideffects) `provides`."""
@@ -201,7 +207,6 @@ def compute(self, named_inputs, outputs=None) -> dict:
201207
if not isinstance(n, (optional, sideffect))
202208
]
203209
except KeyError:
204-
# FIXME:
205210
compulsory = iset(
206211
n for n in self.needs if not isinstance(n, (optional, sideffect))
207212
)
@@ -283,7 +288,8 @@ class operation:
283288
elements must be returned
284289
:param bool returns_dict:
285290
if true, it means the `fn` returns a dictionary with all `provides`,
286-
and no further processing is done on them.
291+
:param node_props:
292+
added as-is into NetworkX graph
287293
288294
:return:
289295
when called, it returns a :class:`FunctionalOperation`
@@ -316,15 +322,24 @@ def __init__(
316322
needs=None,
317323
provides=None,
318324
returns_dict=None,
325+
node_props: Mapping = None,
319326
):
320327
self.fn = fn
321328
self.name = name
322329
self.needs = needs
323330
self.provides = provides
324331
self.returns_dict = returns_dict
332+
self.node_props = node_props
325333

326334
def withset(
327-
self, *, fn=None, name=None, needs=None, provides=None, returns_dict=None
335+
self,
336+
*,
337+
fn=None,
338+
name=None,
339+
needs=None,
340+
provides=None,
341+
returns_dict=None,
342+
node_props: Mapping = None,
328343
) -> "operation":
329344
if fn is not None:
330345
self.fn = fn
@@ -336,11 +351,20 @@ def withset(
336351
self.provides = provides
337352
if returns_dict is not None:
338353
self.returns_dict = returns_dict
354+
if node_props is not None:
355+
self.node_props = node_props
339356

340357
return self
341358

342359
def __call__(
343-
self, fn=None, *, name=None, needs=None, provides=None, returns_dict=None
360+
self,
361+
fn=None,
362+
*,
363+
name=None,
364+
needs=None,
365+
provides=None,
366+
returns_dict=None,
367+
node_props: Mapping = None,
344368
) -> FunctionalOperation:
345369
"""
346370
This enables ``operation`` to act as a decorator or as a functional
@@ -365,7 +389,12 @@ def myadd(a, b):
365389
"""
366390

367391
self.withset(
368-
fn=fn, name=name, needs=needs, provides=provides, returns_dict=returns_dict
392+
fn=fn,
393+
name=name,
394+
needs=needs,
395+
provides=provides,
396+
returns_dict=returns_dict,
397+
node_props=node_props,
369398
)
370399

371400
return FunctionalOperation(**vars(self))
@@ -377,4 +406,8 @@ def __repr__(self):
377406
needs = aslist(self.needs, "needs")
378407
provides = aslist(self.provides, "provides")
379408
fn_name = self.fn and getattr(self.fn, "__name__", str(self.fn))
380-
return f"operation(name={self.name!r}, needs={needs!r}, provides={provides!r}, fn={fn_name!r})"
409+
nprops = f", x{len(self.node_props)}props" if self.node_props else ""
410+
return (
411+
f"operation(name={self.name!r}, needs={needs!r}, "
412+
f"provides={provides!r}, fn={fn_name!r}{nprops})"
413+
)

test/test_base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,7 @@ def __init__(self):
166166
self.name = ("",)
167167
self.needs = ()
168168
self.provides = ("a",)
169+
self.node_props = {}
169170

170171
def compute(self, named_inputs, outputs=None):
171172
pass

test/test_op.py

Lines changed: 63 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@
33

44
import pytest
55

6-
from graphtik import operation, optional, vararg, varargs
6+
from graphtik import compose, operation, optional, vararg, varargs
7+
from graphtik.network import yield_operations
78
from graphtik.op import Operation, reparse_operation_data
89

910

@@ -133,3 +134,64 @@ def sumall(a, *args, b=0, **kwargs):
133134
assert op.compute(dict(a=1, arg1=2, arg2=3, b=6, c=7))["sum"] == exp - 4 - 5
134135
with pytest.raises(ValueError, match="Missing compulsory needs.+'a'"):
135136
assert op.compute(dict(arg1=2, arg2=3, b=6, c=7))
137+
138+
139+
def test_op_node_props_bad():
140+
op_factory = operation(lambda: None, name="a", node_props="SHOULD BE DICT")
141+
with pytest.raises(ValueError, match="`node_props` must be"):
142+
op_factory()
143+
144+
145+
def test_op_node_props():
146+
op_factory = operation(lambda: None, name="a", node_props=())
147+
assert op_factory.node_props == ()
148+
assert op_factory().node_props == {}
149+
150+
np = {"a": 1}
151+
op = operation(lambda: None, name="a", node_props=np)()
152+
assert op.node_props == np
153+
154+
155+
def _collect_op_props(netop):
156+
return {
157+
k.name: v
158+
for k, v in netop.net.graph.nodes(data=True)
159+
if isinstance(k, Operation)
160+
}
161+
162+
163+
def test_netop_node_props():
164+
op1 = operation(lambda: None, name="a", node_props={"a": 11, "b": 0, "bb": 2})()
165+
op2 = operation(lambda: None, name="b", node_props={"a": 3, "c": 4})()
166+
netop = compose("n", op1, op2, node_props={"bb": 22, "c": 44})
167+
168+
exp = {"a": {"a": 11, "b": 0, "bb": 22, "c": 44}, "b": {"a": 3, "bb": 22, "c": 44}}
169+
node_props = _collect_op_props(netop)
170+
assert node_props == exp
171+
172+
# Check node-prop sideffects are not modified
173+
#
174+
assert op1.node_props == {"a": 11, "b": 0, "bb": 2}
175+
assert op2.node_props == {"a": 3, "c": 4}
176+
177+
178+
def test_netop_merge_node_props():
179+
op1 = operation(lambda: None, name="a", node_props={"a": 1})()
180+
netop1 = compose("n1", op1)
181+
op2 = operation(lambda: None, name="a", node_props={"a": 11, "b": 0, "bb": 2})()
182+
op3 = operation(lambda: None, name="b", node_props={"a": 3, "c": 4})()
183+
netop2 = compose("n2", op2, op3)
184+
185+
netop = compose("n", netop1, netop2, node_props={"bb": 22, "c": 44}, merge=False)
186+
exp = {
187+
"n1.a": {"a": 1, "bb": 22, "c": 44},
188+
"n2.a": {"a": 11, "b": 0, "bb": 22, "c": 44},
189+
"n2.b": {"a": 3, "bb": 22, "c": 44},
190+
}
191+
node_props = _collect_op_props(netop)
192+
assert node_props == exp
193+
194+
netop = compose("n", netop1, netop2, node_props={"bb": 22, "c": 44}, merge=True)
195+
exp = {"a": {"a": 1, "bb": 22, "c": 44}, "b": {"a": 3, "bb": 22, "c": 44}}
196+
node_props = _collect_op_props(netop)
197+
assert node_props == exp

0 commit comments

Comments
 (0)