-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathtest_docker.py
More file actions
198 lines (165 loc) · 7.86 KB
/
test_docker.py
File metadata and controls
198 lines (165 loc) · 7.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
# Copyright 2026 Axis Communications AB.
#
# For a full list of individual contributors, please see the commit history.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for the Docker library retry logic."""
import asyncio
import logging
import sys
from unittest.mock import AsyncMock, MagicMock, patch
import aiohttp
import pytest
from etos_api.library.docker import Docker
logging.basicConfig(level=logging.DEBUG, stream=sys.stdout)
def _make_connector_error():
"""Create a ClientConnectorError using a mock ConnectionKey."""
connection_key = MagicMock()
connection_key.host = "ghcr.io"
connection_key.port = 443
connection_key.is_ssl = True
connection_key.ssl = True
return aiohttp.ClientConnectorError(connection_key, OSError("DNS resolution failed"))
class TestDockerDigestRetry:
"""Test retry logic for Docker.digest."""
logger = logging.getLogger(__name__)
pytestmark = pytest.mark.asyncio
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_returns_on_first_success(self, mock_get_digest):
"""Test that digest returns immediately when first attempt succeeds.
Approval criteria:
- Digest shall be returned after a single successful attempt.
Test steps::
1. Call digest with a working image name.
2. Verify _get_digest was called once and the digest is returned.
"""
mock_get_digest.return_value = "sha256:abc123"
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 1
@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_connector_error(self, mock_get_digest):
"""Test that digest retries on ClientConnectorError.
Approval criteria:
- Digest shall retry on transient connection errors and succeed.
Test steps::
1. Configure _get_digest to fail once with ClientConnectorError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
_make_connector_error(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2
@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_server_disconnected(self, mock_get_digest):
"""Test that digest retries on ServerDisconnectedError.
Approval criteria:
- Digest shall retry when the server disconnects unexpectedly.
Test steps::
1. Configure _get_digest to fail once with ServerDisconnectedError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
aiohttp.ServerDisconnectedError(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2
@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch("etos_api.library.docker.MAX_RETRIES", 3)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_returns_none_after_all_retries_exhausted(self, mock_get_digest):
"""Test that digest returns None when all retries are exhausted.
Approval criteria:
- Digest shall return None if every attempt fails with a connection error.
Test steps::
1. Configure _get_digest to always raise ClientConnectorError.
2. Call digest.
3. Verify None is returned and all retries were attempted.
"""
error = _make_connector_error()
mock_get_digest.side_effect = [error, error, error]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result is None
assert mock_get_digest.call_count == 3
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_does_not_retry_on_non_retryable_error(self, mock_get_digest):
"""Test that digest does not retry on non-retryable exceptions.
Approval criteria:
- Non-retryable exceptions shall propagate immediately without retries.
Test steps::
1. Configure _get_digest to raise a RuntimeError.
2. Call digest.
3. Verify the exception propagates and no retry occurred.
"""
mock_get_digest.side_effect = RuntimeError("unexpected")
docker = Docker()
with pytest.raises(RuntimeError, match="unexpected"):
await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert mock_get_digest.call_count == 1
@patch("etos_api.library.docker.BACKOFF_FACTOR", 0)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_retries_on_timeout_error(self, mock_get_digest):
"""Test that digest retries on asyncio.TimeoutError.
Approval criteria:
- Digest shall retry on timeout errors and succeed if a subsequent attempt works.
Test steps::
1. Configure _get_digest to fail once with TimeoutError then succeed.
2. Call digest.
3. Verify retry happened and correct digest is returned.
"""
mock_get_digest.side_effect = [
asyncio.TimeoutError(),
"sha256:abc123",
]
docker = Docker()
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result == "sha256:abc123"
assert mock_get_digest.call_count == 2
@patch("etos_api.library.docker.BACKOFF_FACTOR", 1)
@patch("etos_api.library.docker.MAX_RETRIES", 4)
@patch.object(Docker, "_get_digest", new_callable=AsyncMock)
async def test_digest_uses_exponential_backoff(self, mock_get_digest):
"""Test that retry delays follow exponential backoff.
Approval criteria:
- Delays between retries shall follow BACKOFF_FACTOR * 2^(attempt-1).
Test steps::
1. Configure _get_digest to always fail with ServerDisconnectedError.
2. Patch asyncio.sleep to record the delay values.
3. Call digest.
4. Verify the recorded delays match 1, 2, 4 (for attempts 1, 2, 3).
"""
mock_get_digest.side_effect = aiohttp.ServerDisconnectedError()
recorded_delays = []
async def fake_sleep(delay):
recorded_delays.append(delay)
docker = Docker()
with patch("etos_api.library.docker.asyncio.sleep", side_effect=fake_sleep):
result = await docker.digest("ghcr.io/eiffel-community/etos-test-runner:latest")
assert result is None
assert mock_get_digest.call_count == 4
# Delays: BACKOFF_FACTOR * 2^0, BACKOFF_FACTOR * 2^1, BACKOFF_FACTOR * 2^2
# No sleep after the last attempt.
assert recorded_delays == [1, 2, 4]