12 changes: 12 additions & 0 deletions spark/assets/configuration/spec.yaml
@@ -135,6 +135,18 @@ files:
        type: boolean
        display_default: false
        example: true
    - name: startup_wait_retries
      description: |
        Controls how to handle "Spark is starting up" responses from the driver.
        When the Spark driver is not yet ready, it returns a non-JSON startup message
        instead of the expected JSON response.

        Set to 0 to disable this feature and treat startup messages as errors immediately.
        Set to a positive integer to retry that many times before marking the check as failed.
      value:
        type: integer
        display_default: 3
        example: 3
    - template: instances/http
      overrides:
        auth_token.description: |
1 change: 1 addition & 0 deletions spark/changelog.d/22252.added
@@ -0,0 +1 @@
Add `startup_wait_retries` option to handle Spark driver startup messages gracefully.
4 changes: 4 additions & 0 deletions spark/datadog_checks/spark/config_models/defaults.py
@@ -100,6 +100,10 @@ def instance_spark_proxy_enabled():
    return False


def instance_startup_wait_retries():
    return 3


def instance_streaming_metrics():
    return True

1 change: 1 addition & 0 deletions spark/datadog_checks/spark/config_models/instance.py
@@ -96,6 +96,7 @@ class InstanceConfig(BaseModel):
    spark_proxy_enabled: Optional[bool] = None
    spark_ui_ports: Optional[tuple[int, ...]] = None
    spark_url: str
    startup_wait_retries: Optional[int] = None
    streaming_metrics: Optional[bool] = None
    tags: Optional[tuple[str, ...]] = None
    timeout: Optional[float] = None
10 changes: 10 additions & 0 deletions spark/datadog_checks/spark/data/conf.yaml.example
@@ -155,6 +155,16 @@ instances:
    #
    # disable_spark_stage_metrics: true

    ## @param startup_wait_retries - integer - optional - default: 3
    ## Controls how to handle "Spark is starting up" responses from the driver.
    ## When the Spark driver is not yet ready, it returns a non-JSON startup message
    ## instead of the expected JSON response.
    ##
    ## Set to 0 to disable this feature and treat startup messages as errors immediately.
    ## Set to a positive integer to retry that many times before marking the check as failed.
    #
    # startup_wait_retries: 3

    ## @param proxy - mapping - optional
    ## This overrides the `proxy` setting in `init_config`.
    ##
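For reference, a minimal sketch of an active instance configuration using the new option in `spark.d/conf.yaml` could look like the following; the `spark_url` value and retry count are illustrative, and any other instance fields a deployment may require are omitted:

instances:
  - spark_url: http://localhost:4040   # illustrative driver URL
    startup_wait_retries: 5            # tolerate up to five "starting up" check runs before failing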
29 changes: 29 additions & 0 deletions spark/datadog_checks/spark/spark.py
@@ -86,6 +86,12 @@ def __init__(self, name, init_config, instances):
        self._connection_error_seen = False
        self._debounced_this_run = False

        # Startup retry configuration:
        # <= 0: disable (treat startup messages as JSON parse errors immediately)
        # > 0: retry N times before marking the check as failed
        self._startup_wait_retries = int(self.instance.get('startup_wait_retries', 3))
        self._startup_retry_count = 0

    def check(self, _):
        self._debounced_this_run = False

@@ -701,6 +707,27 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
            response_json = response.json()

        except JSONDecodeError as e:
            response_text = response.text.strip()
            if response_text and 'spark is starting up' in response_text.lower():
                # Handle startup message based on retry configuration
                if self._startup_wait_retries > 0:
                    self._startup_retry_count += 1
                    if self._startup_retry_count <= self._startup_wait_retries:
                        self.log.debug(
                            "Spark driver not ready yet at %s (attempt %d/%d): %s",
                            self._get_url_base(address),
                            self._startup_retry_count,
                            self._startup_wait_retries,
                            response_text,
                        )
                        return None
                    else:
                        self.log.warning(
                            "Spark driver startup retries exhausted (%d/%d)",
                            self._startup_retry_count,
                            self._startup_wait_retries,
                        )

            self.service_check(
                service_name,
                AgentCheck.CRITICAL,
@@ -709,6 +736,8 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
            )
            raise

        # Reset startup retry counter on successful JSON parse
        self._startup_retry_count = 0
        return response_json

    def _should_suppress_connection_error(self, exception, tags):
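To make the control flow above easier to follow, here is a minimal, runnable sketch of the same retry-gating pattern. `StartupGate` and its method names are hypothetical stand-ins for illustration; in the check itself the logic lives inline in `SparkCheck._rest_request_to_json`.

class StartupGate:
    def __init__(self, max_retries=3):
        self.max_retries = max_retries  # <= 0 disables the feature entirely
        self.count = 0                  # consecutive startup responses seen so far

    def should_swallow(self):
        """Return True to skip this run quietly, False to surface the error."""
        if self.max_retries <= 0:
            return False  # disabled: fail on the first startup message
        self.count += 1
        if self.count <= self.max_retries:
            return True  # within budget: log at debug and defer to the next run
        return False  # budget exhausted: warn, go CRITICAL, and raise

    def reset(self):
        self.count = 0  # a successful JSON parse restores the full budget


gate = StartupGate(max_retries=3)
assert all(gate.should_swallow() for _ in range(3))  # attempts 1-3 are swallowed
assert gate.should_swallow() is False                # attempt 4 surfaces the error
gate.reset()
assert gate.should_swallow() is True                 # budget available again

Because the counter lives on the check instance rather than per request, the retry budget spans successive check runs, which is why a successful JSON parse must explicitly reset it.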
118 changes: 118 additions & 0 deletions spark/tests/test_spark.py
@@ -1187,6 +1187,124 @@ def test_do_not_crash_on_version_collection_failure():
    assert not c._collect_version(running_apps, [])


@pytest.mark.unit
def test_driver_startup_message_default_retries(aggregator, caplog):
    """Default behavior (startup_wait_retries=3): retry 3 times then raise."""
    from simplejson import JSONDecodeError

    check = SparkCheck('spark', {}, [DRIVER_CONFIG])
    response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

    with caplog.at_level(logging.DEBUG):
        with mock.patch.object(check, '_rest_request', return_value=response):
            # First 3 attempts should return None (default is 3 retries)
            for i in range(3):
                result = check._rest_request_to_json(
                    DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
                )
                assert result is None, f"Attempt {i + 1} should return None"

            # 4th attempt should raise
            with pytest.raises(JSONDecodeError):
                check._rest_request_to_json(DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

    assert 'spark driver not ready yet' in caplog.text.lower()
    assert 'retries exhausted' in caplog.text.lower()

    aggregator.assert_service_check(
        SPARK_DRIVER_SERVICE_CHECK,
        status=SparkCheck.CRITICAL,
        tags=['url:{}'.format(DRIVER_CONFIG['spark_url'])],
    )


@pytest.mark.unit
@pytest.mark.parametrize("retries_value", [0, -1, -5])
def test_driver_startup_message_disabled(aggregator, retries_value):
    """When startup_wait_retries <= 0, treat startup messages as errors immediately."""
    from simplejson import JSONDecodeError

    config = DRIVER_CONFIG.copy()
    config['startup_wait_retries'] = retries_value
    check = SparkCheck('spark', {}, [config])
    response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

    with mock.patch.object(check, '_rest_request', return_value=response):
        with pytest.raises(JSONDecodeError):
            check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

    aggregator.assert_service_check(
        SPARK_DRIVER_SERVICE_CHECK,
        status=SparkCheck.CRITICAL,
        tags=['url:{}'.format(config['spark_url'])],
    )


@pytest.mark.unit
def test_driver_startup_message_limited_retries(aggregator, caplog):
    """When startup_wait_retries > 0, retry N times then raise."""
    from simplejson import JSONDecodeError

    config = DRIVER_CONFIG.copy()
    config['startup_wait_retries'] = 3
    check = SparkCheck('spark', {}, [config])
    response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")

    with caplog.at_level(logging.DEBUG):
        with mock.patch.object(check, '_rest_request', return_value=response):
            # First 3 attempts should return None
            for i in range(3):
                result = check._rest_request_to_json(
                    config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
                )
                assert result is None, f"Attempt {i + 1} should return None"

            # 4th attempt should raise
            with pytest.raises(JSONDecodeError):
                check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])

    assert 'attempt 1/3' in caplog.text.lower()
    assert 'attempt 3/3' in caplog.text.lower()
    assert 'retries exhausted' in caplog.text.lower()

    aggregator.assert_service_check(
        SPARK_DRIVER_SERVICE_CHECK,
        status=SparkCheck.CRITICAL,
        tags=['url:{}'.format(config['spark_url'])],
    )


@pytest.mark.unit
def test_driver_startup_retry_counter_resets_on_success(caplog):
    """Verify the retry counter resets after a successful JSON response."""
    config = DRIVER_CONFIG.copy()
    config['startup_wait_retries'] = 2
    check = SparkCheck('spark', {}, [config])
    startup_response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.")
    success_response = MockResponse(json_data=[{"id": "app_001", "name": "TestApp"}])

    with caplog.at_level(logging.DEBUG):
        with mock.patch.object(check, '_rest_request', return_value=startup_response):
            # Consume one of the two available retries
            result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
            assert result is None
            assert check._startup_retry_count == 1

        # Successful response resets the counter
        with mock.patch.object(check, '_rest_request', return_value=success_response):
            result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [])
            assert result == [{"id": "app_001", "name": "TestApp"}]
            assert check._startup_retry_count == 0

        # After the reset, both retries should be available again
        with mock.patch.object(check, '_rest_request', return_value=startup_response):
            for _ in range(2):
                result = check._rest_request_to_json(
                    config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []
                )
                assert result is None


@pytest.mark.unit
def test_ssl(dd_run_check):
    run_ssl_server()