3 changes: 3 additions & 0 deletions flytekit/bin/entrypoint.py
@@ -828,6 +828,9 @@ def map_execute_task_cmd(
     prev_checkpoint,
     checkpoint_path,
 ):
+    logger.info("Registering faulthandler for SIGUSR1 for map tasks")
+    faulthandler.register(signal.SIGUSR1)
+
     logger.info(get_version_message())

     raw_output_data_prefix, checkpoint_path, prev_checkpoint = normalize_inputs(
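The new registration means a hung map-task worker can be asked to dump the tracebacks of all its threads without being killed. The following is a minimal standalone sketch of that mechanism, not the Flyte entrypoint itself; the busy loop and printed message are purely illustrative.

# Standalone sketch of the faulthandler behaviour added above (assumed usage,
# not code from this PR). Sending SIGUSR1 to this process prints every
# thread's traceback to stderr while the process keeps running, which helps
# diagnose stuck map tasks.
import faulthandler
import os
import signal
import time

faulthandler.register(signal.SIGUSR1)  # same registration the entrypoint now performs

if __name__ == "__main__":
    print(f"Run `kill -USR1 {os.getpid()}` from another shell to dump all stacks.")
    while True:  # stand-in for a long-running task body
        time.sleep(1)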
6 changes: 3 additions & 3 deletions flytekit/models/execution.py
@@ -459,7 +459,7 @@ def __init__(self, id, spec, closure):
         self._closure = closure

     @property
-    def id(self):
+    def id(self) -> _identifier.WorkflowExecutionIdentifier:
         """
         :rtype: flytekit.models.core.identifier.WorkflowExecutionIdentifier
         """
@@ -532,7 +532,7 @@ def __init__(
         phase: int,
         started_at: datetime.datetime,
         duration: datetime.timedelta,
-        error: typing.Optional[flytekit.models.core.execution.ExecutionError] = None,
+        error: typing.Optional[_core_execution.ExecutionError] = None,
         outputs: typing.Optional[LiteralMapBlob] = None,
         abort_metadata: typing.Optional[AbortMetadata] = None,
         created_at: typing.Optional[datetime.datetime] = None,
@@ -556,7 +556,7 @@ def __init__(
         self._updated_at = updated_at

     @property
-    def error(self) -> flytekit.models.core.execution.ExecutionError:
+    def error(self) -> _core_execution.ExecutionError:
         return self._error

     @property
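These changes only affect static typing: Execution.id is now declared to return a WorkflowExecutionIdentifier, and the error hints use the module's aliased _core_execution import instead of the spelled-out path. A small caller-side sketch of what the id annotation buys a type checker; the helper function is hypothetical, not part of the PR.

# Hypothetical caller, illustrating the effect of the new return annotation.
from flytekit.models.execution import Execution


def execution_name(ex: Execution) -> str:
    # With the annotation in place, a type checker knows ex.id is a
    # WorkflowExecutionIdentifier, so .name resolves without a cast.
    return ex.id.name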
24 changes: 24 additions & 0 deletions flytekit/remote/remote.py
@@ -2474,6 +2474,10 @@ def wait(
             timedelta or a duration in seconds as int.
         :param sync_nodes: passed along to the sync call for the workflow execution
         """
+        logger.debug(
+            f"Beginning wait for {execution.id} with {timeout=}, {poll_interval=}, and {sync_nodes=}."
+        )
+
         if poll_interval is not None and not isinstance(poll_interval, timedelta):
            poll_interval = timedelta(seconds=poll_interval)
         poll_interval = poll_interval or timedelta(seconds=30)
@@ -2482,11 +2486,31 @@
             timeout = timedelta(seconds=timeout)
         time_to_give_up = datetime.max if timeout is None else datetime.now() + timeout

+        poll_count = 0
         while datetime.now() < time_to_give_up:
+            if poll_count % 10 == 0:
+                logger.debug(f"Waiting for execution {execution.id} to complete.")
+                logger.debug(f"Current phase: {execution.closure.phase}, {execution.closure.updated_at=}")
+
             execution = self.sync_execution(execution, sync_nodes=sync_nodes)
             if execution.is_done:
                 return execution
             time.sleep(poll_interval.total_seconds())
+            poll_count += 1

+        if datetime.now() > time_to_give_up:
+            logger.info("Wait timeout exceeded. Syncing execution one final time.")
+            refetched_exec = self.fetch_execution(
+                project=execution.id.project,
+                domain=execution.id.domain,
+                name=execution.id.name)
+            if refetched_exec.is_done:
+                logger.info("Re-sync'ed execution found to be complete!")
+                if sync_nodes:
+                    self.sync_execution(refetched_exec, sync_nodes=True)
+                return refetched_exec
+            else:
+                logger.debug(f"Execution {execution.id} not complete after timeout, phase is {refetched_exec.closure.phase}")
+
         raise user_exceptions.FlyteTimeout(f"Execution {self} did not complete before timeout.")

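Taken together, the wait() changes add periodic debug logging and one final re-fetch before giving up, so FlyteTimeout is only raised if the execution is still genuinely incomplete. Below is a hedged usage sketch from the caller's side; the project, domain, and execution name are placeholders, not values from this PR.

# Illustrative caller of FlyteRemote.wait(); names are placeholders.
from datetime import timedelta

from flytekit.configuration import Config
from flytekit.remote import FlyteRemote

remote = FlyteRemote(Config.auto(), default_project="flytesnacks", default_domain="development")
execution = remote.fetch_execution(name="my-execution-name", project="flytesnacks", domain="development")

# With the change above, wait() logs its progress roughly every 10 polls and,
# on timeout, re-fetches the execution once more before raising FlyteTimeout.
completed = remote.wait(execution, timeout=timedelta(minutes=10), poll_interval=timedelta(seconds=15))
print(completed.closure.phase)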