Skip to content

Commit 6ffde5e

Browse files
p3rf Teamcopybara-github
authored andcommitted
Ignore timeouts that occur while sampling nodes, replicas.
This could result in gaps in the data, but that's almost always ok. PiperOrigin-RevId: 721487361
1 parent 9153567 commit 6ffde5e

File tree

3 files changed

+211
-27
lines changed

3 files changed

+211
-27
lines changed

perfkitbenchmarker/container_service.py

Lines changed: 39 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@
130130
),
131131
'read: connection reset by peer',
132132
'Unable to connect to the server: dial tcp',
133+
'Unable to connect to the server: net/http: TLS handshake timeout',
133134
]
134135

135136

@@ -161,6 +162,36 @@ def RunKubectlCommand(command: list[str], **kwargs) -> tuple[str, str, int]:
161162
# IssueCommand defaults stack_level to 1, so 2 skips this function.
162163
kwargs['stack_level'] = 2
163164
cmd = [FLAGS.kubectl, '--kubeconfig', FLAGS.kubeconfig] + command
165+
166+
orig_suppress_failure = None
167+
if 'suppress_failure' in kwargs:
168+
orig_suppress_failure = kwargs['suppress_failure']
169+
170+
def _DetectTimeoutViaSuppressFailure(stdout, stderr, retcode):
171+
# Check for kubectl timeout. If found, treat it the same as a regular
172+
# timeout.
173+
if retcode != 0:
174+
for error_substring in _RETRYABLE_KUBECTL_ERRORS:
175+
if error_substring in stderr:
176+
# Raise timeout error regardless of raise_on_failure - as the intended
177+
# semantics is to ignore expected errors caused by invoking the
178+
# command not errors from PKB infrastructure.
179+
raise_on_timeout = (
180+
kwargs['raise_on_timeout']
181+
if 'raise_on_timeout' in kwargs
182+
else True
183+
)
184+
if raise_on_timeout:
185+
raise errors.VmUtil.IssueCommandTimeoutError(stderr)
186+
# Else, if the user supplied a suppress_failure function, try that.
187+
if orig_suppress_failure is not None:
188+
return orig_suppress_failure(stdout, stderr, retcode)
189+
190+
# Else, no suppression.
191+
return False
192+
193+
kwargs['suppress_failure'] = _DetectTimeoutViaSuppressFailure
194+
164195
return vm_util.IssueCommand(cmd, **kwargs)
165196

166197

@@ -172,6 +203,12 @@ def RunRetryableKubectlCommand(
172203
run_cmd: list[str], timeout: int | None = None, **kwargs
173204
) -> tuple[str, str, int]:
174205
"""Runs a kubectl command, retrying somewhat exepected errors."""
206+
if 'raise_on_timeout' in kwargs and kwargs['raise_on_timeout']:
207+
raise ValueError(
208+
'RunRetryableKubectlCommand does not allow `raise_on_timeout=True`'
209+
' (since timeouts are retryable).'
210+
)
211+
175212
if 'stack_level' in kwargs:
176213
kwargs['stack_level'] += 1
177214
else:
@@ -180,24 +217,12 @@ def RunRetryableKubectlCommand(
180217

181218
@vm_util.Retry(
182219
timeout=timeout,
183-
retryable_exceptions=(RetryableKubectlError,),
220+
retryable_exceptions=(errors.VmUtil.IssueCommandTimeoutError,),
184221
)
185222
def _RunRetryablePart(run_cmd: list[str], **kwargs):
186223
"""Inner function retries command so timeout can be passed to decorator."""
187224
kwargs['stack_level'] += 1
188-
out, err, code = RunKubectlCommand(
189-
run_cmd, raise_on_failure=False, **kwargs
190-
)
191-
if err:
192-
logging.warning('Got error %s when running %s', err, run_cmd)
193-
for error_substring in _RETRYABLE_KUBECTL_ERRORS:
194-
if error_substring in err:
195-
raise RetryableKubectlError(
196-
f'Tried running {run_cmd} but it failed with the substring'
197-
f' {error_substring}. Retrying. Full error is: {err}'
198-
)
199-
raise errors.VmUtil.IssueCommandError(err)
200-
return out, err, code
225+
return RunKubectlCommand(run_cmd, raise_on_timeout=True, **kwargs)
201226

202227
return _RunRetryablePart(run_cmd, timeout=timeout, **kwargs)
203228

perfkitbenchmarker/linux_benchmarks/kubernetes_hpa_benchmark.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from perfkitbenchmarker import benchmark_spec as bm_spec
2525
from perfkitbenchmarker import configs
2626
from perfkitbenchmarker import container_service
27+
from perfkitbenchmarker import errors
2728
from perfkitbenchmarker.linux_packages import locust
2829
from perfkitbenchmarker.sample import Sample
2930

@@ -226,8 +227,20 @@ def _Observe(
226227
self,
227228
observe_fn: Callable[[], List[Sample]],
228229
) -> None:
230+
"""Call the specified function until self._stop is signalled.
231+
232+
Results are appended to self._samples. Timeouts are ignored.
233+
234+
Args:
235+
observe_fn: The function to call.
236+
"""
229237
while True:
230-
self._samples.extend(observe_fn())
238+
try:
239+
self._samples.extend(observe_fn())
240+
except errors.VmUtil.IssueCommandTimeoutError:
241+
# Ignore timeouts - there'll be a gap in the data, but that's ok.
242+
pass
243+
231244
if self._stop.wait(timeout=1.0):
232245
return
233246

tests/container_service_test.py

Lines changed: 158 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
1-
from typing import Iterable
1+
import time
2+
from typing import Callable, Iterable, Protocol, Tuple
23
import unittest
34
from unittest import mock
45
from absl.testing import flagsaver
56
from perfkitbenchmarker import container_service
67
from perfkitbenchmarker import errors
78
from perfkitbenchmarker import provider_info
9+
from perfkitbenchmarker import vm_util
810
from perfkitbenchmarker.configs import container_spec
911
from perfkitbenchmarker.sample import Sample
1012
from tests import pkb_common_test_case
@@ -25,6 +27,60 @@ def _Delete(self):
2527
pass
2628

2729

30+
kubectl_timeout_tuple = (
31+
'',
32+
(
33+
'Unable to connect to the server: dial tcp 10.42.42.42:443:'
34+
'connect: connection timed out'
35+
),
36+
1,
37+
)
38+
39+
40+
class _IssueCommandCallable(Protocol):
41+
42+
def __call__(
43+
self,
44+
cmd: Iterable[str],
45+
suppress_failure: Callable[[str, str, int], bool] | None = None,
46+
**kwargs,
47+
) -> Tuple[str, str, int]:
48+
...
49+
50+
51+
def _MockedIssueCommandSuppressing(
52+
stderr: str,
53+
) -> _IssueCommandCallable:
54+
def _MockedCommand(
55+
cmd: Iterable[str],
56+
suppress_failure: Callable[[str, str, int], bool] | None = None,
57+
**kwargs,
58+
):
59+
_ = cmd
60+
_ = kwargs
61+
stdout = ''
62+
status = 1
63+
if suppress_failure and suppress_failure(stdout, stderr, status):
64+
return stdout, '', 0
65+
return stdout, stderr, status
66+
67+
return _MockedCommand
68+
69+
70+
def _MockedIssueCommandFailure(
71+
cmd: Iterable[str],
72+
suppress_failure: Callable[[str, str, int], bool] | None = None,
73+
**kwargs,
74+
) -> Tuple[str, str, int]:
75+
return _MockedIssueCommandSuppressing(
76+
stderr='A failure occurred',
77+
)(
78+
cmd,
79+
suppress_failure=suppress_failure,
80+
**kwargs,
81+
)
82+
83+
2884
class ContainerServiceTest(pkb_common_test_case.PkbCommonTestCase):
2985

3086
def setUp(self):
@@ -62,27 +118,60 @@ def test_apply_manifest_gets_deployment_name(self):
62118
)
63119
self.assertEqual(next(deploy_ids), 'deployment.apps/test-deployment')
64120

65-
def test_retriable_kubectl_command_fails_on_random_error(self):
66-
self.MockIssueCommand(
67-
{'get podpatchwork': [('', 'error: invalid syntax', 1)]}
68-
)
121+
@mock.patch.object(
122+
vm_util,
123+
'IssueCommand',
124+
side_effect=[errors.VmUtil.IssueCommandError()],
125+
autospec=True,
126+
)
127+
def test_retriable_kubectl_command_fails_on_random_error(self, _):
69128
with self.assertRaises(errors.VmUtil.IssueCommandError):
70129
container_service.RunRetryableKubectlCommand(['get', 'podpatchwork'])
71130

72-
def test_retriable_kubectl_command_retries_on_connection_reset(self):
73-
self.MockIssueCommand({
74-
'get pods': [
75-
('', 'error: read: connection reset by peer', 1),
76-
('pod1, pod2', '', 0),
77-
]
78-
})
131+
@mock.patch.object(
132+
vm_util,
133+
'IssueCommand',
134+
side_effect=[
135+
errors.VmUtil.IssueCommandTimeoutError(),
136+
('pod1, pod2', '', 0),
137+
],
138+
autospec=True,
139+
)
140+
@mock.patch.object(time, 'sleep', autospec=True)
141+
def test_retriable_kubectl_command_retries_on_retriable_error(
142+
self, sleep_mock, issue_command_mock
143+
):
79144
out, err, ret = container_service.RunRetryableKubectlCommand(
80145
['get', 'pods']
81146
)
82147
self.assertEqual(out, 'pod1, pod2')
83148
self.assertEqual(err, '')
84149
self.assertEqual(ret, 0)
85150

151+
def test_retriable_kubectl_command_passes_timeout_through(self):
152+
def _VerifyTimeout(
153+
cmd: Iterable[str],
154+
timeout: int | None = vm_util.DEFAULT_TIMEOUT,
155+
**kwargs,
156+
) -> Tuple[str, str, int]:
157+
_ = cmd
158+
_ = kwargs
159+
self.assertEqual(
160+
timeout,
161+
1,
162+
'timeout not correctly passed to underlying vm_util.IssueCommand()',
163+
)
164+
return 'ok', '', 0
165+
166+
with mock.patch.object(vm_util, 'IssueCommand', _VerifyTimeout):
167+
container_service.RunRetryableKubectlCommand(['get', 'pods'], timeout=1)
168+
169+
def test_retriable_kubectl_command_fails_with_raise_on_timeout(self):
170+
with self.assertRaises(ValueError):
171+
container_service.RunRetryableKubectlCommand(
172+
['get', 'pods'], raise_on_timeout=True
173+
)
174+
86175
def test_GetNumReplicasSamples_found(self):
87176
resource_name = 'deployment/my_deployment'
88177
namespace = 'my_namespace'
@@ -161,6 +250,63 @@ def _Sample(count: int, state: str) -> Sample:
161250
],
162251
)
163252

253+
@mock.patch.object(
254+
vm_util,
255+
'IssueCommand',
256+
return_value=['stdout', 'stderr', 0],
257+
autospec=True,
258+
)
259+
def test_RunKubectlCommand(self, issue_command_mock):
260+
stdout, stderr, status = container_service.RunKubectlCommand(
261+
['get', 'pods']
262+
)
263+
self.assertEqual(stdout, 'stdout')
264+
self.assertEqual(stderr, 'stderr')
265+
self.assertEqual(status, 0)
266+
267+
@mock.patch.object(
268+
vm_util,
269+
'IssueCommand',
270+
side_effect=errors.VmUtil.IssueCommandTimeoutError(),
271+
autospec=True,
272+
)
273+
def test_RunKubectlCommand_CommandTimeoutPropagated(self, issue_command_mock):
274+
with self.assertRaises(errors.VmUtil.IssueCommandTimeoutError):
275+
container_service.RunKubectlCommand(['get', 'pods'])
276+
277+
def test_RunKubectlCommand_KubectlTimeoutRaisesCommandTimeout(self):
278+
for err in container_service._RETRYABLE_KUBECTL_ERRORS:
279+
with mock.patch.object(
280+
vm_util, 'IssueCommand', _MockedIssueCommandSuppressing(stderr=err)
281+
):
282+
with self.assertRaises(
283+
errors.VmUtil.IssueCommandTimeoutError,
284+
msg=f'Failed to raise timeout for error: {err}',
285+
):
286+
container_service.RunKubectlCommand(['get', 'pods'])
287+
288+
def test_RunKubectlCommand_KubectlTimeoutWithSuppressFailureRaisesCommandTimeout(
289+
self,
290+
):
291+
for err in container_service._RETRYABLE_KUBECTL_ERRORS:
292+
with mock.patch.object(
293+
vm_util, 'IssueCommand', _MockedIssueCommandSuppressing(stderr=err)
294+
):
295+
with self.assertRaises(
296+
errors.VmUtil.IssueCommandTimeoutError,
297+
msg=f'Failed to raise timeout for error: {err}',
298+
):
299+
container_service.RunKubectlCommand(
300+
['get', 'pods'], suppress_failure=lambda x, y, z: True
301+
)
302+
303+
@mock.patch.object(vm_util, 'IssueCommand', _MockedIssueCommandFailure)
304+
def test_RunKubectlCommand_KubectlFailWithSuppressFailure(self):
305+
_, _, status = container_service.RunKubectlCommand(
306+
['get', 'pods'], suppress_failure=lambda x, y, z: True
307+
)
308+
self.assertEqual(status, 0)
309+
164310

165311
def _ClearTimestamps(samples: Iterable[Sample]) -> Iterable[Sample]:
166312
for s in samples:

0 commit comments

Comments
 (0)