diff --git a/spark/assets/configuration/spec.yaml b/spark/assets/configuration/spec.yaml index fa4cc8dbeb6ad..62f342f40af4b 100644 --- a/spark/assets/configuration/spec.yaml +++ b/spark/assets/configuration/spec.yaml @@ -135,6 +135,18 @@ files: type: boolean display_default: false example: true + - name: startup_wait_retries + description: | + Controls how to handle "Spark is starting up" responses from the driver. + When the Spark driver is not yet ready, it returns a non-JSON startup message + instead of the expected JSON response. + + Set to 0 to disable this feature and treat startup messages as errors immediately. + Set to a positive integer to retry that many times before marking the check as failed. + value: + type: integer + display_default: 3 + example: 3 - template: instances/http overrides: auth_token.description: | diff --git a/spark/changelog.d/22252.added b/spark/changelog.d/22252.added new file mode 100644 index 0000000000000..d889b4159100f --- /dev/null +++ b/spark/changelog.d/22252.added @@ -0,0 +1 @@ +Add `startup_wait_retries` option to handle Spark driver startup messages gracefully. 
\ No newline at end of file diff --git a/spark/datadog_checks/spark/config_models/defaults.py b/spark/datadog_checks/spark/config_models/defaults.py index 78e9f3d6de6b7..1e36a634fa5a5 100644 --- a/spark/datadog_checks/spark/config_models/defaults.py +++ b/spark/datadog_checks/spark/config_models/defaults.py @@ -100,6 +100,10 @@ def instance_spark_proxy_enabled(): return False +def instance_startup_wait_retries(): + return 3 + + def instance_streaming_metrics(): return True diff --git a/spark/datadog_checks/spark/config_models/instance.py b/spark/datadog_checks/spark/config_models/instance.py index 3b1225296e3be..0cddfe1a0e9c0 100644 --- a/spark/datadog_checks/spark/config_models/instance.py +++ b/spark/datadog_checks/spark/config_models/instance.py @@ -96,6 +96,7 @@ class InstanceConfig(BaseModel): spark_proxy_enabled: Optional[bool] = None spark_ui_ports: Optional[tuple[int, ...]] = None spark_url: str + startup_wait_retries: Optional[int] = None streaming_metrics: Optional[bool] = None tags: Optional[tuple[str, ...]] = None timeout: Optional[float] = None diff --git a/spark/datadog_checks/spark/data/conf.yaml.example b/spark/datadog_checks/spark/data/conf.yaml.example index 006e51c95fec1..450ecd8bf60f4 100644 --- a/spark/datadog_checks/spark/data/conf.yaml.example +++ b/spark/datadog_checks/spark/data/conf.yaml.example @@ -155,6 +155,16 @@ instances: # # disable_spark_stage_metrics: true + ## @param startup_wait_retries - integer - optional - default: 3 + ## Controls how to handle "Spark is starting up" responses from the driver. + ## When the Spark driver is not yet ready, it returns a non-JSON startup message + ## instead of the expected JSON response. + ## + ## Set to 0 to disable this feature and treat startup messages as errors immediately. + ## Set to a positive integer to retry that many times before marking the check as failed. + # + # startup_wait_retries: 3 + ## @param proxy - mapping - optional ## This overrides the `proxy` setting in `init_config`. 
## diff --git a/spark/datadog_checks/spark/spark.py b/spark/datadog_checks/spark/spark.py index d1cdf096b986b..ef66975b4fdc9 100644 --- a/spark/datadog_checks/spark/spark.py +++ b/spark/datadog_checks/spark/spark.py @@ -86,6 +86,12 @@ def __init__(self, name, init_config, instances): self._connection_error_seen = False self._debounced_this_run = False + # Startup retry configuration: + # <= 0: disable (treat startup messages as JSON parse errors immediately) + # >0: retry N times before marking as broken + self._startup_wait_retries = int(self.instance.get('startup_wait_retries', 3)) + self._startup_retry_count = 0 + def check(self, _): self._debounced_this_run = False @@ -701,6 +707,27 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, response_json = response.json() except JSONDecodeError as e: + response_text = response.text.strip() + if response_text and 'spark is starting up' in response_text.lower(): + # Handle startup message based on retry configuration + if self._startup_wait_retries > 0: + self._startup_retry_count += 1 + if self._startup_retry_count <= self._startup_wait_retries: + self.log.debug( + "Spark driver not ready yet at %s (attempt %d/%d): %s", + self._get_url_base(address), + self._startup_retry_count, + self._startup_wait_retries, + response_text, + ) + return None + else: + self.log.warning( + "Spark driver startup retries exhausted (%d/%d)", + self._startup_retry_count, + self._startup_wait_retries, + ) + self.service_check( service_name, AgentCheck.CRITICAL, @@ -709,6 +736,8 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, ) raise + # Reset startup retry counter on successful JSON parse + self._startup_retry_count = 0 return response_json def _should_suppress_connection_error(self, exception, tags): diff --git a/spark/tests/test_spark.py b/spark/tests/test_spark.py index 36a700770bf0c..43bae42035523 100644 --- a/spark/tests/test_spark.py +++ b/spark/tests/test_spark.py @@
-1187,6 +1187,124 @@ def test_do_not_crash_on_version_collection_failure(): assert not c._collect_version(running_apps, []) +@pytest.mark.unit +def test_driver_startup_message_default_retries(aggregator, caplog): + """Default behavior (startup_wait_retries=3): retry 3 times then raise.""" + from simplejson import JSONDecodeError + + check = SparkCheck('spark', {}, [DRIVER_CONFIG]) + response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.") + + with caplog.at_level(logging.DEBUG): + with mock.patch.object(check, '_rest_request', return_value=response): + # First 3 attempts should return None (default is 3 retries) + for i in range(3): + result = check._rest_request_to_json( + DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [] + ) + assert result is None, f"Attempt {i + 1} should return None" + + # 4th attempt should raise + with pytest.raises(JSONDecodeError): + check._rest_request_to_json(DRIVER_CONFIG['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []) + + assert 'spark driver not ready yet' in caplog.text.lower() + assert 'retries exhausted' in caplog.text.lower() + + aggregator.assert_service_check( + SPARK_DRIVER_SERVICE_CHECK, + status=SparkCheck.CRITICAL, + tags=['url:{}'.format(DRIVER_CONFIG['spark_url'])], + ) + + +@pytest.mark.unit +@pytest.mark.parametrize("retries_value", [0, -1, -5]) +def test_driver_startup_message_disabled(aggregator, retries_value): + """When startup_wait_retries<=0, treat startup messages as errors immediately.""" + from simplejson import JSONDecodeError + + config = DRIVER_CONFIG.copy() + config['startup_wait_retries'] = retries_value + check = SparkCheck('spark', {}, [config]) + response = MockResponse(content="Spark is starting up. 
Please wait a while until it's ready.") + + with mock.patch.object(check, '_rest_request', return_value=response): + with pytest.raises(JSONDecodeError): + check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []) + + aggregator.assert_service_check( + SPARK_DRIVER_SERVICE_CHECK, + status=SparkCheck.CRITICAL, + tags=['url:{}'.format(config['spark_url'])], + ) + + +@pytest.mark.unit +def test_driver_startup_message_limited_retries(aggregator, caplog): + """When startup_wait_retries>0, retry N times then raise.""" + from simplejson import JSONDecodeError + + config = DRIVER_CONFIG.copy() + config['startup_wait_retries'] = 3 + check = SparkCheck('spark', {}, [config]) + response = MockResponse(content="Spark is starting up. Please wait a while until it's ready.") + + with caplog.at_level(logging.DEBUG): + with mock.patch.object(check, '_rest_request', return_value=response): + # First 3 attempts should return None + for i in range(3): + result = check._rest_request_to_json( + config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [] + ) + assert result is None, f"Attempt {i + 1} should return None" + + # 4th attempt should raise + with pytest.raises(JSONDecodeError): + check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []) + + assert 'attempt 1/3' in caplog.text.lower() + assert 'attempt 3/3' in caplog.text.lower() + assert 'retries exhausted' in caplog.text.lower() + + aggregator.assert_service_check( + SPARK_DRIVER_SERVICE_CHECK, + status=SparkCheck.CRITICAL, + tags=['url:{}'.format(config['spark_url'])], + ) + + +@pytest.mark.unit +def test_driver_startup_retry_counter_resets_on_success(caplog): + """Verify the retry counter resets after a successful JSON response.""" + config = DRIVER_CONFIG.copy() + config['startup_wait_retries'] = 2 + check = SparkCheck('spark', {}, [config]) + startup_response = MockResponse(content="Spark is starting up. 
Please wait a while until it's ready.") + success_response = MockResponse(json_data=[{"id": "app_001", "name": "TestApp"}]) + + with caplog.at_level(logging.DEBUG): + with mock.patch.object(check, '_rest_request', return_value=startup_response): + # Use 1 retry + result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []) + assert result is None + assert check._startup_retry_count == 1 + + # Successful response resets counter + with mock.patch.object(check, '_rest_request', return_value=success_response): + result = check._rest_request_to_json(config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, []) + assert result == [{"id": "app_001", "name": "TestApp"}] + assert check._startup_retry_count == 0 + + # After reset, we should have 2 retries available again + with mock.patch.object(check, '_rest_request', return_value=startup_response): + for _ in range(2): + result = check._rest_request_to_json( + config['spark_url'], SPARK_REST_PATH, SPARK_DRIVER_SERVICE_CHECK, [] + ) + assert result is None + + @pytest.mark.unit def test_ssl(dd_run_check): run_ssl_server()