Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
89717b5
Update storage locs, latest_loop_iter; start sets Input output
acl-cqc Jan 21, 2026
92de749
Also update run_graph/resume_graph
acl-cqc Jan 21, 2026
70fb35e
WIP walk_loop...also will need to update WalkResult??
acl-cqc Jan 14, 2026
0af5259
start_nodes => start_tasks, Task = NodeRunData | unused LoopIterTask
acl-cqc Jan 21, 2026
2e48e46
Fix walk_loop to return LoopIterTask
acl-cqc Jan 14, 2026
ecf0e08
and walk_map
acl-cqc Jan 14, 2026
9ea962a
RIP BODY_PORT
acl-cqc Jan 14, 2026
ace7c46
Some loop fixes
acl-cqc Jan 14, 2026
233c2b2
comment
acl-cqc Jan 14, 2026
12090f1
Fix iter0 of loop not being marked as started by writing dummy nodede…
acl-cqc Jan 14, 2026
d0f0bc0
Silently carry on past missing inputs
acl-cqc Jan 21, 2026
a66eb64
Add enable_logging to start_tasks
acl-cqc Jan 21, 2026
0386882
Remove obsolete/wrong comment
acl-cqc Feb 2, 2026
d29ce52
NodeRunData -> RunNodeTask
acl-cqc Feb 2, 2026
3ea56de
visualization/eval.py: avoid Storage::read_node_def, use is_node_started
acl-cqc Feb 10, 2026
a3c47de
visualization/data: Be clearer about what map_node/loop_node are doing
acl-cqc Feb 10, 2026
3a35fb8
doc viz -> vis
acl-cqc Feb 10, 2026
163b90c
Merge branch 'acl/viz_tidy' into acl/start_graph
acl-cqc Feb 10, 2026
4ef2440
Add GraphData::output_ports
acl-cqc Feb 13, 2026
d35bd23
tierkreis_visualization duplicate full path logic
acl-cqc Feb 13, 2026
cb939f7
Revert "tierkreis_visualization duplicate full path logic"
acl-cqc Feb 13, 2026
4b50336
Merge remote-tracking branch 'origin/main' into HEAD
acl-cqc Feb 13, 2026
ef04bd4
Enforce that Consts have no inputs, and Output nodes have no outputs
acl-cqc Feb 10, 2026
a19ce7c
Split (read/write)_node_def by Locn adding _graph_def variant
acl-cqc Feb 13, 2026
c2e26d0
Remove bogus tests/cases graph_node_from_loc/GraphDataStorage::read_o…
acl-cqc Feb 13, 2026
844c089
Redo graph_node_from_loc + GraphDataStorage::read_graph_def
acl-cqc Feb 13, 2026
e027bfc
test_graphdata_storage: new tests
acl-cqc Feb 13, 2026
6ff25b6
visualization, I hope
acl-cqc Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion tierkreis/tests/controller/test_locs.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,6 @@ def test_pop_last_multiple() -> None:
(node_location_2, 0),
(node_location_3, 0),
(node_location_4, 0),
(Loc().N(-1), -1),
],
)
def test_get_last_index(node_location: Loc, index: int) -> None:
Expand Down
6 changes: 5 additions & 1 deletion tierkreis/tierkreis/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ def namespace(self) -> str:
class GraphBuilder[Inputs: TModel, Outputs: TModel]:
outputs_type: type
inputs: Inputs
body_input: ValueRef | None

def __init__(
self,
Expand All @@ -72,12 +73,15 @@ def __init__(
self.outputs_type = outputs_type
inputs = [self.data.input(x) for x in model_fields(inputs_type)]
self.inputs = init_tmodel(self.inputs_type, inputs)
self.body_input = None

def get_data(self) -> GraphData:
return self.data

def ref(self) -> TypedGraphRef[Inputs, Outputs]:
return TypedGraphRef((-1, "body"), self.outputs_type, self.inputs_type)
if self.body_input is None:
self.body_input = self.data.input("body")
return TypedGraphRef(self.body_input, self.outputs_type, self.inputs_type)

def outputs(self, outputs: Outputs):
self.data.output(inputs=dict_from_tmodel(outputs))
Expand Down
28 changes: 13 additions & 15 deletions tierkreis/tierkreis/controller/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,18 @@
from time import sleep

from tierkreis.builder import GraphBuilder
from tierkreis.controller.data.graph import Eval, GraphData
from tierkreis.controller.data.location import Loc
from tierkreis.controller.data.graph import GraphData
from tierkreis.controller.data.location import Loc, OutputLoc
from tierkreis.controller.data.types import PType, bytes_from_ptype, ptype_from_bytes
from tierkreis.controller.executor.protocol import ControllerExecutor
from tierkreis.controller.start import NodeRunData, start, start_nodes
from tierkreis.controller.start import start_graph, start_tasks
from tierkreis.logger_setup import set_tkr_logger
from tierkreis.controller.storage.protocol import ControllerStorage
from tierkreis.controller.storage.walk import walk_node
from tierkreis.controller.data.core import PortID, ValueRef
from tierkreis.controller.data.core import PortID
from tierkreis.exceptions import TierkreisError

root_loc = Loc("")
root_loc = Loc("") # Special, used as inputs to the toplevel graph
logger = logging.getLogger(__name__)


Expand All @@ -35,20 +35,18 @@ def run_graph(
if len(remaining_inputs) > 0:
logger.warning(f"Some inputs were not provided: {remaining_inputs}")

storage.write_metadata(Loc(""))
storage.write_metadata(root_loc) # Should this be Loc() ?
if enable_logging:
set_tkr_logger(storage.logs_path)

for name, value in graph_inputs.items():
storage.write_output(root_loc.N(-1), name, bytes_from_ptype(value))
storage.write_output(root_loc, name, bytes_from_ptype(value))
storage.write_output(root_loc, "body", bytes_from_ptype(g))

storage.write_output(root_loc.N(-1), "body", bytes_from_ptype(g))

inputs: dict[PortID, ValueRef] = {
k: (-1, k) for k, _ in graph_inputs.items() if k != "body"
inputs: dict[PortID, OutputLoc] = {
k: (root_loc, k) for k, _ in graph_inputs.items() if k != "body"
}
node_run_data = NodeRunData(Loc(), Eval((-1, "body"), inputs), [])
start(storage, executor, node_run_data, enable_logging)
start_graph(storage, executor, Loc(), (root_loc, "body"), inputs)
resume_graph(storage, executor, n_iterations, polling_interval_seconds)


Expand All @@ -58,7 +56,7 @@ def resume_graph(
n_iterations: int = 10000,
polling_interval_seconds: float = 0.01,
) -> None:
message = storage.read_output(Loc().N(-1), "body")
message = storage.read_output(root_loc, "body")
graph = ptype_from_bytes(message, GraphData)

for _ in range(n_iterations):
Expand All @@ -82,7 +80,7 @@ def resume_graph(
print("--- Tierkreis graph errors above this line. ---\n\n")
raise TierkreisError("Graph encountered errors")

start_nodes(storage, executor, walk_results.inputs_ready)
start_tasks(storage, executor, walk_results.inputs_ready)
if storage.is_node_finished(Loc()):
break
sleep(polling_interval_seconds)
1 change: 0 additions & 1 deletion tierkreis/tierkreis/controller/consts.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import os
from pathlib import Path

BODY_PORT = "body"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm, perhaps I should use this everywhere (and move to labels.py) rather than delete it!?

PACKAGE_PATH = Path(os.path.dirname(os.path.realpath(__file__)))
1 change: 1 addition & 0 deletions tierkreis/tierkreis/controller/data/location.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __new__(cls, k: str = "-") -> "Loc":
return super(Loc, cls).__new__(cls, k)

def N(self, idx: int) -> "Loc":
assert idx >= 0
return Loc(str(self) + f".N{idx}")

def L(self, idx: int) -> "Loc":
Expand Down
107 changes: 66 additions & 41 deletions tierkreis/tierkreis/controller/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from pathlib import Path
import subprocess
import sys
from typing import Sequence

from tierkreis.controller.data.core import PortID
from tierkreis.controller.data.types import bytes_from_ptype, ptype_from_bytes
Expand All @@ -22,24 +23,44 @@
logger = logging.getLogger(__name__)


# ALAN this should really be NodeRunTask (or RunNodeTask)
@dataclass
class NodeRunData:
node_location: Loc
node: NodeDef
output_list: list[PortID]


def start_nodes(
@dataclass
class LoopIterTask:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is actually StartGraphTask just that we only use it for loop iters...

iter_location: Loc
graph_input: OutputLoc
inputs: dict[PortID, OutputLoc]


Task = NodeRunData | LoopIterTask


def start_tasks(
storage: ControllerStorage,
executor: ControllerExecutor,
node_run_data: list[NodeRunData],
tasks: Sequence[Task],
enable_logging: bool = True,
) -> None:
started_locs: set[Loc] = set()
for node_run_datum in node_run_data:
if node_run_datum.node_location in started_locs:
continue
start(storage, executor, node_run_datum)
started_locs.add(node_run_datum.node_location)
for task in tasks:
if isinstance(task, LoopIterTask):
start_graph(
storage,
executor,
task.iter_location,
task.graph_input,
task.inputs,
)
started_locs.add(task.iter_location)
elif task.node_location not in started_locs:
start(storage, executor, task, enable_logging)
started_locs.add(task.node_location)


def run_builtin(def_path: Path, logs_path: Path) -> None:
Expand Down Expand Up @@ -92,9 +113,7 @@ def start(
executor.run(launcher_name, call_args_path)

elif node.type == "input":
input_loc = parent.N(-1)
storage.link_outputs(node_location, node.name, input_loc, node.name)
storage.mark_node_finished(node_location)
assert storage.is_node_finished(node_location)

elif node.type == "output":
storage.mark_node_finished(node_location)
Expand All @@ -108,51 +127,31 @@ def start(
storage.mark_node_finished(node_location)

elif node.type == "eval":
message = storage.read_output(parent.N(node.graph[0]), node.graph[1])
g = ptype_from_bytes(message, GraphData)
ins["body"] = (parent.N(node.graph[0]), node.graph[1])
ins.update(g.fixed_inputs)

pipe_inputs_to_output_location(storage, node_location.N(-1), ins)
graph_input = (parent.N(node.graph[0]), node.graph[1])
start_graph(storage, executor, node_location, graph_input, ins)

elif node.type == "loop":
ins["body"] = (parent.N(node.body[0]), node.body[1])
pipe_inputs_to_output_location(storage, node_location.N(-1), ins)
graph_input = (parent.N(node.body[0]), node.body[1])
if (
node.name is not None
): # should we do this only in debug mode? -> need to think through how this would work
storage.write_debug_data(node.name, node_location)
start(
storage,
executor,
NodeRunData(
node_location.L(0),
Eval((-1, "body"), {k: (-1, k) for k, _ in ins.items()}, node.outputs),
output_list,
),
)
start_graph(storage, executor, node_location.L(0), graph_input, ins)

elif node.type == "map":
first_ref = next(x for x in ins.values() if x[1] == "*")
map_eles = outputs_iter(storage, first_ref[0])
if not map_eles:
storage.mark_node_finished(node_location)
graph_input = (parent.N(node.body[0]), node.body[1])
for idx, p in map_eles:
eval_inputs: dict[PortID, tuple[Loc, PortID]] = {}
eval_inputs["body"] = (parent.N(node.body[0]), node.body[1])
for k, (i, port) in ins.items():
if port == "*":
eval_inputs[k] = (i, p)
else:
eval_inputs[k] = (i, port)
pipe_inputs_to_output_location(
storage, node_location.M(idx).N(-1), eval_inputs
)
# Necessary in the node visualization
storage.write_node_def(
node_location.M(idx), Eval((-1, "body"), node.inputs, node.outputs)
start_graph(
storage,
executor,
node_location.M(idx),
graph_input,
{k: (i, p if port == "*" else port) for k, (i, port) in ins.items()},
)

elif node.type == "ifelse":
pass

Expand All @@ -162,6 +161,32 @@ def start(
assert_never(node)


def start_graph(
storage: ControllerStorage,
executor: ControllerExecutor,
loc: Loc,
graph_input: OutputLoc,
ins: dict[PortID, OutputLoc],
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could add an enable_logging here, to match start; there'd be no uses of it, but there are none in start either....

) -> None:
# We have to write something here to mark the node/graph as started.
# TODO ALAN - pass NodeDef? But can't really make valid/correct inputs.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a bit dubious, as per PR description

# For now just write a dummy, but don't overwrite if there's a better one already!
if not storage.is_node_started(loc):
storage.write_node_def(loc, Eval((-1, "body"), {}))
message = storage.read_output(*graph_input)
g = ptype_from_bytes(message, GraphData)
ins["body"] = graph_input
ins.update(g.fixed_inputs)
for i, n in enumerate(g.nodes):
if n.type == "input":
input_loc = loc.N(i)
if value := ins.get(n.name):
storage.link_outputs(input_loc, n.name, *value)
# else, ideally we'd check if that input is optional and error if not,
# but since we don't have the graph type here, we'll assume it's optional!
storage.mark_node_finished(input_loc)


def pipe_inputs_to_output_location(
storage: ControllerStorage,
output_loc: Loc,
Expand Down
6 changes: 3 additions & 3 deletions tierkreis/tierkreis/controller/storage/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,11 +214,11 @@ def is_node_started(self, node_location: Loc) -> bool:
def is_node_finished(self, node_location: Loc) -> bool:
return self.exists(self._done_path(node_location))

def latest_loop_iteration(self, loc: Loc) -> Loc:
def latest_loop_iteration(self, loc: Loc) -> int:
i = 0
while self.is_node_started(loc.L(i + 1)):
i += 1
return loc.L(i)
return i

def node_has_error(self, node_location: Loc) -> bool:
return self.exists(self._error_path(node_location))
Expand Down Expand Up @@ -299,7 +299,7 @@ def dependents(self, loc: Loc) -> set[Loc]:
case ("M", _):
descs.update(self.dependents(parent))
case ("L", idx):
latest_idx = self.latest_loop_iteration(parent).peek_index()
latest_idx = self.latest_loop_iteration(parent)
[descs.add(parent.L(i)) for i in range(idx + 1, latest_idx + 1)]
descs.update(self.dependents(parent))
case _:
Expand Down
Loading