Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
bc2de14
Refactor test recovery
LanderOtto Jul 4, 2025
343cea1
Propagated recoverable attribute
LanderOtto Jul 4, 2025
079a602
Propagated recoverable attribute
LanderOtto Jul 4, 2025
04b43da
Revert test recovery
LanderOtto Jul 4, 2025
1d1528a
Fixed recoverable in ListToken and ObjectToken
LanderOtto Jul 5, 2025
9de3123
Update
LanderOtto Jul 8, 2025
02ad8c2
Rebase
LanderOtto Jul 10, 2025
d3e4c99
Refactor
LanderOtto Jul 11, 2025
a355f9e
TMP
LanderOtto Jul 14, 2025
eebd105
Added test of `ObjectToken`
LanderOtto Jul 15, 2025
a679bb2
Fix test `ScheduleStep`
LanderOtto Jul 16, 2025
eb15957
test
LanderOtto Jul 26, 2025
b1ae508
Rebase
LanderOtto Aug 29, 2025
5593079
test
LanderOtto Aug 29, 2025
d26b22f
temp
LanderOtto Aug 29, 2025
b855449
temp
LanderOtto Aug 29, 2025
32751bb
refactor
LanderOtto Aug 30, 2025
3f83c2b
Fix rebase
LanderOtto Sep 13, 2025
8a69bcf
Implemented a different approach.
LanderOtto Sep 30, 2025
92f96a1
Fixes
LanderOtto Sep 30, 2025
ebdbf2b
Improvements
LanderOtto Sep 30, 2025
9ef6852
test
LanderOtto Oct 1, 2025
56876b3
Another test
LanderOtto Oct 2, 2025
32864a4
Replaced getter and setter with the property
LanderOtto Oct 3, 2025
2c3f11b
Improvements
LanderOtto Oct 3, 2025
96ef0cc
This commit improves the synchronization of multiple recovery workflows
LanderOtto Oct 7, 2025
15a8218
Added test on recovery of loops
LanderOtto Oct 9, 2025
3684c7c
Refactor test recovery
LanderOtto Jul 4, 2025
bed21c9
Revert test recovery
LanderOtto Jul 4, 2025
80b7a6f
Update
LanderOtto Jul 8, 2025
8176a81
TMP
LanderOtto Jul 14, 2025
28bc56c
Added test of `ObjectToken`
LanderOtto Jul 15, 2025
2fc9513
Fix test `ScheduleStep`
LanderOtto Jul 16, 2025
a1e23b1
test
LanderOtto Jul 26, 2025
4eca2c5
test
LanderOtto Aug 29, 2025
98248f8
temp
LanderOtto Aug 29, 2025
9a93be6
Fix rebase
LanderOtto Sep 13, 2025
7f85ef4
This commit improves the synchronization of multiple recovery workflows
LanderOtto Oct 7, 2025
962292d
Fix conflicts
LanderOtto Oct 9, 2025
a60fdd3
Added different types of terminations
LanderOtto Oct 10, 2025
0ddfe38
Refactor for maintainability
LanderOtto Oct 10, 2025
df6e545
Bugfixes
LanderOtto Nov 12, 2025
82ade6e
Fixed recover of multiple generic exceptions
LanderOtto Nov 12, 2025
d3059cb
Updated DirectGraph class
LanderOtto Nov 15, 2025
c7a0686
Added resume method
LanderOtto Nov 16, 2025
2a010f6
WIP
LanderOtto Nov 17, 2025
957ca04
tmp
LanderOtto Nov 17, 2025
6a1fd9f
changing resume
LanderOtto Nov 20, 2025
a9b9fae
Fixed ValueFrom
LanderOtto Nov 21, 2025
95f5229
Fix valuefrom
LanderOtto Nov 25, 2025
dc2fdc4
Merge branch 'master' into fix/ft-loop
LanderOtto Jan 3, 2026
da81cfb
FIX
LanderOtto Jan 9, 2026
2d1a958
FIX 2
LanderOtto Jan 12, 2026
3e3ffdf
Improved tests
LanderOtto Jan 14, 2026
a622e3d
Merge master + changes
LanderOtto Jan 30, 2026
ed04d62
Merge branch 'master' into fix/ft-loop
LanderOtto Jan 30, 2026
0a6b388
Lint
LanderOtto Jan 30, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion streamflow/core/recovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,14 +113,22 @@ async def recover(self, failed_job: Job, failed_step: Step) -> None: ...


class RetryRequest:
    """Mutable record holding the state attached to a single retry request.

    Instances are restricted to the attributes listed in ``__slots__``, so no
    per-instance ``__dict__`` is created and no other attribute can be set.

    Attributes:
        job_token: ``None`` until assigned by the owner of the request.
        lock: A fresh ``asyncio.Lock`` private to this request.
        output_tokens: Mapping from a string key to a ``Token``; starts empty.
        version: Integer counter, initialised to ``1``.
        workflow: ``None`` until assigned by the owner of the request.
        workflow_ready: A fresh, initially unset ``asyncio.Event``.
    """

    # A single slots declaration: ``workflow_ready`` must be listed here,
    # otherwise the assignment in ``__init__`` raises ``AttributeError``.
    __slots__ = (
        "job_token",
        "lock",
        "output_tokens",
        "version",
        "workflow",
        "workflow_ready",
    )

    def __init__(self) -> None:
        """Initialise every slot with its empty/default value."""
        self.job_token: JobToken | None = None
        self.lock: asyncio.Lock = asyncio.Lock()
        self.output_tokens: MutableMapping[str, Token] = {}
        self.version: int = 1
        self.workflow: Workflow | None = None
        # NOTE(review): presumably set once ``workflow`` has been populated so
        # that waiters can synchronise on it — confirm against the callers.
        self.workflow_ready: asyncio.Event = asyncio.Event()


class TokenAvailability(IntEnum):
Expand Down
3 changes: 3 additions & 0 deletions streamflow/core/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
PersistableEntity,
)
from streamflow.core.utils import get_class_from_name, get_class_fullname
from streamflow.log_handler import logger

if TYPE_CHECKING:
from typing import Any
Expand Down Expand Up @@ -329,6 +330,7 @@ class Status(IntEnum):
CANCELLED = 6
ROLLBACK = 7
RECOVERY = 8
RECOVERED = 9


class Step(PersistableEntity, ABC):
Expand All @@ -340,6 +342,7 @@ def __init__(self, name: str, workflow: Workflow):
self.status: Status = Status.WAITING
self.terminated: bool = False
self.workflow: Workflow = workflow
logger.info(f"STARTING Step {self.name}")

def _add_port(self, name: str, port: Port, type_: DependencyType) -> None:
if port.name not in self.workflow.ports:
Expand Down
15 changes: 15 additions & 0 deletions streamflow/cwl/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ async def main(
if getattr(args, "validate", False):
return
await workflow.save(context)

# import datetime
# import posixpath
#
# from streamflow.token_printer import dag_workflow
#
# dag_workflow(
# workflow,
# title=posixpath.join(
# os.getcwd(),
# "dev",
# str(datetime.datetime.now()).replace(" ", "_").replace(":", "."),
# "wf",
# ),
# )
if logger.isEnabledFor(logging.INFO):
logger.info("COMPLETED building of workflow execution plan")
executor = StreamFlowExecutor(workflow)
Expand Down
4 changes: 4 additions & 0 deletions streamflow/cwl/transformer.py
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ async def transform(
self._only_default = True
token = None
token = await self._get_next_token(token, inputs)
if not token.recoverable:
raise WorkflowDefinitionException(
"DEBUG: default token must be recoverable"
)
return {self.get_output_name(): token}


Expand Down
171 changes: 94 additions & 77 deletions streamflow/deployment/connector/kubernetes.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,12 +224,16 @@ async def copy_local_to_remote(
locations: MutableSequence[ExecutionLocation],
read_only: bool = False,
) -> None:
await super().copy_local_to_remote(
src=src,
dst=dst,
locations=await self._get_effective_locations(locations, dst),
read_only=read_only,
)
try:
await super().copy_local_to_remote(
src=src,
dst=dst,
locations=await self._get_effective_locations(locations, dst),
read_only=read_only,
)
except Exception as e:
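# Failure observed in practice: ClientConnectionResetError (presumably
# aiohttp's, raised when the connection drops mid-transfer — confirm).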
raise WorkflowExecutionException(e)

async def copy_remote_to_remote(
self,
Expand All @@ -240,37 +244,42 @@ async def copy_remote_to_remote(
source_connector: Connector | None = None,
read_only: bool = False,
) -> None:
source_connector = source_connector or self
if locations := await copy_same_connector(
connector=self,
locations=await self._get_effective_locations(locations, dst),
source_location=source_location,
src=src,
dst=dst,
read_only=read_only,
):
await copy_remote_to_remote(
try:
source_connector = source_connector or self
if locations := await copy_same_connector(
connector=self,
locations=locations,
locations=await self._get_effective_locations(locations, dst),
source_location=source_location,
src=src,
dst=dst,
source_connector=source_connector,
source_location=source_location,
writer_command=[
"sh",
"-c",
" ".join(
await utils.get_remote_to_remote_write_command(
src_connector=source_connector,
src_location=source_location,
src=src,
dst_connector=self,
dst_locations=locations,
dst=dst,
)
),
],
)
read_only=read_only,
):
await copy_remote_to_remote(
connector=self,
locations=locations,
src=src,
dst=dst,
source_connector=source_connector,
source_location=source_location,
writer_command=[
"sh",
"-c",
" ".join(
await utils.get_remote_to_remote_write_command(
src_connector=source_connector,
src_location=source_location,
src=src,
dst_connector=self,
dst_locations=locations,
dst=dst,
)
),
],
)
except Exception as e:
# Failure observed in practice: ClientConnectorError (host='130.192.100.107',
# port=6443, ssl=True) wrapping
# ConnectionRefusedError(111, "Connect call failed ('130.192.100.107', 6443)"),
# i.e. the remote endpoint (presumably the Kubernetes API server) refused the connection.
raise WorkflowExecutionException(e)

async def _get_container(
self, location: ExecutionLocation
Expand Down Expand Up @@ -454,51 +463,59 @@ async def run(
+ [utils.encode_command(command)]
)
pod, container = location.name.split(":")
# noinspection PyUnresolvedReferences
result = await asyncio.wait_for(
cast(
Awaitable,
self.client_ws.connect_get_namespaced_pod_exec(
name=pod,
namespace=self.namespace or "default",
container=container,
command=command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_preload_content=not capture_output,
try:
# noinspection PyUnresolvedReferences
result = await asyncio.wait_for(
cast(
Awaitable,
self.client_ws.connect_get_namespaced_pod_exec(
name=pod,
namespace=self.namespace or "default",
container=container,
command=command,
stderr=True,
stdin=False,
stdout=True,
tty=False,
_preload_content=not capture_output,
),
),
),
timeout=timeout,
)
if capture_output:
with io.StringIO() as out_buffer, io.StringIO() as err_buffer:
async with result as response:
while not response.closed:
async for msg in response:
data = msg.data.decode("utf-8", "replace")
channel = ord(data[0])
data = data[1:]
if data and channel in [
ws_client.STDOUT_CHANNEL,
ws_client.STDERR_CHANNEL,
]:
out_buffer.write(data)
elif data and channel == ws_client.ERROR_CHANNEL:
err_buffer.write(data)
err = yaml.safe_load(err_buffer.getvalue())
if err["status"] == "Success":
return out_buffer.getvalue(), 0
else:
if "code" in err:
return err["message"], int(err["code"])
timeout=timeout,
)
if capture_output:
with io.StringIO() as out_buffer, io.StringIO() as err_buffer:
async with result as response:
while not response.closed:
async for msg in response:
data = msg.data.decode("utf-8", "replace")
channel = ord(data[0])
data = data[1:]
if data and channel in [
ws_client.STDOUT_CHANNEL,
ws_client.STDERR_CHANNEL,
]:
out_buffer.write(data)
elif data and channel == ws_client.ERROR_CHANNEL:
err_buffer.write(data)
err = yaml.safe_load(err_buffer.getvalue())
if err["status"] == "Success":
return out_buffer.getvalue(), 0
else:
return err["message"], int(
err["details"]["causes"][0]["message"]
)
else:
return None
if "code" in err:
return err["message"], int(err["code"])
else:
return err["message"], int(
err["details"]["causes"][0]["message"]
)
else:
return None
except TimeoutError as e:
raise WorkflowExecutionException(e)
except Exception as e:
# Failures observed in practice:
# - AttributeError("'ServerTimeoutError' object has no attribute 'decode'")
# - ClientConnectorError (host='130.192.100.107', port=6443, ssl=True) wrapping
#   ConnectionRefusedError(111, "Connect call failed ('130.192.100.107', 6443)")
raise WorkflowExecutionException(e)

async def undeploy(self, external: bool):
if self.client is not None:
Expand Down
2 changes: 1 addition & 1 deletion streamflow/log_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -126,5 +126,5 @@ def highlight(self, msg: str | Any) -> str:
)
defaultStreamHandler.setFormatter(formatter)
logger.addHandler(defaultStreamHandler)
logger.setLevel(logging.INFO)
logger.setLevel(logging.DEBUG)
logger.propagate = False
Loading
Loading