Skip to content

Commit 8896615

Browse files
committed
Log transient errors as warnings during health check
1 parent 21275f8 commit 8896615

File tree

10 files changed

+271
-29
lines changed

10 files changed

+271
-29
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ repos:
66
hooks:
77
- id: trailing-whitespace
88
- repo: https://github.com/psf/black
9-
rev: 22.3.0
9+
rev: 25.1.0
1010
hooks:
1111
- id: black
1212
- repo: https://github.com/PyCQA/flake8

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ Changelog
55
Unreleased
66
----------
77

8+
* Log transient connection and timeout errors as warnings to reduce log noise.
9+
810
2.46.0 (2025-03-17)
911
-------------------
1012

crate/operator/create.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,9 @@ def get_statefulset_crate_command(
516516
elif config.CLOUD_PROVIDER == CloudProvider.GCP:
517517
url = "http://169.254.169.254/computeMetadata/v1/instance/zone" # noqa
518518
# projects/<account-id>/zones/us-central1-a
519-
settings[
520-
"-Cnode.attr.zone"
521-
] = f"$(curl -s '{url}' -H 'Metadata-Flavor: Google' | awk -F'/' '{{print $NF}}')" # noqa
519+
settings["-Cnode.attr.zone"] = (
520+
f"$(curl -s '{url}' -H 'Metadata-Flavor: Google' | awk -F'/' '{{print $NF}}')" # noqa
521+
)
522522

523523
if cluster_settings:
524524
for k, v in cluster_settings.items():

crate/operator/handlers/handle_ping_cratedb_status.py

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
# software solely pursuant to the terms of the relevant commercial agreement.
2121

2222
import logging
23+
from asyncio import TimeoutError
2324

2425
import kopf
26+
from aiohttp.client_exceptions import ClientConnectorError
2527
from kubernetes_asyncio.client import CoreV1Api
2628

2729
from crate.operator.cratedb import connection_factory, get_healthiness
@@ -57,23 +59,39 @@ async def ping_cratedb_status(
5759
patch.status[CLUSTER_STATUS_KEY] = {"health": "SUSPENDED"}
5860
return
5961

60-
async with GlobalApiClient() as api_client:
61-
core = CoreV1Api(api_client)
62-
host = await get_host(core, namespace, name)
63-
password = await get_system_user_password(core, namespace, name)
64-
conn_factory = connection_factory(host, password)
65-
6662
try:
67-
async with conn_factory() as conn:
68-
async with conn.cursor() as cursor:
63+
async with GlobalApiClient() as api_client:
64+
core = CoreV1Api(api_client)
65+
host = await get_host(core, namespace, name)
66+
password = await get_system_user_password(core, namespace, name)
67+
conn_factory = connection_factory(host, password)
68+
connection = conn_factory()
69+
70+
async with connection as conn:
71+
cursor_cm = await conn.cursor()
72+
async with cursor_cm as cursor:
6973
healthiness = await get_healthiness(cursor)
7074
# If there are no tables in the cluster, get_healthiness returns
7175
# none: default to `Green`, as cluster is reachable
7276
status = HEALTHINESS_TO_STATUS.get(
7377
healthiness, PrometheusClusterStatus.GREEN
7478
)
7579
except Exception as e:
76-
logger.warning("Failed to ping cluster.", exc_info=e)
80+
match e:
81+
case ClientConnectorError():
82+
logger.warning(
83+
"Transient Kubernetes API connection error during health check: %s",
84+
e,
85+
)
86+
case TimeoutError():
87+
logger.warning(
88+
"Timeout while connecting to CrateDB during health check: %s", e
89+
)
90+
case _:
91+
logger.warning(
92+
"Unexpected error during CrateDB health check.", exc_info=True
93+
)
94+
7795
status = PrometheusClusterStatus.UNREACHABLE
7896

7997
report_cluster_status(name, cluster_name, namespace, status)

crate/operator/kube_auth.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,9 +65,7 @@ async def login_via_kubernetes_asyncio(
6565
scheme, token = (
6666
(None, None)
6767
if len(parts) == 0
68-
else (None, parts[0])
69-
if len(parts) == 1
70-
else (parts[0], parts[1])
68+
else (None, parts[0]) if len(parts) == 1 else (parts[0], parts[1])
7169
) # RFC-7235, Appendix C.
7270

7371
# Interpret the k8s_config object for our own minimalistic credentials.

crate/operator/restore_backup.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -527,9 +527,9 @@ async def _prepare_cluster_settings(
527527
)
528528
)
529529
patch.status["maxBytesPerSec"] = max_bytes_per_sec
530-
patch.status[
531-
"clusterConcurrentRebalance"
532-
] = cluster_concurrent_rebalance
530+
patch.status["clusterConcurrentRebalance"] = (
531+
cluster_concurrent_rebalance
532+
)
533533
# update the settings during restore operation
534534
await set_cluster_setting(
535535
conn_factory,

crate/operator/scale.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,11 @@ async def check_nodes_present_or_gone(
213213
logger=logger,
214214
status=WebhookStatus.IN_PROGRESS,
215215
operation=WebhookOperation.UPDATE,
216-
action=WebhookAction.SUSPEND
217-
if old_replicas == 0
218-
else WebhookAction.SCALE,
216+
action=(
217+
WebhookAction.SUSPEND
218+
if old_replicas == 0
219+
else WebhookAction.SCALE
220+
),
219221
)
220222

221223
raise kopf.TemporaryError(
@@ -234,9 +236,11 @@ async def check_nodes_present_or_gone(
234236
logger=logger,
235237
status=WebhookStatus.IN_PROGRESS,
236238
operation=WebhookOperation.UPDATE,
237-
action=WebhookAction.SUSPEND
238-
if new_replicas == 0
239-
else WebhookAction.SCALE,
239+
action=(
240+
WebhookAction.SUSPEND
241+
if new_replicas == 0
242+
else WebhookAction.SCALE
243+
),
240244
)
241245
raise kopf.TemporaryError(
242246
f"Waiting for nodes {excess_nodes} to be gone.", delay=15

pyproject.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
[tool.black]
2-
target-version = ['py38']
2+
target-version = ['py313']
3+
line-length = 88
34

45
[build-system]
56
requires = ["setuptools>=58", "wheel", "setuptools_scm>=6.2"]

tests/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ def load_config(worker_id):
109109
"CRATEDB_OPERATOR_JMX_EXPORTER_VERSION": "1.2.0",
110110
"CRATEDB_OPERATOR_LOG_LEVEL": "DEBUG",
111111
"CRATEDB_OPERATOR_TESTING": "true",
112-
"CRATEDB_OPERATOR_PARALLEL_TESTING": "false"
113-
if worker_id == "master"
114-
else "true",
112+
"CRATEDB_OPERATOR_PARALLEL_TESTING": (
113+
"false" if worker_id == "master" else "true"
114+
),
115115
"CRATEDB_OPERATOR_JOBS_TABLE": "test.test_sys_jobs",
116116
"CRATEDB_OPERATOR_BOOTSTRAP_RETRY_DELAY": "5",
117117
"CRATEDB_OPERATOR_HEALTH_CHECK_RETRY_DELAY": "5",

tests/test_ping_cratedb_status.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
# CrateDB Kubernetes Operator
2+
#
3+
# Licensed to Crate.IO GmbH ("Crate") under one or more contributor
4+
# license agreements. See the NOTICE file distributed with this work for
5+
# additional information regarding copyright ownership. Crate licenses
6+
# this file to you under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License. You may
8+
# obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
# License for the specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
# However, if you have executed another commercial license agreement
19+
# with Crate these terms will supersede the license and you may use the
20+
# software solely pursuant to the terms of the relevant commercial agreement.
21+
22+
import logging
23+
from asyncio import TimeoutError
24+
from unittest.mock import AsyncMock, MagicMock, patch
25+
26+
import pytest
27+
from aiohttp.client_exceptions import ClientConnectorError
28+
29+
from crate.operator.handlers.handle_ping_cratedb_status import (
30+
CLUSTER_STATUS_KEY,
31+
ping_cratedb_status,
32+
)
33+
from crate.operator.prometheus import PrometheusClusterStatus
34+
35+
36+
@pytest.fixture
def mock_cratedb_connection():
    """Patch ``aiopg.connect`` so the handler never opens a real connection.

    The connection and cursor mocks both act as async context managers that
    yield themselves, and ``conn.cursor`` resolves to the cursor mock.

    Yields:
        dict: ``mock_connect`` (the patched ``connect`` callable),
        ``mock_conn`` and ``mock_cursor`` so tests can assert on them.
    """
    mock_cursor = AsyncMock()
    mock_cursor.__aenter__.return_value = mock_cursor
    mock_cursor.__aexit__.return_value = None

    mock_conn = AsyncMock()
    mock_conn.__aenter__.return_value = mock_conn
    mock_conn.__aexit__.return_value = None
    mock_conn.cursor.return_value = mock_cursor

    # The context-manager form guarantees the patch is undone on every exit
    # path, instead of relying on a manual start()/stop() pair.
    with patch(
        "crate.operator.cratedb.aiopg.connect", return_value=mock_conn
    ) as mocked_connect:
        yield {
            "mock_connect": mocked_connect,
            "mock_conn": mock_conn,
            "mock_cursor": mock_cursor,
        }
57+
58+
59+
@pytest.mark.asyncio
async def test_ping_cratedb_status_success(mock_cratedb_connection):
    """A reachable cluster whose healthiness is "GREEN" is reported as GREEN.

    Every collaborator of the handler is patched, so the test only checks
    the wiring: the mocked connection and cursor are entered, the status
    patch receives the GREEN health entry, and the status is reported.
    """
    conn = mock_cratedb_connection["mock_conn"]
    cursor = mock_cratedb_connection["mock_cursor"]

    ns = "test-ns"
    resource_name = "test-cluster"
    crate_cluster_name = "test-cluster"
    instances = 1

    status_patch = MagicMock()
    log = MagicMock(spec=logging.Logger)

    core_api = MagicMock()

    # `async with GlobalApiClient()` must hand back the client mock itself.
    api_client = AsyncMock()
    api_client.__aenter__.return_value = api_client

    with (
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.GlobalApiClient",
            return_value=api_client,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.CoreV1Api",
            return_value=core_api,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.get_host",
            new_callable=AsyncMock,
            return_value="crate.db.local",
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.get_system_user_password",  # noqa
            new_callable=AsyncMock,
            return_value="secret-password",
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.get_healthiness",
            new_callable=AsyncMock,
            return_value="GREEN",
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.report_cluster_status"
        ) as report_mock,
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.webhook_client.send_notification",  # noqa
            new_callable=AsyncMock,
        ),
    ):
        await ping_cratedb_status(
            namespace=ns,
            name=resource_name,
            cluster_name=crate_cluster_name,
            desired_instances=instances,
            patch=status_patch,
            logger=log,
        )

    # The connection and its cursor were each entered exactly once.
    conn.__aenter__.assert_awaited_once()
    conn.cursor.assert_called_once()
    cursor.__aenter__.assert_awaited_once()

    # The resource status was patched with the GREEN health entry ...
    status_patch.status.__setitem__.assert_called_once_with(
        CLUSTER_STATUS_KEY, {"health": PrometheusClusterStatus.GREEN.name}
    )

    # ... and the same status was passed to the reporting helper.
    report_mock.assert_called_once_with(
        resource_name, crate_cluster_name, ns, PrometheusClusterStatus.GREEN
    )
130+
131+
132+
# Stand-in for the first argument of ``ClientConnectorError`` in the
# parametrized cases of this module.
# NOTE(review): ``ssl`` is pinned to ``None`` presumably because aiohttp reads
# it from the key when formatting the exception -- confirm against aiohttp.
fake_key = MagicMock(ssl=None)
134+
135+
136+
@pytest.mark.asyncio
@pytest.mark.parametrize(
    "side_effect, expected_log_method, expected_log_message",
    [
        (
            ClientConnectorError(fake_key, OSError("kaboom")),
            "warning",
            "Transient Kubernetes API connection error during health check",
        ),
        (
            TimeoutError("timed out"),
            "warning",
            "Timeout while connecting to CrateDB during health check",
        ),
        (
            Exception("generic failure"),
            "warning",
            "Unexpected error during CrateDB health check",
        ),
    ],
)
async def test_ping_cratedb_status_exceptions(
    side_effect, expected_log_method, expected_log_message
):
    """A failure while resolving the host marks the cluster UNREACHABLE.

    ``get_host`` is patched to raise ``side_effect``. For each case the
    handler must patch the status to UNREACHABLE, log exactly once through
    ``expected_log_method`` with a message containing
    ``expected_log_message``, and report the UNREACHABLE status.
    """
    ns = "test-ns"
    resource_name = "test-cluster"
    crate_cluster_name = "test-cluster"
    instances = 1

    status_patch = MagicMock()
    log = MagicMock(spec=logging.Logger)

    api_client_cm = AsyncMock()
    core_api = MagicMock()

    with (
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.GlobalApiClient",
            return_value=api_client_cm,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.CoreV1Api",
            return_value=core_api,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.get_host",
            side_effect=side_effect,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.get_system_user_password",  # noqa
            new_callable=AsyncMock,
        ),
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.report_cluster_status"
        ) as report_mock,
        patch(
            "crate.operator.handlers.handle_ping_cratedb_status.webhook_client.send_notification",  # noqa
            new_callable=AsyncMock,
        ),
    ):
        await ping_cratedb_status(
            namespace=ns,
            name=resource_name,
            cluster_name=crate_cluster_name,
            desired_instances=instances,
            patch=status_patch,
            logger=log,
        )

    # The resource status must have been patched to UNREACHABLE.
    status_patch.status.__setitem__.assert_called_once_with(
        CLUSTER_STATUS_KEY, {"health": PrometheusClusterStatus.UNREACHABLE.name}
    )

    # Exactly one log call on the expected method, carrying the expected text
    # in its first positional argument.
    log_call = getattr(log, expected_log_method)
    log_call.assert_called_once()
    assert expected_log_message in log_call.call_args.args[0]

    # The UNREACHABLE status must also have been reported.
    report_mock.assert_called_once_with(
        resource_name, crate_cluster_name, ns, PrometheusClusterStatus.UNREACHABLE
    )

0 commit comments

Comments
 (0)