Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions python/src/etos_api/library/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
# limitations under the License.
"""Docker operations for the ETOS API."""

import asyncio
import logging
import time
from threading import Lock
Expand All @@ -27,6 +28,21 @@
REPO_DELIMITER = "/"
TAG_DELIMITER = ":"

# Retry configuration for transient connection errors.
# Uses exponential backoff: delay = BACKOFF_FACTOR * 2^(attempt-1)
# With BACKOFF_FACTOR=1 and MAX_RETRIES=5 the delays are 1s, 2s, 4s, 8s.
MAX_RETRIES = 5
BACKOFF_FACTOR = 1

# aiohttp exceptions that indicate a transient connection problem worth retrying.
_RETRYABLE_EXCEPTIONS = (
aiohttp.ClientConnectionResetError,
aiohttp.ClientConnectorError,
aiohttp.ServerDisconnectedError,
aiohttp.ServerTimeoutError,
asyncio.TimeoutError,
)


class Docker:
"""Docker handler for HTTP operations against docker registries.
Expand Down Expand Up @@ -219,6 +235,11 @@ def repository(self, repo: str) -> tuple[str, str]:
async def digest(self, name: str) -> Optional[str]:
"""Get a sha256 digest from an image in an image repository.

Retries on transient connection errors (e.g. DNS hiccups) up to
MAX_RETRIES times with exponential backoff between attempts.
The delay follows the same formula as urllib3 Retry used in
etos-library: ``BACKOFF_FACTOR * 2 ** (attempt - 1)``.

:param name: The name of the container image.
:return: The sha256 digest of the container image.
"""
Expand All @@ -227,6 +248,39 @@ async def digest(self, name: str) -> Optional[str]:
registry, repo = self.repository(base)
manifest_url = f"https://{registry}/v2/{repo}/manifests/{tag}"

last_exception = None
for attempt in range(1, MAX_RETRIES + 1):
try:
digest = await self._get_digest(manifest_url)
return digest
except _RETRYABLE_EXCEPTIONS as exception:
last_exception = exception
self.logger.warning(
"Transient error checking container image %r (attempt %d/%d): %s",
name,
attempt,
MAX_RETRIES,
exception,
)
if attempt < MAX_RETRIES:
delay = BACKOFF_FACTOR * (2 ** (attempt - 1))
self.logger.info("Retrying in %s seconds...", delay)
await asyncio.sleep(delay)

self.logger.error(
"All %d attempts to check container image %r failed: %s",
MAX_RETRIES,
name,
last_exception,
)
return None

async def _get_digest(self, manifest_url: str) -> Optional[str]:
"""Perform a single attempt to fetch the digest for a manifest URL.

:param manifest_url: Full URL to the image manifest.
:return: The sha256 digest, or None if the image was not found.
"""
digest = None
async with aiohttp.ClientSession() as session:
self.logger.info("Get digest from %r", manifest_url)
Expand Down
198 changes: 198 additions & 0 deletions python/tests/library/test_docker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
# Copyright 2026 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the Docker library retry logic."""

import asyncio
import logging
import sys
from unittest.mock import AsyncMock, MagicMock, patch

import aiohttp
import pytest

from etos_api.library.docker import Docker

logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)


def _make_connector_error():
"""Create a ClientConnectorError using a mock ConnectionKey."""
connection_key = MagicMock()
connection_key.host = "ghcr.io"
connection_key.port = 443
connection_key.is_ssl = True
connection_key.ssl = True
return aiohttp.ClientConnectorError(connection_key, OSError("DNS resolution failed"))


class TestDockerDigestRetry:
"""Test retry logic for Docker.digest."""

logger = logging.getLogger(__name__)
pytestmark = pytest.mark.asyncio

@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_returns_on_first_success(self, mock_get_digest):
"""Test that digest returns immediately when first attempt succeeds.

Approval criteria:
- Digest shall be returned after a single successful attempt.

Test steps::
1. Call digest with a working image name.
2. Verify _get_digest was called once and the digest is returned.
"""
mock_get_digest.return_value = "sha256:abc123"
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 1

@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_connector_error(self, mock_get_digest):
"""Test that digest retries on ClientConnectorError.

Approval criteria:
- Digest shall retry on transient connection errors and succeed.

Test steps::
1. Configure _get_digest to fail once with ClientConnectorError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
_make_connector_error(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2

@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_server_disconnected(self, mock_get_digest):
"""Test that digest retries on ServerDisconnectedError.

Approval criteria:
- Digest shall retry when the server disconnects unexpectedly.

Test steps::
1. Configure _get_digest to fail once with ServerDisconnectedError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
aiohttp.ServerDisconnectedError(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2

@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch("etos_api.library.docker.MAX_RETRIES", 3)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_returns_none_after_all_retries_exhausted(self, mock_get_digest):
"""Test that digest returns None when all retries are exhausted.

Approval criteria:
- Digest shall return None if every attempt fails with a connection error.

Test steps::
1. Configure _get_digest to always raise ClientConnectorError.
2. Call digest.
3. Verify None is returned and all retries were attempted.
"""
error = _make_connector_error()
mock_get_digest.side_effect = [error, error, error]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result is None
assert mock_get_digest.call_count == 3

@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_does_not_retry_on_non_retryable_error(self, mock_get_digest):
"""Test that digest does not retry on non-retryable exceptions.

Approval criteria:
- Non-retryable exceptions shall propagate immediately without retries.

Test steps::
1. Configure _get_digest to raise a RuntimeError.
2. Call digest.
3. Verify the exception propagates and no retry occurred.
"""
mock_get_digest.side_effect = RuntimeError("unexpected")
docker = Docker()
with pytest.raises(RuntimeError, match="unexpected"):
await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert mock_get_digest.call_count == 1

@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_timeout_error(self, mock_get_digest):
"""Test that digest retries on asyncio.TimeoutError.

Approval criteria:
- Digest shall retry on timeout errors and succeed if a subsequent attempt works.

Test steps::
1. Configure _get_digest to fail once with TimeoutError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
asyncio.TimeoutError(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2

@patch("etos_api.library.docker.BACKOFF_FACTOR", 1)
@patch("etos_api.library.docker.MAX_RETRIES", 4)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_uses_exponential_backoff(self, mock_get_digest):
"""Test that retry delays follow exponential backoff.

Approval criteria:
- Delays between retries shall follow BACKOFF_FACTOR * 2^(attempt-1).

Test steps::
1. Configure _get_digest to always fail with ServerDisconnectedError.
2. Patch asyncio.sleep to record the delay values.
3. Call digest.
4. Verify the recorded delays match 1, 2, 4 (for attempts 1, 2, 3).
"""
mock_get_digest.side_effect = aiohttp.ServerDisconnectedError()
recorded_delays = []

async def fake_sleep(delay):
recorded_delays.append(delay)

docker = Docker()
with patch("etos_api.library.docker.asyncio.sleep", side_effect=fake_sleep):
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")

assert result is None
assert mock_get_digest.call_count == 4
# Delays: BACKOFF_FACTOR * 2^0, BACKOFF_FACTOR * 2^1, BACKOFF_FACTOR * 2^2
# No sleep after the last attempt.
assert recorded_delays == [1, 2, 4]
Loading