Skip to content

Commit 2ab5705

Browse files
authored
Merge pull request #322 from djarecka/enh/dotfile
creating dotfiles for graphs (closes #197)
2 parents a696e11 + cb48c2a commit 2ab5705

File tree

5 files changed

+660
-7
lines changed

5 files changed

+660
-7
lines changed

pydra/engine/core.py

Lines changed: 61 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -737,7 +737,7 @@ def __init__(
737737
rerun=rerun,
738738
)
739739

740-
self.graph = DiGraph()
740+
self.graph = DiGraph(name=name)
741741
self.name2obj = {}
742742

743743
# store output connections
@@ -828,15 +828,17 @@ def add(self, task):
828828
logger.debug(f"Added {task}")
829829
return self
830830

831-
def create_connections(self, task):
831+
def create_connections(self, task, detailed=False):
832832
"""
833833
Add and connect a particular task to existing nodes in the workflow.
834834
835835
Parameters
836836
----------
837837
task : :class:`TaskBase`
838838
The task to be added.
839-
839+
detailed : :obj:`bool`
840+
If True, `add_edges_description` is run for self.graph to add
841+
a detailed descriptions of the connections (input/output fields names)
840842
"""
841843
other_states = {}
842844
for field in attr_fields(task.inputs):
@@ -847,9 +849,12 @@ def create_connections(self, task):
847849
# adding an edge to the graph if task id expecting output from a different task
848850
if val.name != self.name:
849851
# checking if the connection is already in the graph
850-
if (getattr(self, val.name), task) in self.graph.edges:
851-
continue
852-
self.graph.add_edges((getattr(self, val.name), task))
852+
if (getattr(self, val.name), task) not in self.graph.edges:
853+
self.graph.add_edges((getattr(self, val.name), task))
854+
if detailed:
855+
self.graph.add_edges_description(
856+
(task.name, field.name, val.name, val.field)
857+
)
853858
logger.debug("Connecting %s to %s", val.name, task.name)
854859

855860
if (
@@ -861,6 +866,13 @@ def create_connections(self, task):
861866
getattr(self, val.name).state,
862867
field.name,
863868
)
869+
else: # LazyField with the wf input
870+
# connections with wf input should be added to the detailed graph description
871+
if detailed:
872+
self.graph.add_edges_description(
873+
(task.name, field.name, val.name, val.field)
874+
)
875+
864876
# if task has connections state has to be recalculated
865877
if other_states:
866878
if hasattr(task, "fut_combiner"):
@@ -1000,6 +1012,49 @@ def _collect_outputs(self):
10001012
raise ValueError(f"Task {val.name} raised an error")
10011013
return attr.evolve(output, **output_wf)
10021014

1015+
def create_dotfile(self, type="simple", export=None, name=None):
1016+
"""creating a graph - dotfile and optionally exporting to other formats"""
1017+
if not name:
1018+
name = f"graph_{self.name}"
1019+
if type == "simple":
1020+
for task in self.graph.nodes:
1021+
self.create_connections(task)
1022+
dotfile = self.graph.create_dotfile_simple(
1023+
outdir=self.output_dir, name=name
1024+
)
1025+
elif type == "nested":
1026+
for task in self.graph.nodes:
1027+
self.create_connections(task)
1028+
dotfile = self.graph.create_dotfile_nested(
1029+
outdir=self.output_dir, name=name
1030+
)
1031+
elif type == "detailed":
1032+
# create connections with detailed=True
1033+
for task in self.graph.nodes:
1034+
self.create_connections(task, detailed=True)
1035+
# adding wf outputs
1036+
for (wf_out, lf) in self._connections:
1037+
self.graph.add_edges_description((self.name, wf_out, lf.name, lf.field))
1038+
dotfile = self.graph.create_dotfile_detailed(
1039+
outdir=self.output_dir, name=name
1040+
)
1041+
else:
1042+
raise Exception(
1043+
f"type of the graph can be simple, detailed or nested, "
1044+
f"but {type} provided"
1045+
)
1046+
if not export:
1047+
return dotfile
1048+
else:
1049+
if export is True:
1050+
export = ["png"]
1051+
elif isinstance(export, str):
1052+
export = [export]
1053+
formatted_dot = []
1054+
for ext in export:
1055+
formatted_dot.append(self.graph.export_graph(dotfile=dotfile, ext=ext))
1056+
return dotfile, formatted_dot
1057+
10031058

10041059
def is_task(obj):
10051060
"""Check whether an object looks like a task."""

pydra/engine/graph.py

Lines changed: 230 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,15 @@
11
"""Data structure to support :class:`~pydra.engine.core.Workflow` tasks."""
22
from copy import copy
3+
from pathlib import Path
4+
import subprocess as sp
5+
36
from .helpers import ensure_list
47

58

69
class DiGraph:
710
"""A simple Directed Graph object."""
811

9-
def __init__(self, nodes=None, edges=None):
12+
def __init__(self, name=None, nodes=None, edges=None):
1013
"""
1114
Initialize a directed graph.
1215
@@ -19,13 +22,15 @@ def __init__(self, nodes=None, edges=None):
1922
the graph.
2023
2124
"""
25+
self.name = name
2226
self._nodes = []
2327
self.nodes = nodes
2428
self._edges = []
2529
self.edges = edges
2630
self._create_connections()
2731
self._sorted_nodes = None
2832
self._node_wip = []
33+
self._nodes_details = {}
2934

3035
def copy(self):
3136
"""
@@ -92,6 +97,20 @@ def edges_names(self):
9297
"""Get edges as pairs of the nodes they connect."""
9398
return [(edg[0].name, edg[1].name) for edg in self._edges]
9499

100+
@property
101+
def nodes_details(self):
102+
""" dictionary with details of the nodes
103+
for each task, there are inputs/outputs and connections
104+
(with input/output fields names)
105+
"""
106+
# removing repeated fields from inputs and outputs
107+
for el in self._nodes_details.values():
108+
el["inputs"] = list(set(el["inputs"]))
109+
el["inputs"].sort()
110+
el["outputs"] = list(set(el["outputs"]))
111+
el["outputs"].sort()
112+
return self._nodes_details
113+
95114
@property
96115
def sorted_nodes(self):
97116
"""Return sorted nodes (runs sorting if needed)."""
@@ -136,6 +155,19 @@ def add_edges(self, new_edges):
136155
# starting from the previous sorted list, so it's faster
137156
self.sorting(presorted=self.sorted_nodes + [])
138157

158+
def add_edges_description(self, new_edge_details):
159+
""" adding detailed description of the connections, filling _nodes_details"""
160+
in_nd, in_fld, out_nd, out_fld = new_edge_details
161+
for key in [in_nd, out_nd]:
162+
self._nodes_details.setdefault(
163+
key, {"inputs": [], "outputs": [], "connections": []}
164+
)
165+
166+
if (in_fld, out_nd, out_fld) not in self._nodes_details[in_nd]["connections"]:
167+
self._nodes_details[in_nd]["connections"].append((in_fld, out_nd, out_fld))
168+
self._nodes_details[in_nd]["inputs"].append(in_fld)
169+
self._nodes_details[out_nd]["outputs"].append(out_fld)
170+
139171
def sorting(self, presorted=None):
140172
"""
141173
Sort this graph.
@@ -318,3 +350,200 @@ def calculate_max_paths(self):
318350
for nm in first_nodes:
319351
self.max_paths[nm] = {}
320352
self._checking_path(node_name=nm, first_name=nm)
353+
354+
def create_dotfile_simple(self, outdir, name="graph"):
355+
""" creates a simple dotfile (no nested structure)"""
356+
from .core import is_workflow
357+
358+
dotstr = "digraph G {\n"
359+
for nd in self.nodes:
360+
# breakpoint()
361+
if is_workflow(nd):
362+
if nd.state:
363+
# adding color for wf with a state
364+
dotstr += f"{nd.name} [shape=box, color=blue]\n"
365+
else:
366+
dotstr += f"{nd.name} [shape=box]\n"
367+
else:
368+
if nd.state:
369+
# adding color for nd with a state
370+
dotstr += f"{nd.name} [color=blue]\n"
371+
else:
372+
dotstr += f"{nd.name}\n"
373+
for ed in self.edges_names:
374+
dotstr += f"{ed[0]} -> {ed[1]}\n"
375+
376+
dotstr += "}"
377+
Path(outdir).mkdir(parents=True, exist_ok=True)
378+
dotfile = Path(outdir) / f"{name}.dot"
379+
dotfile.write_text(dotstr)
380+
return dotfile
381+
382+
def create_dotfile_detailed(self, outdir, name="graph_det"):
383+
""" creates a detailed dotfile (detailed connections - input/output fields,
384+
but no nested structure)
385+
"""
386+
dotstr = "digraph structs {\n"
387+
dotstr += "node [shape=record];\n"
388+
if not self._nodes_details:
389+
raise Exception("node_details is empty, detailed dotfile can't be created")
390+
for nd_nm, nd_det in self.nodes_details.items():
391+
if nd_nm == self.name: # the main workflow itself
392+
# wf inputs
393+
wf_inputs_str = f'{{<{nd_det["outputs"][0]}> {nd_det["outputs"][0]}'
394+
for el in nd_det["outputs"][1:]:
395+
wf_inputs_str += f" | <{el}> {el}"
396+
wf_inputs_str += "}"
397+
dotstr += f'struct_{nd_nm} [color=red, label="{{WORKFLOW INPUT: | {wf_inputs_str}}}"];\n'
398+
# wf outputs
399+
wf_outputs_str = f'{{<{nd_det["inputs"][0]}> {nd_det["inputs"][0]}'
400+
for el in nd_det["inputs"][1:]:
401+
wf_outputs_str += f" | <{el}> {el}"
402+
wf_outputs_str += "}"
403+
dotstr += f'struct_{nd_nm}_out [color=red, label="{{WORKFLOW OUTPUT: | {wf_outputs_str}}}"];\n'
404+
# connections to the wf outputs
405+
for con in nd_det["connections"]:
406+
dotstr += (
407+
f"struct_{con[1]}:{con[2]} -> struct_{nd_nm}_out:{con[0]};\n"
408+
)
409+
else: # elements of the main workflow
410+
inputs_str = "{INPUT:"
411+
for inp in nd_det["inputs"]:
412+
inputs_str += f" | <{inp}> {inp}"
413+
inputs_str += "}"
414+
outputs_str = "{OUTPUT:"
415+
for out in nd_det["outputs"]:
416+
outputs_str += f" | <{out}> {out}"
417+
outputs_str += "}"
418+
dotstr += f'struct_{nd_nm} [shape=record, label="{inputs_str} | {nd_nm} | {outputs_str}"];\n'
419+
# connections between elements
420+
for con in nd_det["connections"]:
421+
dotstr += f"struct_{con[1]}:{con[2]} -> struct_{nd_nm}:{con[0]};\n"
422+
dotstr += "}"
423+
Path(outdir).mkdir(parents=True, exist_ok=True)
424+
dotfile = Path(outdir) / f"{name}.dot"
425+
dotfile.write_text(dotstr)
426+
return dotfile
427+
428+
def create_dotfile_nested(self, outdir, name="graph"):
429+
"""dotfile that includes the nested structures for workflows"""
430+
dotstr = "digraph G {\ncompound=true \n"
431+
dotstr += self._create_dotfile_single_graph(nodes=self.nodes, edges=self.edges)
432+
dotstr += "}"
433+
Path(outdir).mkdir(parents=True, exist_ok=True)
434+
dotfile = Path(outdir) / f"{name}.dot"
435+
dotfile.write_text(dotstr)
436+
return dotfile
437+
438+
def _create_dotfile_single_graph(self, nodes, edges):
439+
from .core import is_workflow
440+
441+
wf_asnd = []
442+
dotstr = ""
443+
for nd in nodes:
444+
if is_workflow(nd):
445+
wf_asnd.append(nd.name)
446+
for task in nd.graph.nodes:
447+
nd.create_connections(task)
448+
dotstr += f"subgraph cluster_{nd.name} {{\n" f"label = {nd.name} \n"
449+
dotstr += self._create_dotfile_single_graph(
450+
nodes=nd.graph.nodes, edges=nd.graph.edges
451+
)
452+
if nd.state:
453+
dotstr += "color=blue\n"
454+
dotstr += "}\n"
455+
else:
456+
if nd.state:
457+
dotstr += f"{nd.name} [color=blue]\n"
458+
else:
459+
dotstr += f"{nd.name}\n"
460+
461+
dotstr_edg = ""
462+
for ed in edges:
463+
if ed[0].name in wf_asnd and ed[1].name in wf_asnd:
464+
head_nd = list(ed[1].nodes)[0].name
465+
tail_nd = list(ed[0].nodes)[-1].name
466+
dotstr_edg += (
467+
f"{tail_nd} -> {head_nd} "
468+
f"[ltail=cluster_{ed[0].name}, "
469+
f"lhead=cluster_{ed[1].name}]\n"
470+
)
471+
elif ed[0].name in wf_asnd:
472+
tail_nd = list(ed[0].nodes)[-1].name
473+
dotstr_edg += (
474+
f"{tail_nd} -> {ed[1].name} [ltail=cluster_{ed[0].name}]\n"
475+
)
476+
elif ed[1].name in wf_asnd:
477+
head_nd = list(ed[1].nodes)[0].name
478+
dotstr_edg += (
479+
f"{ed[0].name} -> {head_nd} [lhead=cluster_{ed[1].name}]\n"
480+
)
481+
else:
482+
dotstr_edg += f"{ed[0].name} -> {ed[1].name}\n"
483+
dotstr = dotstr + dotstr_edg
484+
return dotstr
485+
486+
def export_graph(self, dotfile, ext="png"):
487+
""" exporting dotfile to other format, equires the dot command"""
488+
available_ext = [
489+
"bmp",
490+
"canon",
491+
"cgimage",
492+
"cmap",
493+
"cmapx",
494+
"cmapx_np",
495+
"dot",
496+
"dot_json",
497+
"eps",
498+
"exr",
499+
"fig",
500+
"gif",
501+
"gv",
502+
"icns",
503+
"ico",
504+
"imap",
505+
"imap_np",
506+
"ismap",
507+
"jp2",
508+
"jpe",
509+
"jpeg",
510+
"jpg",
511+
"json",
512+
"json0",
513+
"mp",
514+
"pct",
515+
"pdf",
516+
"pic",
517+
"pict",
518+
"plain",
519+
"plain-ext",
520+
"png",
521+
"pov",
522+
"ps",
523+
"ps2",
524+
"psd",
525+
"sgi",
526+
"svg",
527+
"svgz",
528+
"tga",
529+
"tif",
530+
"tiff",
531+
"tk",
532+
"vml",
533+
"vmlz",
534+
"xdot",
535+
"xdot1.2",
536+
"xdot1.4",
537+
"xdot_json",
538+
]
539+
if ext not in available_ext:
540+
raise Exception(f"unvalid extension - {ext}, chose from {available_ext}")
541+
542+
dot_check = sp.run(["which", "dot"], stdout=sp.PIPE, stderr=sp.PIPE)
543+
if not dot_check.stdout:
544+
raise Exception(f"dot command not available, can't create a {ext} file")
545+
546+
formatted_dot = dotfile.with_suffix(f".{ext}")
547+
cmd = f"dot -T{ext} -o {formatted_dot} {dotfile}"
548+
sp.run(cmd.split(), stdout=sp.PIPE, stderr=sp.PIPE)
549+
return formatted_dot

0 commit comments

Comments
 (0)