
Commit 221c24b

fix(spark): [O11YINFRA-46] retry connection error to account for autodiscovery race condition (#21922)
1 parent: 43bce57

3 files changed (+210, -5 lines)

spark/changelog.d/21922.fixed

Lines changed: 1 addition & 0 deletions
@@ -0,0 +1 @@
+Debounce false-positive connection errors
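
The mechanism in one paragraph: the check keeps a flag that persists across runs (`_connection_error_seen`) and one that is reset at the top of every run (`_debounced_this_run`). On a Kubernetes-tagged instance, the first "Connection refused" / "No route to host" failure is swallowed once, any successful request clears the persistent flag, and a second consecutive failure raises as before. A minimal standalone sketch of that state machine, ignoring the pod-phase handling (illustrative names, not code from this commit):

    # Sketch of the debounce this commit introduces; the real logic lives in
    # SparkCheck._should_suppress_connection_error further down this diff.
    from requests import ConnectionError


    class DebouncedFetcher:
        def __init__(self):
            self.error_seen = False          # persists across check runs
            self.debounced_this_run = False  # cleared at the start of each run

        def run(self, fetch):
            self.debounced_this_run = False  # mirrors SparkCheck.check()
            try:
                result = fetch()
                self.error_seen = False      # any success resets the debounce
                return result
            except ConnectionError:
                if not self.error_seen and not self.debounced_this_run:
                    # First failure: suppress once, retry on the next run
                    self.error_seen = True
                    self.debounced_this_run = True
                    return None
                raise                        # second consecutive failure surfaces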

spark/datadog_checks/spark/spark.py

Lines changed: 83 additions & 4 deletions
@@ -83,8 +83,12 @@ def __init__(self, name, init_config, instances):
             raise ConfigurationError('The cluster_name must be specified in the instance configuration')
 
         self.master_address = self._get_master_address()
+        self._connection_error_seen = False
+        self._debounced_this_run = False
 
     def check(self, _):
+        self._debounced_this_run = False
+
         tags = list(self.tags)
 
         tags.append('spark_cluster:%s' % self.cluster_name)
@@ -192,6 +196,8 @@ def _get_running_apps(self):
     def _collect_version(self, base_url, tags):
         try:
             version_json = self._rest_request_to_json(base_url, SPARK_VERSION_PATH, SPARK_SERVICE_CHECK, tags)
+            if version_json is None:
+                return False
             version = version_json['spark']
         except Exception as e:
             self.log.debug("Failed to collect version information: %s", e)
@@ -206,10 +212,18 @@ def _driver_init(self, tags):
         """
         self._collect_version(self.master_address, tags)
         running_apps = {}
+
+        # A request earlier in this check run already hit a debounced connection failure.
+        # Skip the remaining driver queries so we only retry on the next scheduled run.
+        if self._debounced_this_run:
+            return running_apps
         metrics_json = self._rest_request_to_json(
             self.master_address, SPARK_APPS_PATH, SPARK_DRIVER_SERVICE_CHECK, tags
         )
 
+        if metrics_json is None:
+            return running_apps
+
         for app_json in metrics_json:
             app_id = app_json.get('id')
             app_name = app_json.get('name')
@@ -231,6 +245,9 @@ def _standalone_init(self, pre_20_mode, tags):
             self.master_address, SPARK_MASTER_STATE_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
         )
 
+        if metrics_json is None:
+            return {}
+
         running_apps = {}
         version_set = False
 
@@ -251,10 +268,11 @@ def _standalone_init(self, pre_20_mode, tags):
                         applist = self._rest_request_to_json(
                             app_url, SPARK_APPS_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
                         )
-                        for appl in applist:
-                            aid = appl.get('id')
-                            aname = appl.get('name')
-                            running_apps[aid] = (aname, app_url)
+                        if applist:
+                            for appl in applist:
+                                aid = appl.get('id')
+                                aname = appl.get('name')
+                                running_apps[aid] = (aname, app_url)
                     else:
                         running_apps[app_id] = (app_name, app_url)
             except Exception:
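
Worth noting: the new `if applist:` guard is what keeps a debounced request from crashing this loop, since `_rest_request_to_json` can now return None and the old code would have died on iteration:

    TypeError: 'NoneType' object is not iterable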
@@ -279,6 +297,9 @@ def _mesos_init(self, tags):
 
         metrics_json = self._rest_request_to_json(self.master_address, MESOS_MASTER_APP_PATH, MESOS_SERVICE_CHECK, tags)
 
+        if metrics_json is None:
+            return running_apps
+
         if metrics_json.get('frameworks'):
             for app_json in metrics_json.get('frameworks'):
                 app_id = app_json.get('id')
@@ -330,6 +351,9 @@ def _get_standalone_app_url(self, app_id, tags):
             self.master_address, SPARK_MASTER_APP_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags, appId=app_id
         )
 
+        if app_page is None:
+            return None
+
         dom = BeautifulSoup(app_page.text, 'html.parser')
         app_detail_ui_links = dom.find_all('a', string='Application Detail UI')
 
@@ -352,6 +376,9 @@ def _yarn_get_running_spark_apps(self, tags):
             applicationTypes=YARN_APPLICATION_TYPES,
         )
 
+        if metrics_json is None:
+            return {}
+
         running_apps = {}
 
         if metrics_json.get('apps'):
@@ -379,6 +406,8 @@ def _get_spark_app_ids(self, running_apps, tags):
                 if not version_set:
                     version_set = self._collect_version(tracking_url, tags)
                 response = self._rest_request_to_json(tracking_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, tags)
+                if response is None:
+                    continue
             except Exception as e:
                 self.log.warning("Exception happened when fetching app ids for %s: %s", tracking_url, e)
                 continue
@@ -405,6 +434,8 @@ def _describe_app(self, property, running_apps, addl_tags):
                 response = self._rest_request(
                     base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property
                 )
+                if response is None:
+                    continue
             except HTTPError:
                 self.log.debug("Got an error collecting %s", property, exc_info=True)
                 continue
@@ -512,6 +543,8 @@ def _spark_structured_streams_metrics(self, running_apps, addl_tags):
                 response = self._rest_request_to_json(
                     base_url, self.metricsservlet_path, SPARK_SERVICE_CHECK, addl_tags
                 )
+                if response is None:
+                    continue
                 self.log.debug('Structured streaming metrics: %s', response)
                 response = {
                     metric_name: v['value']
@@ -611,6 +644,10 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
             self.log.debug('Spark check URL: %s', url)
             response = self.http.get(url, cookies=self.proxy_redirect_cookies)
             response.raise_for_status()
+
+            # Reset connection errors on success
+            self._connection_error_seen = False
+
             content = response.text
             proxy_redirect_url = self._parse_proxy_redirect_url(content)
             if proxy_redirect_url:
@@ -633,6 +670,9 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
             raise
 
         except (HTTPError, InvalidURL, ConnectionError) as e:
+            if isinstance(e, ConnectionError) and self._should_suppress_connection_error(e, tags):
+                return None
+
             self.service_check(
                 service_name,
                 AgentCheck.CRITICAL,
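
This hunk is the pivot of the change: when `_should_suppress_connection_error` returns True, `_rest_request` returns None instead of emitting a CRITICAL service check, and `_rest_request_to_json` passes that None through. Every call site patched above therefore adopts the same guard; the recurring pattern looks like this (a summary of the hunks above with placeholder argument names, not an additional change from the commit):

    response = self._rest_request_to_json(url, path, service_check_name, tags)
    if response is None:
        # Request was debounced this run; bail out quietly and let the next
        # scheduled run retry (return {} / None, or `continue` inside loops).
        return {}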
@@ -654,6 +694,9 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
         """
         response = self._rest_request(address, object_path, service_name, tags, *args, **kwargs)
 
+        if response is None:
+            return None
+
         try:
             response_json = response.json()
 
@@ -668,6 +711,42 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
 
         return response_json
 
+    def _should_suppress_connection_error(self, exception, tags):
+        """Suppress kubernetes-only connection false positives during pod shutdown."""
+        pod_phase = self._get_pod_phase(tags)
+        if pod_phase is None:
+            return False
+
+        if pod_phase in ('failed', 'succeeded', 'unknown'):
+            self.log.debug("Pod phase is terminal, suppressing request error: %s", exception)
+            return True
+
+        if (
+            not self._connection_error_seen
+            and not self._debounced_this_run
+            and ("Connection refused" in str(exception) or "No route to host" in str(exception))
+        ):
+            self._connection_error_seen = True
+            self._debounced_this_run = True
+            self.log.warning(
+                "Connection failed. Suppressing error once to ensure driver is running. Error: %s",
+                exception,
+            )
+            return True
+
+        return False
+
+    def _is_pod_in_terminal_state(self, tags):
+        pod_phase = self._get_pod_phase(tags)
+        return pod_phase in ('failed', 'succeeded', 'unknown') if pod_phase is not None else False
+
+    @staticmethod
+    def _get_pod_phase(tags):
+        for tag in tags or []:
+            if tag.startswith('pod_phase:'):
+                return tag.split(':', 1)[1].strip().lower()
+        return None
+
     @classmethod
     def _join_url_dir(cls, url, *args):
         """

spark/tests/test_spark.py

Lines changed: 126 additions & 1 deletion
@@ -12,7 +12,7 @@
 import mock
 import pytest
 import urllib3
-from requests import RequestException
+from requests import ConnectionError, RequestException
 
 from datadog_checks.dev.http import MockResponse
 from datadog_checks.dev.utils import get_metadata_metrics
@@ -1465,3 +1465,128 @@ def test_integration_driver_2(aggregator, dd_run_check):
     )
     aggregator.assert_all_metrics_covered()
     aggregator.assert_metrics_using_metadata(get_metadata_metrics())
+
+
+@pytest.mark.unit
+def test_debounce_connection_failure(aggregator, dd_run_check, caplog):
+    # Mock connection failure
+    def connection_failure_mock(*args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Running']
+
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        c = SparkCheck('spark', {}, [instance])
+
+        # First run: expect warning, no CRITICAL check
+        with caplog.at_level(logging.WARNING):
+            dd_run_check(c)
+
+        assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
+
+        # Verify no CRITICAL check sent for spark.driver.can_connect
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 0
+
+        # Second run: expect CRITICAL (wrapped by dd_run_check as Exception)
+        with pytest.raises(Exception) as excinfo:
+            dd_run_check(c)
+
+        assert "Connection refused" in str(excinfo.value)
+
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 1
+        assert service_checks[0].status == SparkCheck.CRITICAL
+
+
+@pytest.mark.unit
+def test_connection_failure_non_k8s(aggregator, dd_run_check):
+    def connection_failure_mock(*args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', []))
+
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        c = SparkCheck('spark', {}, [instance])
+
+        with pytest.raises(Exception) as excinfo:
+            dd_run_check(c)
+
+        assert "Connection refused" in str(excinfo.value)
+
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 1
+        assert service_checks[0].status == SparkCheck.CRITICAL
+
+
+@pytest.mark.unit
+def test_debounce_connection_failure_terminal_phase(aggregator, dd_run_check, caplog):
+    def connection_failure_mock(*args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Failed']
+
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        c = SparkCheck('spark', {}, [instance])
+
+        with caplog.at_level(logging.DEBUG):
+            dd_run_check(c)
+
+        assert "Pod phase is terminal, suppressing request error" in caplog.text
+
+        # Expect NO service check because we suppress errors for failed pods
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 0
+
+
+@pytest.mark.unit
+def test_debounce_connection_recovery(aggregator, dd_run_check, caplog):
+    # Mock connection failure
+    def connection_failure_mock(*args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Running']
+
+    c = SparkCheck('spark', {}, [instance])
+
+    # 1. Fail (Debounce)
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        with caplog.at_level(logging.WARNING):
+            dd_run_check(c)
+
+    assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
+    # Verify no CRITICAL check sent
+    service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+    assert len(service_checks) == 0
+
+    caplog.clear()
+    aggregator.reset()
+
+    # 2. Success (Reset)
+    with mock.patch('requests.Session.get', driver_requests_get_mock):
+        dd_run_check(c)
+
+    # Verify success
+    service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+    assert len(service_checks) > 0
+    assert service_checks[0].status == SparkCheck.OK
+
+    # Verify internal state was reset
+    assert c._connection_error_seen is False
+
+    caplog.clear()
+    aggregator.reset()
+
+    # 3. Fail (Debounce again)
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        with caplog.at_level(logging.WARNING):
+            dd_run_check(c)
+
+    assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
+    # Verify no CRITICAL check sent
+    service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+    assert len(service_checks) == 0
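
Taken together, the four tests pin down the observable contract: one suppressed run with a warning, CRITICAL on the second consecutive failure, immediate CRITICAL when no `pod_phase` tag is present, silent suppression for terminal pod phases, and a full reset after any success. For reference, rendered from the format string in `_should_suppress_connection_error`, the warning the tests assert on reads:

    Connection failed. Suppressing error once to ensure driver is running. Error: Connection refused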
