diff --git a/cubed/core/plan.py b/cubed/core/plan.py index 07a4cfdc..b1a1ce95 100644 --- a/cubed/core/plan.py +++ b/cubed/core/plan.py @@ -102,6 +102,7 @@ def _new( first_cubed_i = min(i for i, s in enumerate(stack_summaries) if s.is_cubed()) first_cubed_summary = stack_summaries[first_cubed_i] + func_name = first_cubed_summary.name op_name_unique = gensym() @@ -111,9 +112,9 @@ def _new( op_name_unique, name=op_name_unique, op_name=op_name, + func_name=func_name, type="op", stack_summaries=stack_summaries, - op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}", hidden=hidden, ) # array @@ -142,9 +143,9 @@ def _new( op_name_unique, name=op_name_unique, op_name=op_name, + func_name=func_name, type="op", stack_summaries=stack_summaries, - op_display_name=f"{op_name_unique}\n{first_cubed_summary.name}", hidden=hidden, primitive_op=primitive_op, pipeline=primitive_op.pipeline, @@ -216,7 +217,7 @@ def _create_lazy_zarr_arrays(self, dag): name=name, op_name=op_name, type="op", - op_display_name=name, + func_name="", primitive_op=primitive_op, pipeline=primitive_op.pipeline, ) @@ -388,7 +389,8 @@ def visualize( tooltip = f"name: {n}\n" node_type = d.get("type", None) if node_type == "op": - label = d["op_display_name"] + func_name = d["func_name"] + label = f"{n}\n{func_name}".strip() op_name = d["op_name"] if op_name == "blockwise": d["style"] = '"rounded,filled"' diff --git a/cubed/diagnostics/rich.py b/cubed/diagnostics/rich.py index 42a23ccd..7e4794fb 100644 --- a/cubed/diagnostics/rich.py +++ b/cubed/diagnostics/rich.py @@ -44,7 +44,8 @@ def on_compute_start(self, event): progress_tasks = {} for name, node in visit_nodes(event.dag): num_tasks = node["primitive_op"].num_tasks - op_display_name = node["op_display_name"].replace("\n", " ") + func_name = node["func_name"] + op_display_name = f"{name} {func_name}" progress_task = progress.add_task( f"{op_display_name}", start=False, total=num_tasks ) diff --git a/cubed/diagnostics/tqdm.py b/cubed/diagnostics/tqdm.py index 3963fd13..d818a0dd 100644 --- a/cubed/diagnostics/tqdm.py +++ b/cubed/diagnostics/tqdm.py @@ -18,18 +18,15 @@ def on_compute_start(self, event): from tqdm.auto import tqdm # find the maximum display width so we can align bars below - max_op_display_name = ( - max( - len(node["op_display_name"].replace("\n", " ")) - for _, node in visit_nodes(event.dag) - ) - + 1 # for the colon + max_op_display_name = max( + len(f"{name} {node['func_name']}:") for name, node in visit_nodes(event.dag) ) self.pbars = {} for i, (name, node) in enumerate(visit_nodes(event.dag)): num_tasks = node["primitive_op"].num_tasks - op_display_name = node["op_display_name"].replace("\n", " ") + ":" + func_name = node["func_name"] + op_display_name = f"{name} {func_name}:" # note double curlies to get literal { and } for tqdm bar format bar_format = ( f"{{desc:{max_op_display_name}}} {{percentage:3.0f}}%|{{bar}}{{r_bar}}" diff --git a/cubed/runtime/executors/coiled.py b/cubed/runtime/executors/coiled.py index fcb20aec..e9c56143 100644 --- a/cubed/runtime/executors/coiled.py +++ b/cubed/runtime/executors/coiled.py @@ -13,8 +13,8 @@ from cubed.spec import Spec -def make_coiled_function(func, coiled_kwargs): - return coiled.function(**coiled_kwargs)(execution_stats(func)) +def make_coiled_function(func, name, coiled_kwargs): + return coiled.function(**coiled_kwargs)(execution_stats(func, name=name)) class CoiledExecutor(DagExecutor): @@ -41,7 +41,12 @@ def execute_dag( for name, node in visit_nodes(dag): handle_operation_start_callbacks(callbacks, name) pipeline = node["pipeline"] - coiled_function = make_coiled_function(pipeline.function, merged_kwargs) + # this name will show up on the dask dashboard - need to replace '-' as anything after it is suppressed + func_name = node["func_name"] + op_display_name = f"{name} {func_name}".replace("-", "_") + coiled_function = make_coiled_function( + pipeline.function, op_display_name, merged_kwargs + ) if minimum_workers is not None: coiled_function.cluster.adapt(minimum=minimum_workers) # coiled expects a sequence (it calls `len` on it) diff --git a/cubed/runtime/utils.py b/cubed/runtime/utils.py index a0319b9c..321be03a 100644 --- a/cubed/runtime/utils.py +++ b/cubed/runtime/utils.py @@ -57,10 +57,14 @@ def execute_with_timing(function, *args, **kwargs): ) -def execution_stats(func): +def execution_stats(func, name=None): """Decorator to measure timing information and peak memory usage of a function call.""" - return partial(execute_with_stats, func) + def wrapper(*args, **kwargs): + return execute_with_stats(func, *args, **kwargs) + + wrapper.__name__ = name or func.__name__ + return wrapper def execution_timing(func):