Skip to content

Commit 730d94d

Browse files
authored
Merge pull request #449 from botify-labs/some-spring-cleanups
Some spring cleanups
2 parents 9141aec + fd47983 commit 730d94d

File tree

15 files changed

+301
-161
lines changed

15 files changed

+301
-161
lines changed

.pre-commit-config.yaml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@ repos:
1414
- --maxkb=1024
1515
- repo: https://github.com/astral-sh/uv-pre-commit
1616
# uv version.
17-
rev: 0.5.24
17+
rev: 0.6.14
1818
hooks:
1919
# Keep uv.lock up to date.
2020
- id: uv-lock
2121

2222
- repo: https://github.com/astral-sh/ruff-pre-commit
2323
# Ruff version.
24-
rev: v0.9.3
24+
rev: v0.11.5
2525
hooks:
2626
# Run the linter.
2727
- id: ruff

examples/basic.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
def increment(x):
1111
# Here's how you can access the raw context of the activity task if you need
1212
# it. It gives you access to the response of the PollForActivityTask call to
13-
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax # NOQA
13+
# the SWF API. See docs for more info: http://docs.aws.amazon.com/amazonswf/latest/apireference/API_PollForActivityTask.html#API_PollForActivityTask_ResponseSyntax
1414
logger.warning(f"activity context: {increment.context}")
1515
return x + 1
1616

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,11 @@ extend-select = [
6767
"W", # pycodestyle warnings
6868
"RUF", # ruff
6969
]
70-
fixable = ["ALL"]
7170
allowed-confusables = ["", ""]
7271
extend-ignore = [
7372
]
73+
ignore = ["E501"]
74+
7475
[tool.ruff.lint.isort]
7576
required-imports = ["from __future__ import annotations"]
7677

simpleflow/command.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,9 @@ def terminate_workflow(
236236
run_id: str | None,
237237
):
238238
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
239+
if not ex:
240+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
241+
sys.exit(1)
239242
ex.terminate()
240243

241244

@@ -251,6 +254,9 @@ def terminate_workflow(
251254
)
252255
def restart_workflow(domain: str, workflow_id: str, run_id: str | None):
253256
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
257+
if not ex:
258+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
259+
sys.exit(1)
254260
history = ex.history()
255261
ex.terminate(reason="workflow.restart")
256262
new_ex = ex.workflow_type.start_execution(
@@ -315,6 +321,7 @@ def profile(ctx, domain, workflow_id, run_id, nb_tasks):
315321
)
316322

317323

324+
# FIXME superseded by history
318325
@click.option(
319326
"--nb-tasks",
320327
"-n",
@@ -347,6 +354,7 @@ def workflow_tasks(
347354
)
348355

349356

357+
# FIXME superseded by filter
350358
@click.argument(
351359
"domain",
352360
envvar="SWF_DOMAIN",
@@ -373,16 +381,16 @@ def list_workflows(ctx, domain: str, status: str, started_since: int):
373381
_NOTSET = object()
374382

375383

376-
@click.argument(
377-
"domain",
378-
envvar="SWF_DOMAIN",
379-
)
380384
@cli.command(
381385
"workflow.history",
382386
help="Workflow history from workflow WORKFLOW_ID [RUN_ID].",
383387
)
384388
@click.argument("workflow_id")
385389
@click.argument("run_id", type=RUN_ID, required=False)
390+
@click.option(
391+
"--domain",
392+
envvar="SWF_DOMAIN",
393+
)
386394
@click.option(
387395
"--output-format",
388396
"--of",
@@ -402,12 +410,15 @@ def workflow_history(
402410
output_format: str,
403411
reverse_order: bool = False,
404412
) -> None:
405-
from simpleflow.swf.mapper.models.history.base import History as BaseHistory
406-
407413
if ctx.parent.params["format"] != "json" or not ctx.parent.params["header"]:
408414
raise NotImplementedError("Only pretty JSON mode is implemented")
409415

416+
from simpleflow.swf.mapper.models.history.base import History as BaseHistory
417+
410418
ex = helpers.get_workflow_execution(domain, workflow_id, run_id)
419+
if not ex:
420+
print(f"Execution {workflow_id} {run_id} not found" if run_id else f"Workflow {workflow_id} not found")
421+
sys.exit(1)
411422
events = ex.history_events(
412423
callback=get_progression_callback("events"),
413424
reverse_order=reverse_order,
@@ -432,11 +443,15 @@ def workflow_history(
432443
elif output_format == "cooked":
433444
history.parse()
434445
events = {
446+
"workflow": history.workflow,
435447
"activities": history.activities,
436448
"child_workflows": history.child_workflows,
437449
"markers": history.markers,
438-
"signals": history.signals,
439450
"timers": history.timers,
451+
"signals": history.signals,
452+
"signal_lists": history.signal_lists,
453+
"external_workflows_signaling": history.external_workflows_signaling,
454+
"signaled_workflows": history.signaled_workflows,
440455
}
441456
else:
442457
raise NotImplementedError
@@ -840,6 +855,11 @@ def standalone(
840855
ex.workflow_id,
841856
ex.run_id,
842857
)
858+
if not ex:
859+
print(
860+
f"Execution {workflow_id} {ex.run_id} not found" if ex.run_id else f"Workflow {workflow_id} not found"
861+
)
862+
sys.exit(1)
843863
if display_status:
844864
print(f"status: {ex.status}", file=sys.stderr)
845865
if ex.status == ex.STATUS_CLOSED:

simpleflow/history.py

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,11 +43,16 @@ def __init__(self, history: simpleflow.swf.mapper.models.history.History) -> Non
4343
self.started_decision_id: int | None = None
4444
self.completed_decision_id: int | None = None
4545
self.last_event_id: int | None = None
46+
self._workflow: dict[str, Any] = {}
4647

4748
@property
4849
def swf_history(self) -> simpleflow.swf.mapper.models.history.History:
4950
return self._history
5051

52+
@property
53+
def workflow(self):
54+
return self._workflow
55+
5156
@property
5257
def activities(self) -> dict[str, ActivityTaskEventDict]:
5358
"""
@@ -432,6 +437,118 @@ def parse_workflow_event(self, events: list[Event], event: WorkflowExecutionEven
432437
"""
433438
Parse a workflow event.
434439
"""
440+
if event.state == "started":
441+
self._workflow.update(
442+
{
443+
"state": event.state,
444+
f"{event.state}_id": event.id,
445+
f"{event.state}_timestamp": event.timestamp,
446+
"child_policy": getattr(event, "child_policy", None),
447+
"task_list": event.task_list["name"],
448+
"workflow_type": event.workflow_type,
449+
"continued_execution_run_id": getattr(event, "continued_execution_run_id", None),
450+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
451+
"input": getattr(event, "input", None),
452+
"lambda_role": getattr(event, "lambda_role", None),
453+
"parent_initiated_event_id": getattr(event, "parent_initiated_event_id", None),
454+
"parent_workflow_execution": getattr(event, "parent_workflow_execution", None),
455+
"tag_list": getattr(event, "tag_list", None),
456+
"task_priority": getattr(event, "task_priority", None),
457+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
458+
}
459+
)
460+
elif event.state == "continued_as_new":
461+
self._workflow.update(
462+
{
463+
"state": event.state,
464+
f"{event.state}_id": event.id,
465+
f"{event.state}_timestamp": event.timestamp,
466+
f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
467+
"new_execution_run_id": event.new_execution_run_id,
468+
"task_list": event.task_list["name"],
469+
"workflow_type": event.workflow_type,
470+
"execution_start_to_close_timeout": getattr(event, "execution_start_to_close_timeout", None),
471+
"input": getattr(event, "input", None),
472+
"lambda_role": getattr(event, "lambda_role", None),
473+
"tag_list": getattr(event, "tag_list", None),
474+
"task_priority": getattr(event, "task_priority", None),
475+
"task_start_to_close_timeout": getattr(event, "task_start_to_close_timeout", None),
476+
}
477+
)
478+
elif event.state == "completed":
479+
self._workflow.update(
480+
{
481+
"state": event.state,
482+
f"{event.state}_id": event.id,
483+
f"{event.state}_timestamp": event.timestamp,
484+
"initiated_event_id": getattr(event, "initiated_event_id", None),
485+
"result": getattr(event, "result", None),
486+
}
487+
)
488+
elif event.state == "cancelled":
489+
self._workflow.update(
490+
{
491+
"state": event.state,
492+
f"{event.state}_id": event.id,
493+
f"{event.state}_timestamp": event.timestamp,
494+
"initiated_event_id": getattr(event, "initiated_event_id", None),
495+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
496+
"details": getattr(event, "details", None),
497+
}
498+
)
499+
elif event.state == "failed":
500+
self._workflow.update(
501+
{
502+
"state": event.state,
503+
f"{event.state}_id": event.id,
504+
f"{event.state}_timestamp": event.timestamp,
505+
"initiated_event_id": getattr(event, "initiated_event_id", None),
506+
"decision_task_completed_event_id": event.decision_task_completed_event_id,
507+
"reason": getattr(event, "reason", None),
508+
"details": getattr(event, "details", None),
509+
}
510+
)
511+
elif event.state == "terminated":
512+
self._workflow.update(
513+
{
514+
"state": event.state,
515+
f"{event.state}_id": event.id,
516+
f"{event.state}_timestamp": event.timestamp,
517+
"initiated_event_id": getattr(event, "initiated_event_id", None),
518+
"cause": getattr(event, "cause", None),
519+
"details": getattr(event, "details", None),
520+
}
521+
)
522+
elif event.state == "timed_out":
523+
self._workflow.update(
524+
{
525+
"state": event.state,
526+
f"{event.state}_id": event.id,
527+
f"{event.state}_timestamp": event.timestamp,
528+
"initiated_event_id": getattr(event, "initiated_event_id", None),
529+
"timeout_type": event.timeout_type,
530+
}
531+
)
532+
# elif event.state in (
533+
# "cancel_failed",
534+
# "complete_failed",
535+
# "continue_as_new",
536+
# "fail_failed",
537+
# "start_child_failed",
538+
# "start_failed",
539+
# "terminate_failed",
540+
# ):
541+
# self._workflow.update(
542+
# {
543+
# "state": event.state,
544+
# f"{event.state}_id": event.id,
545+
# f"{event.state}_cause": getattr(event, "cause", None),
546+
# f"{event.state}_decision_task_completed_event_id": event.decision_task_completed_event_id,
547+
# }
548+
# )
549+
550+
if event.state == "cancel_requested":
551+
self._workflow.update()
435552
if event.state == "signaled":
436553
signal = {
437554
"type": "signal",

simpleflow/local/executor.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
from __future__ import annotations
22

3-
import collections
43
import sys
54
import traceback
65
import uuid
@@ -27,8 +26,8 @@ def __init__(self, workflow_class, **kwargs):
2726
super().__init__(workflow_class)
2827
self.update_workflow_class()
2928
self.nb_activities = 0
30-
self.signals_sent = set()
31-
self._markers = collections.OrderedDict()
29+
self.signals_sent: set[str] = set()
30+
self._markers: dict[str, list[Marker]] = {}
3231

3332
self.wf_run_id = []
3433
self.wf_id = []
@@ -206,7 +205,7 @@ def continue_as_new(self, workflow: type[Workflow], *args, **kwargs):
206205
self.update_workflow_class()
207206
self.nb_activities = 0
208207
self.signals_sent = set()
209-
self._markers = collections.OrderedDict()
208+
self._markers = {}
210209

211210
self.wf_run_id = []
212211
self.wf_id = []

0 commit comments

Comments
 (0)