Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,21 @@ By default pyinfra only prints high level information (this host connected, this
+ `-vv`: as above plus print shell input to the remote host
+ `-vvv`: as above plus print shell output from the remote host

### Retry Options

pyinfra supports automatic retry of failed operations via CLI options:

+ `--retry N`: Retry failed operations up to N times (default: 0)
+ `--retry-delay N`: Wait N seconds between retry attempts (default: 5)

```sh
# Retry failed operations up to 3 times with default 5 second delay
pyinfra inventory.py deploy.py --retry 3

# Retry with custom delay
pyinfra inventory.py deploy.py --retry 2 --retry-delay 10
```


## Inventory

Expand Down
29 changes: 29 additions & 0 deletions docs/faq.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,3 +49,32 @@ Use the LINK ``files.file``, ``files.directory`` or ``files.link`` operations to
group="pyinfra",
mode=644,
)

How do I handle unreliable operations or network issues?
--------------------------------------------------------

Use the `retry behavior arguments <arguments.html#retry-behavior>`_ to automatically retry failed operations. This is especially useful for network operations or services that may be temporarily unavailable:

.. code:: python

# Retry a network operation up to 3 times
server.shell(
name="Download file with retries",
commands=["wget https://example.com/file.zip"],
_retries=3,
_retry_delay=5, # wait 5 seconds between retries
)

# Use custom retry logic for specific error conditions
def should_retry_download(output_data):
# Retry only on temporary network errors, not permanent failures
stderr_text = " ".join(output_data["stderr_lines"]).lower()
temporary_errors = ["timeout", "connection refused", "temporary failure"]
return any(error in stderr_text for error in temporary_errors)

server.shell(
name="Download with smart retry logic",
commands=["wget https://example.com/large-file.zip"],
_retries=3,
_retry_until=should_retry_download,
)
39 changes: 39 additions & 0 deletions docs/using-operations.rst
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,45 @@ Global arguments are covered in detail here: :doc:`arguments`. There is a set of
_sudo_user="pyinfra",
)

Retry Functionality
-------------------

Operations can be configured to retry automatically on failure using retry arguments:

.. code:: python

from pyinfra.operations import server

# Retry a flaky command up to 3 times with default 5 second delay
server.shell(
name="Download file with retries",
commands=["curl -o /tmp/file.tar.gz https://example.com/file.tar.gz"],
_retries=3,
)

# Retry with custom delay between attempts
server.shell(
name="Check service status with retries",
commands=["systemctl is-active myservice"],
_retries=2,
_retry_delay=10, # 10 second delay between retries
)

# Use a custom retry condition to control when to retry
def retry_on_network_error(output_data):
# Retry if stderr contains network-related errors
for line in output_data["stderr_lines"]:
if any(keyword in line.lower() for keyword in ["network", "timeout", "connection"]):
return True
return False

server.shell(
name="Network operation with conditional retry",
commands=["wget https://example.com/large-file.zip"],
_retries=5,
_retry_until=retry_on_network_error,
)


The ``host`` Object
-------------------
Expand Down
61 changes: 61 additions & 0 deletions pyinfra/api/arguments.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ class ConnectorArguments(TypedDict, total=False):
_get_pty: bool
_stdin: Union[str, Iterable[str]]

# Retry arguments
_retries: int
_retry_delay: Union[int, float]
_retry_until: Optional[Callable[[dict], bool]]


def generate_env(config: "Config", value: dict) -> dict:
env = config.ENV.copy()
Expand Down Expand Up @@ -232,11 +237,28 @@ def all_global_arguments() -> List[tuple[str, Type]]:
return list(get_type_hints(AllArguments).items())


# Metadata for the retry-related global arguments. The ``_retries`` and
# ``_retry_delay`` defaults are read from the config (``RETRY`` and
# ``RETRY_DELAY``); ``_retry_until`` defaults to None.
retry_argument_meta: dict[str, ArgumentMeta] = {
"_retries": ArgumentMeta(
"Number of times to retry failed operations.",
default=lambda config: config.RETRY,
),
"_retry_delay": ArgumentMeta(
"Delay in seconds between retry attempts.",
default=lambda config: config.RETRY_DELAY,
),
"_retry_until": ArgumentMeta(
"Callable taking output data that returns True to continue retrying.",
default=lambda config: None,
),
}

# Single lookup table merging every group of global argument metadata.
all_argument_meta: dict[str, ArgumentMeta] = {
**auth_argument_meta,
**shell_argument_meta,
**meta_argument_meta,
**execution_argument_meta,
**retry_argument_meta,  # the retry arguments (_retries, _retry_delay, _retry_until)
}

EXECUTION_KWARG_KEYS = list(ExecutionArguments.__annotations__.keys())
Expand Down Expand Up @@ -286,6 +308,45 @@ def all_global_arguments() -> List[tuple[str, Type]]:
),
"Operation meta & callbacks": (meta_argument_meta, "", ""),
"Execution strategy": (execution_argument_meta, "", ""),
"Retry behavior": (
retry_argument_meta,
"""
Retry arguments allow you to automatically retry operations that fail. You can specify
how many times to retry, the delay between retries, and optionally a condition
function to determine when to stop retrying.
""",
"""
.. code:: python

# Retry a command up to 3 times with the default 5 second delay
server.shell(
name="Run flaky command with retries",
commands=["flaky_command"],
_retries=3,
)
# Retry with a custom delay
server.shell(
name="Run flaky command with custom delay",
commands=["flaky_command"],
_retries=2,
_retry_delay=10, # 10 second delay between retries
)
# Retry with a custom condition
def retry_on_specific_error(output_data):
# Retry if stderr contains "temporary failure"
for line in output_data["stderr_lines"]:
if "temporary failure" in line.lower():
return True
return False

server.shell(
name="Run command with conditional retry",
commands=["flaky_command"],
_retries=5,
_retry_until=retry_on_specific_error,
)
""",
),
}


Expand Down
6 changes: 6 additions & 0 deletions pyinfra/api/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ class ConfigDefaults:
IGNORE_ERRORS: bool = False
# Shell to use to execute commands
SHELL: str = "sh"
# Whether to display full diffs for files
DIFF: bool = False
# Number of times to retry failed operations
RETRY: int = 0
# Delay in seconds between retry attempts
RETRY_DELAY: int = 5


config_defaults = {key: value for key, value in ConfigDefaults.__dict__.items() if key.isupper()}
Expand Down
21 changes: 19 additions & 2 deletions pyinfra/api/connect.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,5 +46,22 @@ def connect_all(state: "State"):


def disconnect_all(state: "State"):
    """
    Disconnect from all of the activated hosts in parallel.

    One greenlet is spawned on ``state.pool`` per host in
    ``state.activated_hosts``; the progress spinner is advanced as each
    disconnect finishes.

    Args:
        state (``pyinfra.api.State`` obj): the state whose activated hosts
            should be disconnected
    """
    greenlet_to_host = {
        state.pool.spawn(host.disconnect): host
        for host in state.activated_hosts  # only hosts we connected to please!
    }

    with progress_spinner(greenlet_to_host.values()) as progress:
        for greenlet in gevent.iwait(greenlet_to_host.keys()):
            progress(greenlet_to_host[greenlet])

    # Only the greenlets are needed here, so iterate the dict's keys directly.
    for greenlet in greenlet_to_host:
        # Raise any unexpected exception
        greenlet.get()
4 changes: 4 additions & 0 deletions pyinfra/api/host.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,10 @@ def _get_temp_directory(self):

return temp_directory

def get_temp_dir_config(self):
    """
    Return the temp directory setting from this host's state config.

    ``TEMP_DIR`` is returned when it is truthy; otherwise the value of
    ``DEFAULT_TEMP_DIR`` is returned.
    """
    config = self.state.config
    configured_dir = config.TEMP_DIR
    if configured_dir:
        return configured_dir
    return config.DEFAULT_TEMP_DIR

def get_temp_filename(
self,
hash_key: Optional[str] = None,
Expand Down
55 changes: 54 additions & 1 deletion pyinfra/api/operation.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ class OperationMeta:
_commands: Optional[list[Any]] = None
_maybe_is_change: Optional[bool] = None
_success: Optional[bool] = None
_retry_attempts: int = 0
_max_retries: int = 0
_retry_succeeded: Optional[bool] = None

def __init__(self, hash, is_change: Optional[bool]):
self._hash = hash
Expand All @@ -59,9 +62,17 @@ def __repr__(self) -> str:
"""

if self._commands is not None:
retry_info = ""
if self._retry_attempts > 0:
retry_result = "succeeded" if self._retry_succeeded else "failed"
retry_info = (
f", retries={self._retry_attempts}/{self._max_retries} ({retry_result})"
)

return (
"OperationMeta(executed=True, "
f"success={self.did_succeed()}, hash={self._hash}, commands={len(self._commands)})"
f"success={self.did_succeed()}, hash={self._hash}, "
f"commands={len(self._commands)}{retry_info})"
)
return (
"OperationMeta(executed=False, "
Expand All @@ -74,12 +85,20 @@ def set_complete(
success: bool,
commands: list[Any],
combined_output: "CommandOutput",
retry_attempts: int = 0,
max_retries: int = 0,
) -> None:
if self.is_complete():
raise RuntimeError("Cannot complete an already complete operation")
self._success = success
self._commands = commands
self._combined_output = combined_output
self._retry_attempts = retry_attempts
self._max_retries = max_retries

# Determine if operation succeeded after retries
if retry_attempts > 0:
self._retry_succeeded = success

def is_complete(self) -> bool:
    """
    Returns True once ``_success`` has been set, i.e. it is no longer None.
    """
    still_pending = self._success is None
    return not still_pending
Expand Down Expand Up @@ -150,6 +169,40 @@ def stdout(self) -> str:
def stderr(self) -> str:
return "\n".join(self.stderr_lines)

@property
def retry_attempts(self) -> int:
    """
    Returns the retry attempt count stored on this operation.
    """
    attempts = self._retry_attempts
    return attempts

@property
def max_retries(self) -> int:
    """
    Returns the maximum retry count stored on this operation.
    """
    limit = self._max_retries
    return limit

@property
def was_retried(self) -> bool:
    """
    Returns True when the stored retry attempt count is greater than zero,
    i.e. the operation was retried at least once.
    """
    attempts_made = self._retry_attempts
    return attempts_made > 0

@property
def retry_succeeded(self) -> Optional[bool]:
    """
    Returns the stored retry outcome: whether the operation succeeded
    after being retried, or None if the operation was not retried.
    """
    outcome = self._retry_succeeded
    return outcome

def get_retry_info(self) -> dict[str, Any]:
    """
    Returns all retry-related information for this operation as one dict.

    The keys are ``retry_attempts``, ``max_retries``, ``was_retried`` and
    ``retry_succeeded``, in that order.
    """
    return dict(
        retry_attempts=self._retry_attempts,
        max_retries=self._max_retries,
        was_retried=self.was_retried,
        retry_succeeded=self._retry_succeeded,
    )


def add_op(state: State, op_func, *args, **kwargs):
"""
Expand Down
Loading