From ee6245b821d3971331d7e422fa79d95f8ead7291 Mon Sep 17 00:00:00 2001 From: Patrik Smeds Date: Fri, 30 May 2025 14:43:05 -0400 Subject: [PATCH 1/2] this update will make it possible to list all invocations without knowing the workflow id, invocations will then be grouped by workflow. The user will also have to option of outputing the list as a json. --- planemo/commands/cmd_list_invocations.py | 110 ++++++++++++++++------- planemo/galaxy/api.py | 17 +++- 2 files changed, 92 insertions(+), 35 deletions(-) diff --git a/planemo/commands/cmd_list_invocations.py b/planemo/commands/cmd_list_invocations.py index 46b8528e8..f41b61d57 100644 --- a/planemo/commands/cmd_list_invocations.py +++ b/planemo/commands/cmd_list_invocations.py @@ -7,7 +7,10 @@ from planemo import options from planemo.cli import command_function from planemo.galaxy import profiles -from planemo.galaxy.api import get_invocations +from planemo.galaxy.api import ( + get_invocations, + gi, +) from planemo.galaxy.workflows import remote_runnable_to_workflow_id from planemo.io import ( error, @@ -25,24 +28,53 @@ @click.argument( "workflow_identifier", type=click.STRING, + required=False, + default="", +) +@click.option( + "--raw", + is_flag=True, + help="output will be a json structure.", + default=False, +) +@click.option( + "--max_items", + type=click.INT, + help="max number of items returned.", + default=100, +) +@click.option( + "--offset_items", + type=click.INT, + help="skip first X items.", + default=0, ) @options.profile_option(required=True) @command_function -def cli(ctx, workflow_identifier, **kwds): +def cli(ctx, workflow_identifier, raw, max_items, offset_items, **kwds): """ Get a list of invocations for a particular workflow ID or alias. """ - info(f"Looking for invocations for workflow {workflow_identifier}...") - runnable = for_runnable_identifier(ctx, workflow_identifier, kwds) + if not raw: + info(f"Looking for invocations for workflow {workflow_identifier}...") profile = profiles.ensure_profile(ctx, kwds.get("profile")) - assert runnable.is_remote_workflow_uri - workflow_id = remote_runnable_to_workflow_id(runnable) - + if workflow_identifier: + runnable = for_runnable_identifier(ctx, workflow_identifier, kwds) + assert runnable.is_remote_workflow_uri + workflow_id = remote_runnable_to_workflow_id(runnable) + else: + workflow_id = "" + gi_client = gi(None, profile["galaxy_url"], profile["galaxy_admin_key"] or profile["galaxy_user_key"]) invocations = get_invocations( - url=profile["galaxy_url"], - key=profile["galaxy_admin_key"] or profile["galaxy_user_key"], + gi=gi_client, workflow_id=workflow_id, + instance=True, + max_items=max_items, + offset_items=offset_items, ) + if raw: + print(json.dumps(invocations, indent=4, sort_keys=True)) + return if tabulate is not None: state_colors = { "ok": "\033[92m", # green @@ -50,34 +82,50 @@ def cli(ctx, workflow_identifier, **kwds): "error": "\033[91m", # red "paused": "\033[96m", # cyan "deleted": "\033[95m", # magenta + "deleting": "\033[95m", # magenta "deleted_new": "\033[95m", # magenta "new": "\033[96m", # cyan "queued": "\033[93m", # yellow + "skipped": "\033[90m", # gray } - print( - tabulate( - { - "Invocation ID": invocations.keys(), - "Jobs status": [ - ", ".join([f"{state_colors[k]}{v} jobs {k}\033[0m" for k, v in inv["states"].items()]) - for inv in invocations.values() - ], - "Invocation report URL": [ - "{}/workflows/invocations/report?id={}".format(profile["galaxy_url"].strip("/"), inv_id) - for inv_id in invocations - ], - "History URL": [ - "{}/histories/view?id={}".format( - profile["galaxy_url"].strip("/"), invocations[inv_id]["history_id"] - ) - for inv_id in invocations - ], - }, - headers="keys", + + grouped_invocations = {} + workflows = {} + for inv_id, inv in invocations.items(): + wf_id = inv["workflow_id"] + if wf_id not in grouped_invocations: + workflow = gi_client.workflows.show_workflow(workflow_id=wf_id, instance=True) + workflows[wf_id] = (workflow["name"], workflow["id"]) + grouped_invocations.setdefault(wf_id, {})[inv_id] = inv + for workflow_id, data in grouped_invocations.items(): + header = f"Workflow: {workflows[workflow_id][0]} : {profile['galaxy_url'].strip('/')}/workflows/run?id={workflows[workflow_id][1]}" + print(f"\n{header}") + print(len(header) * "=") + print( + tabulate( + { + "Invocation ID": data.keys(), + "Invocation report URL": [ + "{}/workflows/invocations/report?id={}".format(profile["galaxy_url"].strip("/"), inv_id) + for inv_id in data + ], + "History URL": [ + "{}/histories/view?id={}".format( + profile["galaxy_url"].strip("/"), invocations[inv_id]["history_id"] + ) + for inv_id in data + ], + "Jobs status": [ + ", ".join([f"{state_colors[k]}{v} jobs {k}\033[0m" for k, v in inv["states"].items()]) + for inv in data.values() + ], + }, + headers="keys", + ) ) - ) else: error("The tabulate package is not installed, invocations could not be listed correctly.") print(json.dumps(invocations, indent=4, sort_keys=True)) - info(f"{len(invocations)} invocations found.") + if not raw: + info(f"{len(invocations)} invocations found.") return diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py index cbcea69a8..89969ab7e 100644 --- a/planemo/galaxy/api.py +++ b/planemo/galaxy/api.py @@ -116,12 +116,21 @@ def summarize_history(ctx, gi, history_id): print("|") -def get_invocations(url, key, workflow_id): - inv_gi = gi(None, url, key) - invocations = inv_gi.workflows.get_invocations(workflow_id) +def get_invocations(gi, workflow_id, instance=False, max_items=100, items_per_request=20, offset_items=0): + invocations = [] + while len(invocations) < max_items: + if workflow_id: + items = gi.invocations.get_invocations(workflow_id, limit=min(items_per_request, max_items), offset=len(invocations) + offset_items) + else: + items = gi.invocations.get_invocations(instance=instance, limit=min(items_per_request, max_items), offset=len(invocations) + offset_items) + if (items is None) or (len(items) == 0): + break + else: + invocations.extend(items) return { invocation["id"]: { - "states": inv_gi.invocations.get_invocation_summary(invocation["id"])["states"], + "states": gi.invocations.get_invocation_summary(invocation["id"])["states"], + "workflow_id": invocation["workflow_id"], "history_id": invocation["history_id"], } for invocation in invocations From acc9f74b13c0f3e137048b501f8441b6748ebe93 Mon Sep 17 00:00:00 2001 From: Patrik Smeds Date: Mon, 2 Jun 2025 15:12:29 +0200 Subject: [PATCH 2/2] Update api.py --- planemo/galaxy/api.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/planemo/galaxy/api.py b/planemo/galaxy/api.py index 89969ab7e..a8d68af02 100644 --- a/planemo/galaxy/api.py +++ b/planemo/galaxy/api.py @@ -120,9 +120,17 @@ def get_invocations(gi, workflow_id, instance=False, max_items=100, items_per_re invocations = [] while len(invocations) < max_items: if workflow_id: - items = gi.invocations.get_invocations(workflow_id, limit=min(items_per_request, max_items), offset=len(invocations) + offset_items) + items = gi.invocations.get_invocations( + workflow_id, + limit=min(items_per_request, max_items), + offset=len(invocations) + offset_items, + ) else: - items = gi.invocations.get_invocations(instance=instance, limit=min(items_per_request, max_items), offset=len(invocations) + offset_items) + items = gi.invocations.get_invocations( + instance=instance, + limit=min(items_per_request, max_items), + offset=len(invocations) + offset_items, + ) if (items is None) or (len(items) == 0): break else: