12 changes: 12 additions & 0 deletions spark/assets/configuration/spec.yaml
@@ -135,6 +135,18 @@ files:
type: boolean
display_default: false
example: true
- name: startup_wait_retries
description: |
Controls how to handle "Spark is starting up" responses from the driver.
When the Spark driver is not yet ready, it returns a non-JSON startup message
instead of the expected JSON response.

Set to 0 to disable this feature and treat startup messages as errors immediately.
Set to a positive integer to retry that many times before marking the check as failed.
value:
type: integer
display_default: 3
example: 3
- template: instances/http
overrides:
auth_token.description: |
1 change: 1 addition & 0 deletions spark/changelog.d/22252.added
@@ -0,0 +1 @@
Add `startup_wait_retries` option to handle Spark driver startup messages gracefully.
4 changes: 4 additions & 0 deletions spark/datadog_checks/spark/config_models/defaults.py
@@ -100,6 +100,10 @@ def instance_spark_proxy_enabled():
return False


def instance_startup_wait_retries():
return 3


def instance_streaming_metrics():
return True

1 change: 1 addition & 0 deletions spark/datadog_checks/spark/config_models/instance.py
@@ -96,6 +96,7 @@ class InstanceConfig(BaseModel):
spark_proxy_enabled: Optional[bool] = None
spark_ui_ports: Optional[tuple[int, ...]] = None
spark_url: str
startup_wait_retries: Optional[int] = None
streaming_metrics: Optional[bool] = None
tags: Optional[tuple[str, ...]] = None
timeout: Optional[float] = None
10 changes: 10 additions & 0 deletions spark/datadog_checks/spark/data/conf.yaml.example
@@ -155,6 +155,16 @@ instances:
#
# disable_spark_stage_metrics: true

## @param startup_wait_retries - integer - optional - default: 3
## Controls how to handle "Spark is starting up" responses from the driver.
## When the Spark driver is not yet ready, it returns a non-JSON startup message
## instead of the expected JSON response.
##
## Set to 0 to disable this feature and treat startup messages as errors immediately.
## Set to a positive integer to retry that many times before marking the check as failed.
#
# startup_wait_retries: 3

## @param proxy - mapping - optional
## This overrides the `proxy` setting in `init_config`.
##
29 changes: 29 additions & 0 deletions spark/datadog_checks/spark/spark.py
@@ -86,6 +86,12 @@ def __init__(self, name, init_config, instances):
self._connection_error_seen = False
self._debounced_this_run = False

        # Startup retry configuration:
        # <= 0: disable (treat startup messages as JSON parse errors immediately)
        # > 0: retry that many times before marking the check as failed
self._startup_wait_retries = int(self.instance.get('startup_wait_retries', 3))
self._startup_retry_count = 0

def check(self, _):
self._debounced_this_run = False

@@ -701,6 +707,27 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
response_json = response.json()

except JSONDecodeError as e:
response_text = response.text.strip()
if response_text and 'spark is starting up' in response_text.lower():
# Handle startup message based on retry configuration
if self._startup_wait_retries > 0:
self._startup_retry_count += 1
if self._startup_retry_count <= self._startup_wait_retries:
self.log.debug(
"Spark driver not ready yet at %s (attempt %d/%d): %s",
self._get_url_base(address),
self._startup_retry_count,
self._startup_wait_retries,
response_text,
)
return None
else:
self.log.warning(
"Spark driver startup retries exhausted (%d/%d)",
self._startup_retry_count,
self._startup_wait_retries,
)

self.service_check(
service_name,
AgentCheck.CRITICAL,
@@ -709,6 +736,8 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
)
raise

# Reset startup retry counter on successful JSON parse
self._startup_retry_count = 0
Comment on lines +739 to +740


P2: Avoid global retry reset across different Spark drivers

_startup_retry_count is a single counter for all requests and gets reset on any successful JSON parse. In driver or YARN modes, the check makes multiple _rest_request_to_json calls to different tracking URLs in the same run; if one app responds with valid JSON, the counter resets even if another app keeps returning the “Spark is starting up” message. That means the retry limit can be perpetually skipped for a stuck driver, so its startup error is never surfaced as CRITICAL and metrics stay silently missing. Consider tracking retries per address/driver or only resetting the counter for the same URL that succeeded.

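A minimal sketch of the reviewer's suggestion, keying the counter on the request's base URL instead of sharing one counter across all drivers. It assumes the helpers visible in the diff above (`_get_url_base`, `_rest_request`, `self.log`, `AgentCheck.CRITICAL`) behave as shown there; the `_startup_retry_counts` dict is a hypothetical name, and `...` elides the existing method bodies:

    # Sketch only, not the PR's code: one counter per base URL, so a healthy
    # app's JSON response resets only its own driver's counter.
    def __init__(self, name, init_config, instances):
        ...  # existing initialization from the diff above
        self._startup_wait_retries = int(self.instance.get('startup_wait_retries', 3))
        self._startup_retry_counts = {}  # base URL -> consecutive startup-message count

    def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
        url_base = self._get_url_base(address)
        try:
            # _rest_request signature assumed from the mocked calls in the tests below
            response = self._rest_request(address, object_path, service_name, tags, *args, **kwargs)
            response_json = response.json()
        except JSONDecodeError:
            response_text = response.text.strip()
            if (
                response_text
                and 'spark is starting up' in response_text.lower()
                and self._startup_wait_retries > 0
            ):
                count = self._startup_retry_counts.get(url_base, 0) + 1
                self._startup_retry_counts[url_base] = count
                if count <= self._startup_wait_retries:
                    self.log.debug(
                        "Spark driver not ready yet at %s (attempt %d/%d)",
                        url_base, count, self._startup_wait_retries,
                    )
                    return None
                self.log.warning(
                    "Spark driver startup retries exhausted for %s (%d/%d)",
                    url_base, count, self._startup_wait_retries,
                )
            self.service_check(service_name, AgentCheck.CRITICAL, tags=tags)
            raise

        # Reset only the counter for the URL that actually returned valid JSON.
        self._startup_retry_counts.pop(url_base, None)
        return response_json

Keyed this way, each tracking URL exhausts its retries independently, so a stuck driver is still reported CRITICAL after `startup_wait_retries` consecutive startup messages even while other apps return valid JSON.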

return response_json

def _should_suppress_connection_error(self, exception, tags):
118 changes: 118 additions & 0 deletions spark/tests/test_spark.py
@@ -1187,6 +1187,124 @@ def test_do_not_crash_on_version_collection_failure():
assert not c._collect_version(running_apps, [])


@pytest.mark.unit
def test_driver_startup_message_default_retries(aggregator, caplog):
"""Default behavior (startup_wait_retries=3): retry 3 times then raise."""
from simplejson import JSONDecodeError

check = SparkCheck('spark', {}, [DRIVER_CONFIG])
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

with caplog.at_level(logging.DEBUG):
with mock.patch.object(check, '_rest_request', return_value=response):
# First 3 attempts should return None (default is 3 retries)
for i in range(3):
result = check._rest_request_to_json(
DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
)
assert result is None, f"Attempt {i + 1} should return None"

# 4th attempt should raise
with pytest.raises(JSONDecodeError):
check._rest_request_to_json(DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

assert 'spark driver not ready yet' in caplog.text.lower()
assert 'retries exhausted' in caplog.text.lower()

aggregator.assert_service_check(
SPARK_DRIVER_SERVICE_CHECK,
status=SparkCheck.CRITICAL,
tags=['url:{}'.format(DRIVER_CONFIG['spark_url'])],
)


@pytest.mark.unit
@pytest.mark.parametrize("retries_value", [0, -1, -5])
def test_driver_startup_message_disabled(aggregator, retries_value):
"""When startup_wait_retries<=0, treat startup messages as errors immediately."""
from simplejson import JSONDecodeError

config = DRIVER_CONFIG.copy()
config['startup_wait_retries'] = retries_value
check = SparkCheck('spark', {}, [config])
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

with mock.patch.object(check, '_rest_request', return_value=response):
with pytest.raises(JSONDecodeError):
check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

aggregator.assert_service_check(
SPARK_DRIVER_SERVICE_CHECK,
status=SparkCheck.CRITICAL,
tags=['url:{}'.format(config['spark_url'])],
)


@pytest.mark.unit
def test_driver_startup_message_limited_retries(aggregator, caplog):
"""When startup_wait_retries>0, retry N times then raise."""
from simplejson import JSONDecodeError

config = DRIVER_CONFIG.copy()
config['startup_wait_retries'] = 3
check = SparkCheck('spark', {}, [config])
response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

with caplog.at_level(logging.DEBUG):
with mock.patch.object(check, '_rest_request', return_value=response):
# First 3 attempts should return None
for i in range(3):
result = check._rest_request_to_json(
config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
)
assert result is None, f"Attempt {i + 1} should return None"

# 4th attempt should raise
with pytest.raises(JSONDecodeError):
check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

assert 'attempt 1/3' in caplog.text.lower()
assert 'attempt 3/3' in caplog.text.lower()
assert 'retries exhausted' in caplog.text.lower()

aggregator.assert_service_check(
SPARK_DRIVER_SERVICE_CHECK,
status=SparkCheck.CRITICAL,
tags=['url:{}'.format(config['spark_url'])],
)


@pytest.mark.unit
def test_driver_startup_retry_counter_resets_on_success(caplog):
"""Verify the retry counter resets after a successful JSON response."""
config = DRIVER_CONFIG.copy()
config['startup_wait_retries'] = 2
check = SparkCheck('spark', {}, [config])
startup_response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
success_response = MockResponse(json_data=[{"id": "app_001", "name": "TestApp"}])

with caplog.at_level(logging.DEBUG):
with mock.patch.object(check, '_rest_request', return_value=startup_response):
            # Consume one of the two configured retries
result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
assert result is None
assert check._startup_retry_count == 1

# Successful response resets counter
with mock.patch.object(check, '_rest_request', return_value=success_response):
result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
assert result == [{"id": "app_001", "name": "TestApp"}]
assert check._startup_retry_count == 0

# After reset, we should have 2 retries available again
with mock.patch.object(check, '_rest_request', return_value=startup_response):
for _ in range(2):
result = check._rest_request_to_json(
config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
)
assert result is None


@pytest.mark.unit
def test_ssl(dd_run_check):
run_ssl_server()