Skip to content

Commit caf90d5

Browse files
Kyle-Neale and steveny91
authored and committed
Fix for Spark driver init readiness pending (#22252)
* Fix for Spark driver init readiness pending * Add changelog * Lint * Add config param for retries * Simplify description * Address comments * Update spark/assets/configuration/spec.yaml Co-authored-by: Steven Yuen <[email protected]> * ddev sync models and config --------- Co-authored-by: Steven Yuen <[email protected]> (cherry picked from commit 7477060)
1 parent bf9ee89 commit caf90d5

File tree

7 files changed

+175
-0
lines changed

7 files changed

+175
-0
lines changed

spark/assets/configuration/spec.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,18 @@ files:
135135
type: boolean
136136
display_default: false
137137
example: true
138+
- name: startup_wait_retries
139+
description: |
140+
Controls how to handle "Spark is starting up" responses from the driver.
141+
When the Spark driver is not yet ready, it returns a non-JSON startup message
142+
instead of the expected JSON response.
143+
144+
Set to 0 to disable this feature and treat startup messages as errors immediately.
145+
Set to a positive integer to retry that many times before marking the check as failed.
146+
value:
147+
type: integer
148+
display_default: 3
149+
example: 3
138150
- template: instances/http
139151
overrides:
140152
auth_token.description: |

spark/changelog.d/22252.added

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `startup_wait_retries` option to handle Spark driver startup messages gracefully.

spark/datadog_checks/spark/config_models/defaults.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,10 @@ def instance_spark_proxy_enabled():
100100
return False
101101

102102

103+
def instance_startup_wait_retries():
104+
return 3
105+
106+
103107
def instance_streaming_metrics():
104108
return True
105109

spark/datadog_checks/spark/config_models/instance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,7 @@ class InstanceConfig(BaseModel):
9696
spark_proxy_enabled: Optional[bool] = None
9797
spark_ui_ports: Optional[tuple[int, ...]] = None
9898
spark_url: str
99+
startup_wait_retries: Optional[int] = None
99100
streaming_metrics: Optional[bool] = None
100101
tags: Optional[tuple[str, ...]] = None
101102
timeout: Optional[float] = None

spark/datadog_checks/spark/data/conf.yaml.example

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,16 @@ instances:
155155
#
156156
# disable_spark_stage_metrics: true
157157

158+
## @param startup_wait_retries - integer - optional - default: 3
159+
## Controls how to handle "Spark is starting up" responses from the driver.
160+
## When the Spark driver is not yet ready, it returns a non-JSON startup message
161+
## instead of the expected JSON response.
162+
##
163+
## Set to 0 to disable this feature and treat startup messages as errors immediately.
164+
## Set to a positive integer to retry that many times before marking the check as failed.
165+
#
166+
# startup_wait_retries: 3
167+
158168
## @param proxy - mapping - optional
159169
## This overrides the `proxy` setting in `init_config`.
160170
##

spark/datadog_checks/spark/spark.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,12 @@ def __init__(self, name, init_config, instances):
8484

8585
self.master_address = self._get_master_address()
8686

87+
# Startup retry configuration:
88+
# 0 or negative: disable (treat startup messages as JSON parse errors immediately)
89+
# >0: retry N times before marking as broken
90+
self._startup_wait_retries = int(self.instance.get('startup_wait_retries', 3))
91+
self._startup_retry_count = 0
92+
8793
def check(self, _):
8894
tags = list(self.tags)
8995

@@ -658,6 +664,27 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
658664
response_json = response.json()
659665

660666
except JSONDecodeError as e:
667+
response_text = response.text.strip()
668+
if response_text and 'spark is starting up' in response_text.lower():
669+
# Handle startup message based on retry configuration
670+
if self._startup_wait_retries > 0:
671+
self._startup_retry_count += 1
672+
if self._startup_retry_count <= self._startup_wait_retries:
673+
self.log.debug(
674+
"Spark driver not ready yet at %s (attempt %d/%d): %s",
675+
self._get_url_base(address),
676+
self._startup_retry_count,
677+
self._startup_wait_retries,
678+
response_text,
679+
)
680+
return None
681+
else:
682+
self.log.warning(
683+
"Spark driver startup retries exhausted (%d/%d)",
684+
self._startup_retry_count,
685+
self._startup_wait_retries,
686+
)
687+
661688
self.service_check(
662689
service_name,
663690
AgentCheck.CRITICAL,
@@ -666,6 +693,8 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
666693
)
667694
raise
668695

696+
# Reset startup retry counter on successful JSON parse
697+
self._startup_retry_count = 0
669698
return response_json
670699

671700
@classmethod

spark/tests/test_spark.py

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,124 @@ def test_do_not_crash_on_version_collection_failure():
11871187
assert not c._collect_version(running_apps, [])
11881188

11891189

1190+
@pytest.mark.unit
1191+
def test_driver_startup_message_default_retries(aggregator, caplog):
1192+
"""Default behavior (startup_wait_retries=3): retry 3 times then raise."""
1193+
from simplejson import JSONDecodeError
1194+
1195+
check = SparkCheck('spark', {}, [DRIVER_CONFIG])
1196+
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
1197+
1198+
with caplog.at_level(logging.DEBUG):
1199+
with mock.patch.object(check, '_rest_request', return_value=response):
1200+
# First 3 attempts should return None (default is 3 retries)
1201+
for i in range(3):
1202+
result = check._rest_request_to_json(
1203+
DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
1204+
)
1205+
assert result is None, f"Attempt {i + 1} should return None"
1206+
1207+
# 4th attempt should raise
1208+
with pytest.raises(JSONDecodeError):
1209+
check._rest_request_to_json(DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
1210+
1211+
assert 'spark driver not ready yet' in caplog.text.lower()
1212+
assert 'retries exhausted' in caplog.text.lower()
1213+
1214+
aggregator.assert_service_check(
1215+
SPARK_DRIVER_SERVICE_CHECK,
1216+
status=SparkCheck.CRITICAL,
1217+
tags=['url:{}'.format(DRIVER_CONFIG['spark_url'])],
1218+
)
1219+
1220+
1221+
@pytest.mark.unit
1222+
@pytest.mark.parametrize("retries_value", [0, -1, -5])
1223+
def test_driver_startup_message_disabled(aggregator, retries_value):
1224+
"""When startup_wait_retries<=0, treat startup messages as errors immediately."""
1225+
from simplejson import JSONDecodeError
1226+
1227+
config = DRIVER_CONFIG.copy()
1228+
config['startup_wait_retries'] = retries_value
1229+
check = SparkCheck('spark', {}, [config])
1230+
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
1231+
1232+
with mock.patch.object(check, '_rest_request', return_value=response):
1233+
with pytest.raises(JSONDecodeError):
1234+
check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
1235+
1236+
aggregator.assert_service_check(
1237+
SPARK_DRIVER_SERVICE_CHECK,
1238+
status=SparkCheck.CRITICAL,
1239+
tags=['url:{}'.format(config['spark_url'])],
1240+
)
1241+
1242+
1243+
@pytest.mark.unit
1244+
def test_driver_startup_message_limited_retries(aggregator, caplog):
1245+
"""When startup_wait_retries>0, retry N times then raise."""
1246+
from simplejson import JSONDecodeError
1247+
1248+
config = DRIVER_CONFIG.copy()
1249+
config['startup_wait_retries'] = 3
1250+
check = SparkCheck('spark', {}, [config])
1251+
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
1252+
1253+
with caplog.at_level(logging.DEBUG):
1254+
with mock.patch.object(check, '_rest_request', return_value=response):
1255+
# First 3 attempts should return None
1256+
for i in range(3):
1257+
result = check._rest_request_to_json(
1258+
config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
1259+
)
1260+
assert result is None, f"Attempt {i + 1} should return None"
1261+
1262+
# 4th attempt should raise
1263+
with pytest.raises(JSONDecodeError):
1264+
check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
1265+
1266+
assert 'attempt 1/3' in caplog.text.lower()
1267+
assert 'attempt 3/3' in caplog.text.lower()
1268+
assert 'retries exhausted' in caplog.text.lower()
1269+
1270+
aggregator.assert_service_check(
1271+
SPARK_DRIVER_SERVICE_CHECK,
1272+
status=SparkCheck.CRITICAL,
1273+
tags=['url:{}'.format(config['spark_url'])],
1274+
)
1275+
1276+
1277+
@pytest.mark.unit
1278+
def test_driver_startup_retry_counter_resets_on_success(caplog):
1279+
"""Verify the retry counter resets after a successful JSON response."""
1280+
config = DRIVER_CONFIG.copy()
1281+
config['startup_wait_retries'] = 2
1282+
check = SparkCheck('spark', {}, [config])
1283+
startup_response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
1284+
success_response = MockResponse(json_data=[{"id": "app_001", "name": "TestApp"}])
1285+
1286+
with caplog.at_level(logging.DEBUG):
1287+
with mock.patch.object(check, '_rest_request', return_value=startup_response):
1288+
# Use 1 retry
1289+
result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
1290+
assert result is None
1291+
assert check._startup_retry_count == 1
1292+
1293+
# Successful response resets counter
1294+
with mock.patch.object(check, '_rest_request', return_value=success_response):
1295+
result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
1296+
assert result == [{"id": "app_001", "name": "TestApp"}]
1297+
assert check._startup_retry_count == 0
1298+
1299+
# After reset, we should have 2 retries available again
1300+
with mock.patch.object(check, '_rest_request', return_value=startup_response):
1301+
for _ in range(2):
1302+
result = check._rest_request_to_json(
1303+
config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
1304+
)
1305+
assert result is None
1306+
1307+
11901308
@pytest.mark.unit
11911309
def test_ssl(dd_run_check):
11921310
run_ssl_server()

0 commit comments

Comments
 (0)