Skip to content

Commit b40d6e4

Browse files
committed
Log transient errors as warnings during health check
1 parent ef08482 commit b40d6e4

File tree

5 files changed

+259
-18
lines changed

5 files changed

+259
-18
lines changed

CHANGES.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ Unreleased
99

1010
* Upgraded ``kopf`` to ``1.37.5``.
1111

12+
* Log transient connection and timeout errors as warnings to reduce log noise.
13+
1214
2.46.0 (2025-03-17)
1315
-------------------
1416

crate/operator/handlers/handle_ping_cratedb_status.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,10 @@
2020
# software solely pursuant to the terms of the relevant commercial agreement.
2121

2222
import logging
23+
from asyncio import TimeoutError
2324

2425
import kopf
26+
from aiohttp.client_exceptions import ClientConnectorError
2527
from kubernetes_asyncio.client import CoreV1Api
2628

2729
from crate.operator.cratedb import connection_factory, get_healthiness
@@ -57,23 +59,37 @@ async def ping_cratedb_status(
5759
patch.status[CLUSTER_STATUS_KEY] = {"health": "SUSPENDED"}
5860
return
5961

60-
async with GlobalApiClient() as api_client:
61-
core = CoreV1Api(api_client)
62-
host = await get_host(core, namespace, name)
63-
password = await get_system_user_password(core, namespace, name)
64-
conn_factory = connection_factory(host, password)
65-
6662
try:
67-
async with conn_factory() as conn:
68-
async with conn.cursor() as cursor:
63+
async with GlobalApiClient() as api_client:
64+
core = CoreV1Api(api_client)
65+
host = await get_host(core, namespace, name)
66+
password = await get_system_user_password(core, namespace, name)
67+
conn_factory = connection_factory(host, password)
68+
connection = conn_factory()
69+
70+
async with connection as conn:
71+
cursor_cm = await conn.cursor()
72+
async with cursor_cm as cursor:
6973
healthiness = await get_healthiness(cursor)
7074
# If there are no tables in the cluster, get_healthiness returns
7175
# none: default to `Green`, as cluster is reachable
7276
status = HEALTHINESS_TO_STATUS.get(
7377
healthiness, PrometheusClusterStatus.GREEN
7478
)
7579
except Exception as e:
76-
logger.warning("Failed to ping cluster.", exc_info=e)
80+
if isinstance(e, ClientConnectorError):
81+
error_msg = (
82+
"Transient Kubernetes API connection error during health check: %s"
83+
)
84+
logger.warning(error_msg, e)
85+
elif isinstance(e, TimeoutError):
86+
error_msg = "Timeout while connecting to CrateDB during health check: %s"
87+
logger.warning(error_msg, e)
88+
else:
89+
logger.warning(
90+
"Unexpected error during CrateDB health check.", exc_info=True
91+
)
92+
7793
status = PrometheusClusterStatus.UNREACHABLE
7894

7995
report_cluster_status(name, cluster_name, namespace, status)

crate/operator/scale.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -213,9 +213,11 @@ async def check_nodes_present_or_gone(
213213
logger=logger,
214214
status=WebhookStatus.IN_PROGRESS,
215215
operation=WebhookOperation.UPDATE,
216-
action=WebhookAction.SUSPEND
217-
if old_replicas == 0
218-
else WebhookAction.SCALE,
216+
action=(
217+
WebhookAction.SUSPEND
218+
if old_replicas == 0
219+
else WebhookAction.SCALE
220+
),
219221
)
220222

221223
raise kopf.TemporaryError(
@@ -234,9 +236,11 @@ async def check_nodes_present_or_gone(
234236
logger=logger,
235237
status=WebhookStatus.IN_PROGRESS,
236238
operation=WebhookOperation.UPDATE,
237-
action=WebhookAction.SUSPEND
238-
if new_replicas == 0
239-
else WebhookAction.SCALE,
239+
action=(
240+
WebhookAction.SUSPEND
241+
if new_replicas == 0
242+
else WebhookAction.SCALE
243+
),
240244
)
241245
raise kopf.TemporaryError(
242246
f"Waiting for nodes {excess_nodes} to be gone.", delay=15

tests/conftest.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,9 +109,9 @@ def load_config(worker_id):
109109
"CRATEDB_OPERATOR_JMX_EXPORTER_VERSION": "1.2.0",
110110
"CRATEDB_OPERATOR_LOG_LEVEL": "DEBUG",
111111
"CRATEDB_OPERATOR_TESTING": "true",
112-
"CRATEDB_OPERATOR_PARALLEL_TESTING": "false"
113-
if worker_id == "master"
114-
else "true",
112+
"CRATEDB_OPERATOR_PARALLEL_TESTING": (
113+
"false" if worker_id == "master" else "true"
114+
),
115115
"CRATEDB_OPERATOR_JOBS_TABLE": "test.test_sys_jobs",
116116
"CRATEDB_OPERATOR_BOOTSTRAP_RETRY_DELAY": "5",
117117
"CRATEDB_OPERATOR_HEALTH_CHECK_RETRY_DELAY": "5",

tests/test_ping_cratedb_status.py

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
# CrateDB Kubernetes Operator
2+
#
3+
# Licensed to Crate.IO GmbH ("Crate") under one or more contributor
4+
# license agreements. See the NOTICE file distributed with this work for
5+
# additional information regarding copyright ownership. Crate licenses
6+
# this file to you under the Apache License, Version 2.0 (the "License");
7+
# you may not use this file except in compliance with the License. You may
8+
# obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing, software
13+
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
14+
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
15+
# License for the specific language governing permissions and limitations
16+
# under the License.
17+
#
18+
# However, if you have executed another commercial license agreement
19+
# with Crate these terms will supersede the license and you may use the
20+
# software solely pursuant to the terms of the relevant commercial agreement.
21+
22+
import logging
23+
from asyncio import TimeoutError
24+
from unittest.mock import AsyncMock, MagicMock, patch
25+
26+
import pytest
27+
from aiohttp.client_exceptions import ClientConnectorError
28+
29+
from crate.operator.handlers.handle_ping_cratedb_status import (
30+
CLUSTER_STATUS_KEY,
31+
ping_cratedb_status,
32+
)
33+
from crate.operator.prometheus import PrometheusClusterStatus
34+
35+
36+
@pytest.fixture
37+
def mock_cratedb_connection():
38+
mock_cursor = AsyncMock()
39+
mock_cursor.__aenter__.return_value = mock_cursor
40+
mock_cursor.__aexit__.return_value = None
41+
42+
mock_conn = AsyncMock()
43+
mock_conn.__aenter__.return_value = mock_conn
44+
mock_conn.__aexit__.return_value = None
45+
mock_conn.cursor.return_value = mock_cursor
46+
47+
patcher = patch("crate.operator.cratedb.aiopg.connect", return_value=mock_conn)
48+
mocked_connect = patcher.start()
49+
50+
yield {
51+
"mock_connect": mocked_connect,
52+
"mock_conn": mock_conn,
53+
"mock_cursor": mock_cursor,
54+
}
55+
56+
patcher.stop()
57+
58+
59+
@pytest.mark.asyncio
60+
async def test_ping_cratedb_status_success(mock_cratedb_connection):
61+
namespace = "test-ns"
62+
name = "test-cluster"
63+
cluster_name = "test-cluster"
64+
desired_instances = 1
65+
66+
patch_obj = MagicMock()
67+
logger = MagicMock(spec=logging.Logger)
68+
69+
mock_api_client = AsyncMock()
70+
mock_api_client.__aenter__.return_value = mock_api_client
71+
72+
mock_core = MagicMock()
73+
74+
with (
75+
patch(
76+
"crate.operator.handlers.handle_ping_cratedb_status.GlobalApiClient",
77+
return_value=mock_api_client,
78+
),
79+
patch(
80+
"crate.operator.handlers.handle_ping_cratedb_status.CoreV1Api",
81+
return_value=mock_core,
82+
),
83+
patch(
84+
"crate.operator.handlers.handle_ping_cratedb_status.get_host",
85+
new_callable=AsyncMock,
86+
return_value="crate.db.local",
87+
),
88+
patch(
89+
"crate.operator.handlers.handle_ping_cratedb_status.get_system_user_password", # noqa
90+
new_callable=AsyncMock,
91+
return_value="secret-password",
92+
),
93+
patch(
94+
"crate.operator.handlers.handle_ping_cratedb_status.get_healthiness",
95+
new_callable=AsyncMock,
96+
return_value="GREEN",
97+
),
98+
patch(
99+
"crate.operator.handlers.handle_ping_cratedb_status.report_cluster_status"
100+
) as mock_report,
101+
patch(
102+
"crate.operator.handlers.handle_ping_cratedb_status.webhook_client.send_notification", # noqa
103+
new_callable=AsyncMock,
104+
),
105+
):
106+
107+
await ping_cratedb_status(
108+
namespace=namespace,
109+
name=name,
110+
cluster_name=cluster_name,
111+
desired_instances=desired_instances,
112+
patch=patch_obj,
113+
logger=logger,
114+
)
115+
116+
mock_conn = mock_cratedb_connection["mock_conn"]
117+
mock_cursor = mock_cratedb_connection["mock_cursor"]
118+
119+
mock_conn.__aenter__.assert_awaited_once()
120+
mock_conn.cursor.assert_called_once()
121+
mock_cursor.__aenter__.assert_awaited_once()
122+
123+
patch_obj.status.__setitem__.assert_called_once_with(
124+
CLUSTER_STATUS_KEY, {"health": PrometheusClusterStatus.GREEN.name}
125+
)
126+
127+
mock_report.assert_called_once_with(
128+
name, cluster_name, namespace, PrometheusClusterStatus.GREEN
129+
)
130+
131+
132+
fake_key = MagicMock()
133+
fake_key.ssl = None
134+
135+
136+
@pytest.mark.asyncio
137+
@pytest.mark.parametrize(
138+
"side_effect, expected_log_method, expected_log_message",
139+
[
140+
(
141+
ClientConnectorError(fake_key, OSError("kaboom")),
142+
"warning",
143+
"Transient Kubernetes API connection error during health check",
144+
),
145+
(
146+
TimeoutError("timed out"),
147+
"warning",
148+
"Timeout while connecting to CrateDB during health check",
149+
),
150+
(
151+
Exception("generic failure"),
152+
"warning",
153+
"Unexpected error during CrateDB health check",
154+
),
155+
],
156+
)
157+
async def test_ping_cratedb_status_exceptions(
158+
side_effect, expected_log_method, expected_log_message
159+
):
160+
namespace = "test-ns"
161+
name = "test-cluster"
162+
cluster_name = "test-cluster"
163+
desired_instances = 1
164+
165+
patch_obj = MagicMock()
166+
logger = MagicMock(spec=logging.Logger)
167+
168+
mock_api_client_cm = AsyncMock()
169+
mock_core = MagicMock()
170+
171+
with (
172+
patch(
173+
"crate.operator.handlers.handle_ping_cratedb_status.GlobalApiClient",
174+
return_value=mock_api_client_cm,
175+
),
176+
patch(
177+
"crate.operator.handlers.handle_ping_cratedb_status.CoreV1Api",
178+
return_value=mock_core,
179+
),
180+
patch(
181+
"crate.operator.handlers.handle_ping_cratedb_status.get_host",
182+
side_effect=side_effect,
183+
),
184+
patch(
185+
"crate.operator.handlers.handle_ping_cratedb_status.get_system_user_password", # noqa
186+
new_callable=AsyncMock,
187+
),
188+
patch(
189+
"crate.operator.handlers.handle_ping_cratedb_status.report_cluster_status"
190+
) as mock_report,
191+
patch(
192+
"crate.operator.handlers.handle_ping_cratedb_status.webhook_client.send_notification", # noqa
193+
new_callable=AsyncMock,
194+
),
195+
):
196+
197+
await ping_cratedb_status(
198+
namespace=namespace,
199+
name=name,
200+
cluster_name=cluster_name,
201+
desired_instances=desired_instances,
202+
patch=patch_obj,
203+
logger=logger,
204+
)
205+
206+
# validate patch to status UNREACHABLE
207+
patch_obj.status.__setitem__.assert_called_once_with(
208+
CLUSTER_STATUS_KEY, {"health": PrometheusClusterStatus.UNREACHABLE.name}
209+
)
210+
211+
# validate the correct logger method was called
212+
log_method = getattr(logger, expected_log_method)
213+
log_method.assert_called_once()
214+
assert expected_log_message in log_method.call_args[0][0]
215+
216+
# check status was reported
217+
mock_report.assert_called_once_with(
218+
name, cluster_name, namespace, PrometheusClusterStatus.UNREACHABLE
219+
)

0 commit comments

Comments
 (0)