def _get_attempt_id(stage_info):
    # type: (Any) -> Optional[int]
    """
    Return the attempt identifier of a stage, or ``None`` if it is unavailable.

    The stage info object is queried through ``attemptId()`` first and, if
    that call fails for any reason, through ``attemptNumber()``. The first
    call that succeeds wins and its result is returned unchanged.

    :param stage_info: object describing a stage; expected to expose at least
        one of the ``attemptId`` / ``attemptNumber`` zero-argument methods.
    :returns: whatever the first successful accessor returns, or ``None``
        when neither accessor can be called successfully.
    """
    # Order matters: ``attemptId`` is preferred and ``attemptNumber`` is only
    # a fallback for stage info objects that do not offer it.
    for accessor_name in ("attemptId", "attemptNumber"):
        try:
            return getattr(stage_info, accessor_name)()
        except Exception:
            # Deliberately broad and best-effort: a missing attempt id must
            # never break breadcrumb reporting.
            # NOTE(review): stage_info is presumably a py4j proxy for a JVM
            # object, where a missing method may not surface as a plain
            # AttributeError -- confirm before narrowing this clause.
            continue

    return None
def test_sentry_listener_on_stage_submitted_no_attempt_id(sentry_listener):
    """A stage info exposing only ``attemptNumber`` still yields ``attemptId`` data."""

    class AttemptNumberOnlyStageInfo:
        # Stub stage info that has no ``attemptId`` accessor at all.
        def stageId(self):  # noqa: N802
            return "sample-stage-id-submit"

        def name(self):
            return "run-job"

        def attemptNumber(self):  # noqa: N802
            return 14

    class StageSubmittedEvent:
        def stageInfo(self):  # noqa: N802
            return AttemptNumberOnlyStageInfo()

    with patch.object(sentry_listener, "_add_breadcrumb") as add_breadcrumb:
        sentry_listener.onStageSubmitted(StageSubmittedEvent())

    add_breadcrumb.assert_called_once()
    breadcrumb_kwargs = add_breadcrumb.call_args.kwargs

    assert breadcrumb_kwargs["level"] == "info"
    assert "sample-stage-id-submit" in breadcrumb_kwargs["message"]
    assert breadcrumb_kwargs["data"]["attemptId"] == 14
    assert breadcrumb_kwargs["data"]["name"] == "run-job"


def test_sentry_listener_on_stage_submitted_no_attempt_id_or_number(sentry_listener):
    """Without any attempt accessor the breadcrumb data omits ``attemptId``."""

    class BareStageInfo:
        # Stub stage info offering neither ``attemptId`` nor ``attemptNumber``.
        def stageId(self):  # noqa: N802
            return "sample-stage-id-submit"

        def name(self):
            return "run-job"

    class StageSubmittedEvent:
        def stageInfo(self):  # noqa: N802
            return BareStageInfo()

    with patch.object(sentry_listener, "_add_breadcrumb") as add_breadcrumb:
        sentry_listener.onStageSubmitted(StageSubmittedEvent())

    add_breadcrumb.assert_called_once()
    breadcrumb_kwargs = add_breadcrumb.call_args.kwargs

    assert breadcrumb_kwargs["level"] == "info"
    assert "sample-stage-id-submit" in breadcrumb_kwargs["message"]
    assert "attemptId" not in breadcrumb_kwargs["data"]
    assert breadcrumb_kwargs["data"]["name"] == "run-job"