Skip to content

Commit dda80d1

Browse files
jlemeshtiborsimko
authored andcommitted
feat(workflow): add logs --follow functionality (#731)
1 parent 80d88dc commit dda80d1

File tree

5 files changed

+390
-81
lines changed

5 files changed

+390
-81
lines changed

AUTHORS.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ The list of contributors in alphabetical order:
1515
- [Giuseppe Steduto](https://orcid.org/0009-0002-1258-8553)
1616
- [Harri Hirvonsalo](https://orcid.org/0000-0002-5503-510X)
1717
- [Jan Okraska](https://orcid.org/0000-0002-1416-3244)
18+
- [Jelizaveta Lemeševa](https://orcid.org/0009-0003-6606-9270)
1819
- [Leticia Wanderley](https://orcid.org/0000-0003-4649-6630)
1920
- [Marco Donadoni](https://orcid.org/0000-0003-2922-5505)
2021
- [Marco Vidal](https://orcid.org/0000-0002-9363-4971)

reana_client/cli/utils.py

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import os
1313
import shlex
1414
import sys
15+
import time
1516
from typing import Callable, NoReturn, Optional, List, Tuple, Union
1617

1718
import click
@@ -24,6 +25,8 @@
2425
RUN_STATUSES,
2526
JOB_STATUS_TO_MSG_COLOR,
2627
JSON,
28+
CLI_LOGS_FOLLOW_MIN_INTERVAL,
29+
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
2730
)
2831
from reana_client.printer import display_message
2932
from reana_client.utils import workflow_uuid_or_name
@@ -409,3 +412,114 @@ def output_user_friendly_logs(workflow_logs, steps):
409412
f"Step {job_name_or_id} emitted no logs.",
410413
msg_type="info",
411414
)
415+
416+
417+
def retrieve_workflow_logs(
418+
workflow,
419+
access_token,
420+
json_format,
421+
filters,
422+
steps,
423+
chosen_filters,
424+
available_filters,
425+
page=None,
426+
size=None,
427+
): # noqa: D301
428+
"""Retrieve workflow logs."""
429+
from reana_client.api.client import get_workflow_logs
430+
431+
response = get_workflow_logs(
432+
workflow,
433+
access_token,
434+
steps=None if not steps else list(set(steps)),
435+
page=page,
436+
size=size,
437+
)
438+
workflow_logs = json.loads(response["logs"])
439+
if filters:
440+
for key, value in chosen_filters.items():
441+
unwanted_steps = [
442+
k
443+
for k, v in workflow_logs["job_logs"].items()
444+
if v[available_filters[key]] != value
445+
]
446+
for job_id in unwanted_steps:
447+
del workflow_logs["job_logs"][job_id]
448+
449+
if json_format:
450+
display_message(json.dumps(workflow_logs, indent=2))
451+
sys.exit(0)
452+
else:
453+
from reana_client.cli.utils import output_user_friendly_logs
454+
455+
output_user_friendly_logs(workflow_logs, None if not steps else list(set(steps)))
456+
457+
458+
def follow_workflow_logs(
459+
workflow,
460+
access_token,
461+
interval,
462+
steps,
463+
): # noqa: D301
464+
"""Continuously poll for workflow or job logs."""
465+
from reana_client.api.client import get_workflow_logs, get_workflow_status
466+
467+
if len(steps) > 1:
468+
display_message(
469+
"Only one step can be followed at a time, ignoring additional steps.",
470+
"warning",
471+
)
472+
if interval < CLI_LOGS_FOLLOW_MIN_INTERVAL:
473+
interval = CLI_LOGS_FOLLOW_DEFAULT_INTERVAL
474+
display_message(
475+
f"Interval should be an integer greater than or equal to {CLI_LOGS_FOLLOW_MIN_INTERVAL}, resetting to default ({CLI_LOGS_FOLLOW_DEFAULT_INTERVAL} s).",
476+
"warning",
477+
)
478+
step = steps[0] if steps else None
479+
480+
previous_logs = ""
481+
482+
while True:
483+
response = get_workflow_logs(
484+
workflow,
485+
access_token,
486+
steps=None if not step else [step],
487+
)
488+
if response.get("live_logs_enabled", False) is False:
489+
display_message(
490+
"Live logs are not enabled, please rerun the command without the --follow flag.",
491+
"error",
492+
)
493+
return
494+
495+
json_response = json.loads(response.get("logs"))
496+
497+
if step:
498+
jobs = json_response["job_logs"]
499+
500+
if not jobs:
501+
raise Exception(f"Step data not found: {step}")
502+
503+
job = next(iter(jobs.values())) # get values of the first job
504+
logs = job["logs"]
505+
status = job["status"]
506+
else:
507+
logs = json_response["workflow_logs"]
508+
status = get_workflow_status(workflow, access_token).get("status")
509+
510+
previous_lines = previous_logs.splitlines()
511+
new_lines = logs.splitlines()
512+
513+
diff = "\n".join([x for x in new_lines if x not in previous_lines])
514+
if diff != "" and diff != "\n":
515+
display_message(diff)
516+
517+
if status in ["finished", "failed", "stopped", "deleted"]:
518+
subject = "Workflow" if not step else "Job"
519+
display_message(
520+
f"{subject} has completed, you might want to rerun the command without the --follow flag.",
521+
"info",
522+
)
523+
return
524+
previous_logs = logs
525+
time.sleep(interval)

reana_client/cli/workflow.py

Lines changed: 94 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,15 @@
3131
key_value_to_dict,
3232
parse_filter_parameters,
3333
requires_environments,
34+
retrieve_workflow_logs,
35+
follow_workflow_logs,
36+
)
37+
from reana_client.config import (
38+
ERROR_MESSAGES,
39+
RUN_STATUSES,
40+
TIMECHECK,
41+
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
3442
)
35-
from reana_client.config import ERROR_MESSAGES, RUN_STATUSES, TIMECHECK
3643
from reana_client.printer import display_message
3744
from reana_client.utils import (
3845
get_reana_yaml_file_path,
@@ -886,6 +893,20 @@ def add_verbose_data_from_response(response, verbose_headers, headers, data):
886893
multiple=True,
887894
help="Filter job logs to include only those steps that match certain filtering criteria. Use --filter name=value pairs. Available filters are compute_backend, docker_img, status and step.",
888895
)
896+
@click.option(
897+
"--follow",
898+
"follow",
899+
is_flag=True,
900+
default=False,
901+
help="Follow the logs of a running workflow or job (similar to tail -f).",
902+
)
903+
@click.option(
904+
"-i",
905+
"--interval",
906+
"interval",
907+
default=CLI_LOGS_FOLLOW_DEFAULT_INTERVAL,
908+
help=f"Sleep time in seconds between log polling if log following is enabled. [default={CLI_LOGS_FOLLOW_DEFAULT_INTERVAL}]",
909+
)
889910
@add_pagination_options
890911
@check_connection
891912
@click.pass_context
@@ -894,22 +915,31 @@ def workflow_logs(
894915
workflow,
895916
access_token,
896917
json_format,
897-
steps=None,
918+
follow,
919+
interval,
898920
filters=None,
899921
page=None,
900922
size=None,
901923
): # noqa: D301
902924
"""Get workflow logs.
903925
904-
The ``logs`` command allows to retrieve logs of running workflow. Note that
905-
only finished steps of the workflow are returned, the logs of the currently
906-
processed step is not returned until it is finished.
926+
The ``logs`` command allows to retrieve logs of a running workflow.
907927
908928
Examples:\n
909-
\t $ reana-client logs -w myanalysis.42
910-
\t $ reana-client logs -w myanalysis.42 -s 1st_step
929+
\t $ reana-client logs -w myanalysis.42\n
930+
\t $ reana-client logs -w myanalysis.42 --json\n
931+
\t $ reana-client logs -w myanalysis.42 --filter status=running\n
932+
\t $ reana-client logs -w myanalysis.42 --filter step=myfit --follow\n
911933
"""
912-
from reana_client.api.client import get_workflow_logs
934+
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
935+
for p in ctx.params:
936+
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
937+
938+
if json_format and follow:
939+
display_message(
940+
"Ignoring --json as it cannot be used together with --follow.",
941+
msg_type="warning",
942+
)
913943

914944
available_filters = {
915945
"step": "job_name",
@@ -920,90 +950,73 @@ def workflow_logs(
920950
steps = []
921951
chosen_filters = dict()
922952

923-
logging.debug("command: {}".format(ctx.command_path.replace(" ", ".")))
924-
for p in ctx.params:
925-
logging.debug("{param}: {value}".format(param=p, value=ctx.params[p]))
926-
if workflow:
927-
if filters:
928-
try:
929-
for f in filters:
930-
key, value = f.split("=")
931-
if key not in available_filters:
953+
if filters:
954+
try:
955+
for f in filters:
956+
key, value = f.split("=")
957+
if key not in available_filters:
958+
display_message(
959+
"Filter '{}' is not valid.\n"
960+
"Available filters are '{}'.".format(
961+
key,
962+
"' '".join(sorted(available_filters.keys())),
963+
),
964+
msg_type="error",
965+
)
966+
sys.exit(1)
967+
elif key == "step":
968+
steps.append(value)
969+
else:
970+
# Case insensitive for compute backends
971+
if (
972+
key == "compute_backend"
973+
and value.lower() in REANA_COMPUTE_BACKENDS
974+
):
975+
value = REANA_COMPUTE_BACKENDS[value.lower()]
976+
elif key == "status" and value not in RUN_STATUSES:
932977
display_message(
933-
"Filter '{}' is not valid.\n"
934-
"Available filters are '{}'.".format(
935-
key,
936-
"' '".join(sorted(available_filters.keys())),
937-
),
978+
"Input status value {} is not valid. ".format(value),
938979
msg_type="error",
939-
)
980+
),
940981
sys.exit(1)
941-
elif key == "step":
942-
steps.append(value)
943-
else:
944-
# Case insensitive for compute backends
945-
if (
946-
key == "compute_backend"
947-
and value.lower() in REANA_COMPUTE_BACKENDS
948-
):
949-
value = REANA_COMPUTE_BACKENDS[value.lower()]
950-
elif key == "status" and value not in RUN_STATUSES:
951-
display_message(
952-
"Input status value {} is not valid. ".format(value),
953-
msg_type="error",
954-
),
955-
sys.exit(1)
956-
chosen_filters[key] = value
957-
except Exception as e:
958-
logging.debug(traceback.format_exc())
959-
logging.debug(str(e))
960-
display_message(
961-
"Please provide complete --filter name=value pairs, "
962-
"for example --filter status=running.\n"
963-
"Available filters are '{}'.".format(
964-
"' '".join(sorted(available_filters.keys()))
965-
),
966-
msg_type="error",
967-
)
968-
sys.exit(1)
969-
try:
970-
response = get_workflow_logs(
971-
workflow,
972-
access_token,
973-
steps=None if not steps else list(set(steps)),
974-
page=page,
975-
size=size,
976-
)
977-
workflow_logs = json.loads(response["logs"])
978-
if filters:
979-
for key, value in chosen_filters.items():
980-
unwanted_steps = [
981-
k
982-
for k, v in workflow_logs["job_logs"].items()
983-
if v[available_filters[key]] != value
984-
]
985-
for job_id in unwanted_steps:
986-
del workflow_logs["job_logs"][job_id]
987-
988-
if json_format:
989-
display_message(json.dumps(workflow_logs, indent=2))
990-
sys.exit(0)
991-
else:
992-
from reana_client.cli.utils import output_user_friendly_logs
993-
994-
output_user_friendly_logs(
995-
workflow_logs, None if not steps else list(set(steps))
996-
)
982+
chosen_filters[key] = value
997983
except Exception as e:
998984
logging.debug(traceback.format_exc())
999985
logging.debug(str(e))
1000986
display_message(
1001-
"Cannot retrieve the logs of a workflow {}: \n"
1002-
"{}".format(workflow, str(e)),
987+
"Please provide complete --filter name=value pairs, "
988+
"for example --filter status=running.\n"
989+
"Available filters are '{}'.".format(
990+
"' '".join(sorted(available_filters.keys()))
991+
),
1003992
msg_type="error",
1004993
)
1005994
sys.exit(1)
1006995

996+
try:
997+
if follow:
998+
follow_workflow_logs(workflow, access_token, interval, steps)
999+
else:
1000+
retrieve_workflow_logs(
1001+
workflow,
1002+
access_token,
1003+
json_format,
1004+
filters,
1005+
steps,
1006+
chosen_filters,
1007+
available_filters,
1008+
page,
1009+
size,
1010+
)
1011+
except Exception as e:
1012+
logging.debug(traceback.format_exc())
1013+
logging.debug(str(e))
1014+
display_message(
1015+
"Cannot retrieve logs for workflow {}: \n{}".format(workflow, str(e)),
1016+
msg_type="error",
1017+
)
1018+
sys.exit(1)
1019+
10071020

10081021
@workflow_execution_group.command("validate")
10091022
@click.option(

reana_client/config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,3 +77,9 @@
7777

7878
STD_OUTPUT_CHAR = "-"
7979
"""Character used to refer to the standard output."""
80+
81+
CLI_LOGS_FOLLOW_MIN_INTERVAL = 1
82+
"""Minimum interval between log requests in seconds."""
83+
84+
CLI_LOGS_FOLLOW_DEFAULT_INTERVAL = 10
85+
"""Default interval between log requests in seconds."""

0 commit comments

Comments
 (0)