Skip to content

Commit bae13da

Browse files
committed
ENH(PLAN) hide Recompute_TILL from user; +1.6TC; ...
+ enh: recompute_TILL := Outputs. + refact: pass a copy-graph in recompute, for client-code to inspect it if needed. + fix: empties TC ok, now parametrized.
1 parent 6848dfc commit bae13da

File tree

4 files changed

+88
-75
lines changed

4 files changed

+88
-75
lines changed

graphtik/base.py

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -784,7 +784,6 @@ def compute(
784784
named_inputs,
785785
outputs=None,
786786
recompute_from=None,
787-
recompute_till=None,
788787
*kw,
789788
):
790789
"""
@@ -801,9 +800,6 @@ def compute(
801800
:param recompute_from:
802801
recompute all downstream from those dependencies,
803802
see :meth:`.Pipeline.compute()`.
804-
:param recompute_till:
805-
(UNSTABLE) recompute all upstream from those dependencies,
806-
see :meth:`.Pipeline.compute()`.
807803
808804
:returns list:
809805
Should return a list values representing

graphtik/pipeline.py

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -322,7 +322,6 @@ def compile(
322322
inputs=None,
323323
outputs=UNSET,
324324
recompute_from=None,
325-
recompute_till=None,
326325
*,
327326
predicate: "NodePredicate" = UNSET,
328327
) -> "ExecutionPlan":
@@ -337,8 +336,6 @@ def compile(
337336
If not given, those set by a previous call to :meth:`withset()` or cstor are used.
338337
:param recompute_from:
339338
Described in :meth:`.Pipeline.compute()`.
340-
:param recompute_till:
341-
(UNSTABLE) Described in :meth:`.Pipeline.compute()`.
342339
:param predicate:
343340
Will be stored and applied on the next :meth:`compute()` or :meth:`compile()`.
344341
If not given, those set by a previous call to :meth:`withset()` or cstor are used.
@@ -361,16 +358,13 @@ def compile(
361358
if predicate == UNSET:
362359
predicate = self.predicate
363360

364-
return self.net.compile(
365-
inputs, outputs, recompute_from, recompute_till, predicate=predicate
366-
)
361+
return self.net.compile(inputs, outputs, recompute_from, predicate=predicate)
367362

368363
def compute(
369364
self,
370365
named_inputs: Mapping = None,
371366
outputs: Items = UNSET,
372367
recompute_from: Items = None,
373-
recompute_till: Items = None,
374368
*,
375369
predicate: "NodePredicate" = UNSET,
376370
pre_callback=None,
@@ -397,19 +391,13 @@ def compute(
397391
*strictly downstream (excluding themselves)* from the dependencies
398392
listed here, as missing from `named_inputs`.
399393
400-
* If also `recompute_till` is given, traversing downstream stops
401-
when arriving in any dependency contained in that list.
394+
* Traversing downstream stops when arriving at any dep in `outputs`.
402395
* Any dependencies here unreachable downstreams from values in `named_inputs`
403396
are ignored, but logged.
404397
* Any dependencies here unreachable upstreams from `outputs` (if given)
405398
are ignored, but logged.
406399
* Results may differ even if graph is unchanged, in the presence
407400
of :term:`overwrite`\\s.
408-
:param recompute_till:
409-
(UNSTABLE) Refresh all computations arriving to these (string or list) dependencies.
410-
In effect, it clears all values in `named_inputs` reachable UPstreams.
411-
412-
* See bullet notes in `recompute_from`, above.
413401
:param predicate:
414402
filter-out nodes before compiling
415403
If not given, those set by a previous call to :meth:`withset()` or cstor are used.
@@ -471,7 +459,6 @@ def compute(
471459
named_inputs.keys(),
472460
outputs,
473461
recompute_from,
474-
recompute_till,
475462
predicate=predicate,
476463
)
477464

graphtik/planning.py

Lines changed: 42 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
Mapping,
1818
NamedTuple,
1919
Optional,
20+
Sequence,
2021
Tuple,
2122
Union,
2223
)
@@ -239,18 +240,23 @@ def _topo_sort_nodes(dag) -> iset:
239240
raise nx.NetworkXUnfeasible(msg).with_traceback(tb)
240241

241242

242-
def recompute_inputs(
243-
graph, inputs, recompute_from, recompute_till
243+
def inputs_for_recompute(
244+
graph,
245+
inputs: Sequence[str],
246+
recompute_from: Sequence[str],
247+
recompute_till: Sequence[str] = None,
244248
) -> Tuple[iset, iset]:
245249
"""
246250
Clears the inputs between `recompute_from >--<= recompute_till` to clear.
247251
252+
:param graph:
253+
MODIFIED, at most 2 helper nodes inserted
248254
:param inputs:
249-
None or a sequence
255+
a sequence
250256
:param recompute_from:
251-
None or a sequence
257+
None or a sequence, including any out-of-graph deps (logged))
252258
:param recompute_till:
253-
(UNSTABLE) None or a sequence
259+
(optional) a sequence, only in-graph deps.
254260
255261
:return:
256262
a 2-tuple with the reduced `inputs` by the dependencies that must
@@ -260,51 +266,40 @@ def recompute_inputs(
260266
261267
strict-descendants(recompute_from) & ancestors(recompute_till)
262268
263-
FIXME: Should recompute() while travesing unsatisfied? Is `till` relevant??
269+
FIXME: merge recompute() with travesing unsatisfied (see ``test_recompute_NEEDS_FIX``)
270+
bc it clears inputs of unsatisfied ops (cannot be replaced later)
264271
"""
265-
start, stop = "_TMP.RECOMPUTE_FROM", "_TMP.RECOMPUTE_TILL"
266-
graph = graph.copy()
267-
268-
datanodes = iset(yield_datanodes(graph.nodes))
269-
downstreams_strict = datanodes
270-
if recompute_from is not None:
271-
recompute_from = iset(recompute_from) # traversed in logs
272-
bad = recompute_from - datanodes
273-
if bad:
274-
log.info(
275-
"... ignoring unknown `recompute_from` dependencies: %s", list(bad)
276-
)
277-
recompute_from = recompute_from & datanodes
278-
if recompute_from:
279-
graph.add_edges_from((start, i) for i in recompute_from)
272+
START, STOP = "_TMP.RECOMPUTE_FROM", "_TMP.RECOMPUTE_TILL"
280273

281-
downstreams_strict = (
282-
iset(yield_datanodes(nx.descendants(graph, start))) - recompute_from
283-
)
274+
deps = set(yield_datanodes(graph.nodes))
275+
recompute_from = iset(recompute_from) # traversed in logs
276+
inputs = iset(inputs) # returned
277+
bad = recompute_from - deps
278+
if bad:
279+
log.info("... ignoring unknown `recompute_from` dependencies: %s", list(bad))
280+
recompute_from = recompute_from & deps # avoid sideffect in `recompute_from`
281+
assert recompute_from, f"Given unknown-only `recompute_from` {locals()}"
284282

285-
if recompute_till is None:
286-
upstreams = datanodes
287-
else:
288-
recompute_till = iset(recompute_till) # traversed in logs
289-
bad = recompute_till - datanodes
290-
if bad:
291-
log.info(
292-
"... ignoring unknown `recompute_till` dependencies: %s", list(bad)
293-
)
294-
recompute_till = recompute_till & datanodes
295-
graph.add_edges_from((i, stop) for i in recompute_till) # edge reversed!
283+
graph.add_edges_from((START, i) for i in recompute_from)
284+
285+
# strictly-downstreams from START
286+
between_deps = iset(nx.descendants(graph, START)) & deps - recompute_from
287+
288+
if recompute_till:
289+
graph.add_edges_from((i, STOP) for i in recompute_till) # edge reversed!
296290

297-
upstreams = iset(yield_datanodes(nx.ancestors(graph, stop)))
291+
# upstreams from STOP
292+
upstreams = set(nx.ancestors(graph, STOP)) & deps
293+
between_deps &= upstreams
298294

299-
between = downstreams_strict & upstreams
300-
recomputes = [i for i in inputs if i in between]
301-
new_inputs = iset(inputs) - between
295+
recomputes = between_deps & inputs
296+
new_inputs = iset(inputs) - recomputes
302297

303298
if log.isEnabledFor(logging.DEBUG):
304299
log.debug(
305300
"... recompute x%i data%s means deleting x%i inputs%s, to arrive from x%i %s -> x%i %s.",
306-
len(between),
307-
list(between),
301+
len(between_deps),
302+
list(between_deps),
308303
len(recomputes),
309304
list(recomputes),
310305
len(inputs),
@@ -866,7 +861,6 @@ def compile(
866861
inputs: Items = None,
867862
outputs: Items = None,
868863
recompute_from=None,
869-
recompute_till=None,
870864
*,
871865
predicate=None,
872866
) -> "ExecutionPlan":
@@ -886,8 +880,6 @@ def compile(
886880
If string, it is converted to a single-element collection.
887881
:param recompute_from:
888882
Described in :meth:`.Pipeline.compute()`.
889-
:param recompute_till:
890-
(UNSTABLE) Described in :meth:`.Pipeline.compute()`.
891883
:param predicate:
892884
the :term:`node predicate` is a 2-argument callable(op, node-data)
893885
that should return true for nodes to include; if None, all nodes included.
@@ -909,15 +901,15 @@ def compile(
909901

910902
ok = False
911903
try:
912-
## Make a stable cache-key.
904+
## Make a stable cache-key,
905+
# ignoring out-of-graph nodes (2nd results).
913906
#
914907
inputs, k1 = self._deps_tuplized(inputs, "inputs")
915908
outputs, k2 = self._deps_tuplized(outputs, "outputs")
916909
recompute_from, k3 = self._deps_tuplized(recompute_from, "recompute_from")
917-
recompute_till, k4 = self._deps_tuplized(recompute_till, "recompute_till")
918910
if not predicate:
919911
predicate = None
920-
cache_key = (k1, k2, k3, k4, predicate, is_skip_evictions())
912+
cache_key = (k1, k2, k3, predicate, is_skip_evictions())
921913

922914
## Build (or retrieve from cache) execution plan
923915
# for the given dep-lists (excluding any unknown node-names).
@@ -926,9 +918,9 @@ def compile(
926918
log.debug("... compile cache-hit key: %s", cache_key)
927919
plan = self._cached_plans[cache_key]
928920
else:
929-
if recompute_from is not None or recompute_till is not None:
930-
inputs, recomputes = recompute_inputs(
931-
self.graph, inputs, recompute_from, recompute_till
921+
if recompute_from:
922+
inputs, recomputes = inputs_for_recompute(
923+
self.graph.copy(), inputs, recompute_from, k2
932924
)
933925

934926
_prune_results = self._prune_graph(inputs, outputs, predicate)

test/test_graphtik.py

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
vararg,
2222
varargs,
2323
)
24-
from graphtik.base import IncompleteExecutionError
24+
from graphtik.base import IncompleteExecutionError, UNSET
2525
from graphtik.config import debug_enabled, evictions_skipped, operations_endured
2626

2727
from .helpers import addall, exe_params
@@ -1014,17 +1014,26 @@ def test_rerun_resched(quarantine_pipeline):
10141014
assert exe_ops(sol) == ["get_out_or_stay_home", "exercise"]
10151015

10161016

1017-
@pytest.mark.xfail(reason="Badly-specified `recompute` empties AND algorithm!")
1018-
def test_recompute_empties(samplenet):
1017+
@pytest.fixture
1018+
def recompute_sol(samplenet):
10191019
pipe = compose(..., samplenet, excludes="sum_op1")
10201020
sol = pipe.compute({"c": 3, "d": 4}, recompute_from=())
10211021
assert sol == {"c": 3, "d": 4, "sum2": 7, "sum3": 10}
10221022
assert exe_ops(sol) == ["sum_op2", "sum_op3"]
1023+
1024+
return pipe, sol
1025+
1026+
1027+
@pytest.mark.parametrize("outs", [None, UNSET, ()])
1028+
@pytest.mark.parametrize("recomputes", [None, ()])
1029+
def test_recompute_empties(recompute_sol, outs, recomputes):
1030+
pipe, sol = recompute_sol
10231031
exp = dict(sol)
10241032

1025-
sol = pipe.compute(sol, recompute_from=())
1026-
assert sol == exp
1027-
assert exe_ops(sol) == ["sum_op2", "sum_op3"]
1033+
with pytest.raises(ValueError, match="^Unsolvable"):
1034+
pipe.compute(sol, outputs=outs, recompute_from=recomputes)
1035+
with pytest.raises(ValueError, match="^Unsolvable"):
1036+
pipe.compute({}, outputs=outs, recompute_from=recomputes)
10281037

10291038

10301039
@pytest.mark.parametrize(
@@ -1131,6 +1140,35 @@ def test_recompute_resched_false(quarantine_pipeline, recompute, exp_ops):
11311140
assert exe_ops(sol) == exp_ops
11321141

11331142

1143+
def test_recompute_till():
1144+
def by2(n):
1145+
return 2 * n
1146+
1147+
pipe = compose(
1148+
...,
1149+
operation(by2, "f0", "a0", "a1"),
1150+
operation(by2, "f1", "a1", "a2"),
1151+
operation(by2, "f2", "a2", "a3"),
1152+
operation(by2, "f3", "a3", "a4"),
1153+
)
1154+
sol = pipe(a0=1)
1155+
assert exe_ops(sol) == ["f0", "f1", "f2", "f3"]
1156+
assert sol == {"a0": 1, "a1": 2, "a2": 4, "a3": 8, "a4": 16}
1157+
1158+
inp = dict(sol)
1159+
inp["a1"] = 3
1160+
1161+
sol = pipe.compute(inp, outputs="a3", recompute_from="a1")
1162+
assert exe_ops(sol) == ["f1", "f2"]
1163+
assert sol == {"a3": 12}
1164+
1165+
with evictions_skipped(True):
1166+
sol = pipe.compute(inp, outputs="a3", recompute_from="a1")
1167+
sol.plot("t.pdf")
1168+
assert exe_ops(sol) == ["f1", "f2"]
1169+
assert sol == {"a0": 1, "a1": 3, "a2": 6, "a3": 12, "a4": 16}
1170+
1171+
11341172
def test_recompute_NEEDS_FIX():
11351173
pipe = compose(
11361174
...,

0 commit comments

Comments
 (0)