From 400387e5f1cbd012ea504f699836a44adf113408 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 13:43:04 +0100 Subject: [PATCH 1/8] fix(pyspark): Grab attemptId more defensively --- sentry_sdk/integrations/spark/spark_driver.py | 34 +++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/sentry_sdk/integrations/spark/spark_driver.py b/sentry_sdk/integrations/spark/spark_driver.py index a86f16344d..8d2f101438 100644 --- a/sentry_sdk/integrations/spark/spark_driver.py +++ b/sentry_sdk/integrations/spark/spark_driver.py @@ -260,7 +260,12 @@ def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803 # type: (Any) -> None stage_info = stageSubmitted.stageInfo() message = "Stage {} Submitted".format(stage_info.stageId()) - data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()} + + data = {"name": stage_info.name()} + attempt_id = _get_attempt_id(stage_info) + if attempt_id is not None: + data["attemptId"] = attempt_id + self._add_breadcrumb(level="info", message=message, data=data) _set_app_properties() @@ -271,7 +276,11 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 stage_info = stageCompleted.stageInfo() message = "" level = "" - data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()} + + data = {"name": stage_info.name()} + attempt_id = _get_attempt_id(stage_info) + if attempt_id is not None: + data["attemptId"] = attempt_id # Have to Try Except because stageInfo.failureReason() is typed with Scala Option try: @@ -283,3 +292,24 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 level = "info" self._add_breadcrumb(level=level, message=message, data=data) + + +def _get_attempt_id(stage_info): + from py4j.protocol import Py4JJavaError # type: ignore + + try: + return stage_info.attemptId() + except Py4JJavaError: + pass + + try: + return stage_info.attemptNumber() + except Py4JJavaError: + pass + + try: + return stage_info.currentAttemptId() + except Py4JJavaError: + pass + + return None From a2fc2ec32d6a4afd82092c55cae95887d326864b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 13:45:36 +0100 Subject: [PATCH 2/8] type hint --- sentry_sdk/integrations/spark/spark_driver.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sentry_sdk/integrations/spark/spark_driver.py b/sentry_sdk/integrations/spark/spark_driver.py index 8d2f101438..5431e8eff1 100644 --- a/sentry_sdk/integrations/spark/spark_driver.py +++ b/sentry_sdk/integrations/spark/spark_driver.py @@ -295,6 +295,7 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 def _get_attempt_id(stage_info): + # type: (Any) -> Optional[int] from py4j.protocol import Py4JJavaError # type: ignore try: From c217f3131d2a5ab72e5ef01d32777d7e77285ec7 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:02:18 +0100 Subject: [PATCH 3/8] . --- sentry_sdk/integrations/spark/spark_driver.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/sentry_sdk/integrations/spark/spark_driver.py b/sentry_sdk/integrations/spark/spark_driver.py index 5431e8eff1..5348db9d6a 100644 --- a/sentry_sdk/integrations/spark/spark_driver.py +++ b/sentry_sdk/integrations/spark/spark_driver.py @@ -296,7 +296,7 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 def _get_attempt_id(stage_info): # type: (Any) -> Optional[int] - from py4j.protocol import Py4JJavaError # type: ignore + from py4j.protocol import Py4JJavaError try: return stage_info.attemptId() @@ -308,9 +308,4 @@ def _get_attempt_id(stage_info): except Py4JJavaError: pass - try: - return stage_info.currentAttemptId() - except Py4JJavaError: - pass - return None From caaf036b762df3948c6cdd793d01a3f58f62faac Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:07:47 +0100 Subject: [PATCH 4/8] tests --- tests/integrations/spark/test_spark.py | 59 ++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/integrations/spark/test_spark.py b/tests/integrations/spark/test_spark.py index 44ba9f8728..fed1205941 100644 --- a/tests/integrations/spark/test_spark.py +++ b/tests/integrations/spark/test_spark.py @@ -282,3 +282,62 @@ def mock_main(): "partitionId": "2", "taskAttemptId": "3", } + + +def test_sentry_listener_on_stage_submitted_no_attempt_id(sentry_listener): + listener = sentry_listener + with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: + + class StageInfo: + def stageId(self): # noqa: N802 + return "sample-stage-id-submit" + + def name(self): + return "run-job" + + def attemptName(self): # noqa: N802 + return 14 + + class MockStageSubmitted: + def stageInfo(self): # noqa: N802 + stageinf = StageInfo() + return stageinf + + mock_stage_submitted = MockStageSubmitted() + listener.onStageSubmitted(mock_stage_submitted) + + mock_add_breadcrumb.assert_called_once() + mock_hub = mock_add_breadcrumb.call_args + + assert mock_hub.kwargs["level"] == "info" + assert "sample-stage-id-submit" in mock_hub.kwargs["message"] + assert mock_hub.kwargs["data"]["attemptId"] == 14 + assert mock_hub.kwargs["data"]["name"] == "run-job" + + +def test_sentry_listener_on_stage_submitted_no_attempt_id_or_number(sentry_listener): + listener = sentry_listener + with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: + + class StageInfo: + def stageId(self): # noqa: N802 + return "sample-stage-id-submit" + + def name(self): + return "run-job" + + class MockStageSubmitted: + def stageInfo(self): # noqa: N802 + stageinf = StageInfo() + return stageinf + + mock_stage_submitted = MockStageSubmitted() + listener.onStageSubmitted(mock_stage_submitted) + + mock_add_breadcrumb.assert_called_once() + mock_hub = mock_add_breadcrumb.call_args + + assert mock_hub.kwargs["level"] == "info" + assert "sample-stage-id-submit" in mock_hub.kwargs["message"] + assert "attemptId" not in mock_hub.kwargs["data"] + assert mock_hub.kwargs["data"]["name"] == "run-job" From 9950c0329560e2f302f2b4873cd8e4a5228f568e Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:08:50 +0100 Subject: [PATCH 5/8] name --- tests/integrations/spark/test_spark.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/spark/test_spark.py b/tests/integrations/spark/test_spark.py index fed1205941..303043085c 100644 --- a/tests/integrations/spark/test_spark.py +++ b/tests/integrations/spark/test_spark.py @@ -295,7 +295,7 @@ def stageId(self): # noqa: N802 def name(self): return "run-job" - def attemptName(self): # noqa: N802 + def attemptNumber(self): # noqa: N802 return 14 class MockStageSubmitted: From d84144c743472cd0efbc4860ca83a922e634505b Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:14:24 +0100 Subject: [PATCH 6/8] respect error type --- tests/integrations/spark/test_spark.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/integrations/spark/test_spark.py b/tests/integrations/spark/test_spark.py index 303043085c..d24b5c772a 100644 --- a/tests/integrations/spark/test_spark.py +++ b/tests/integrations/spark/test_spark.py @@ -295,6 +295,9 @@ def stageId(self): # noqa: N802 def name(self): return "run-job" + def attemptId(self): + raise Py4JJavaError + def attemptNumber(self): # noqa: N802 return 14 @@ -326,6 +329,12 @@ def stageId(self): # noqa: N802 def name(self): return "run-job" + def attemptId(self): + raise Py4JJavaError + + def attemptNumber(self): + raise Py4JJavaError + class MockStageSubmitted: def stageInfo(self): # noqa: N802 stageinf = StageInfo() From 72c771231b2bf86a18c109eac18a71be446f7716 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:18:34 +0100 Subject: [PATCH 7/8] noqas --- tests/integrations/spark/test_spark.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integrations/spark/test_spark.py b/tests/integrations/spark/test_spark.py index d24b5c772a..254a5a846f 100644 --- a/tests/integrations/spark/test_spark.py +++ b/tests/integrations/spark/test_spark.py @@ -295,7 +295,7 @@ def stageId(self): # noqa: N802 def name(self): return "run-job" - def attemptId(self): + def attemptId(self): # noqa: N802 raise Py4JJavaError def attemptNumber(self): # noqa: N802 @@ -329,10 +329,10 @@ def stageId(self): # noqa: N802 def name(self): return "run-job" - def attemptId(self): + def attemptId(self): # noqa: N802 raise Py4JJavaError - def attemptNumber(self): + def attemptNumber(self): # noqa: N802 raise Py4JJavaError class MockStageSubmitted: From 7e0e2e2395b01637ed9b9a712d305b71f6c2eb52 Mon Sep 17 00:00:00 2001 From: Ivana Kellyer Date: Wed, 12 Mar 2025 14:33:49 +0100 Subject: [PATCH 8/8] . --- sentry_sdk/integrations/spark/spark_driver.py | 6 +- tests/integrations/spark/test_spark.py | 128 ++++++++---------- 2 files changed, 62 insertions(+), 72 deletions(-) diff --git a/sentry_sdk/integrations/spark/spark_driver.py b/sentry_sdk/integrations/spark/spark_driver.py index 5348db9d6a..701ba12d89 100644 --- a/sentry_sdk/integrations/spark/spark_driver.py +++ b/sentry_sdk/integrations/spark/spark_driver.py @@ -296,16 +296,14 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 def _get_attempt_id(stage_info): # type: (Any) -> Optional[int] - from py4j.protocol import Py4JJavaError - try: return stage_info.attemptId() - except Py4JJavaError: + except Exception: pass try: return stage_info.attemptNumber() - except Py4JJavaError: + except Exception: pass return None diff --git a/tests/integrations/spark/test_spark.py b/tests/integrations/spark/test_spark.py index 254a5a846f..7eeab15dc4 100644 --- a/tests/integrations/spark/test_spark.py +++ b/tests/integrations/spark/test_spark.py @@ -14,6 +14,7 @@ from py4j.protocol import Py4JJavaError + ################ # DRIVER TESTS # ################ @@ -166,6 +167,65 @@ def stageInfo(self): # noqa: N802 assert mock_hub.kwargs["data"]["name"] == "run-job" +def test_sentry_listener_on_stage_submitted_no_attempt_id(sentry_listener): + listener = sentry_listener + with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: + + class StageInfo: + def stageId(self): # noqa: N802 + return "sample-stage-id-submit" + + def name(self): + return "run-job" + + def attemptNumber(self): # noqa: N802 + return 14 + + class MockStageSubmitted: + def stageInfo(self): # noqa: N802 + stageinf = StageInfo() + return stageinf + + mock_stage_submitted = MockStageSubmitted() + listener.onStageSubmitted(mock_stage_submitted) + + mock_add_breadcrumb.assert_called_once() + mock_hub = mock_add_breadcrumb.call_args + + assert mock_hub.kwargs["level"] == "info" + assert "sample-stage-id-submit" in mock_hub.kwargs["message"] + assert mock_hub.kwargs["data"]["attemptId"] == 14 + assert mock_hub.kwargs["data"]["name"] == "run-job" + + +def test_sentry_listener_on_stage_submitted_no_attempt_id_or_number(sentry_listener): + listener = sentry_listener + with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: + + class StageInfo: + def stageId(self): # noqa: N802 + return "sample-stage-id-submit" + + def name(self): + return "run-job" + + class MockStageSubmitted: + def stageInfo(self): # noqa: N802 + stageinf = StageInfo() + return stageinf + + mock_stage_submitted = MockStageSubmitted() + listener.onStageSubmitted(mock_stage_submitted) + + mock_add_breadcrumb.assert_called_once() + mock_hub = mock_add_breadcrumb.call_args + + assert mock_hub.kwargs["level"] == "info" + assert "sample-stage-id-submit" in mock_hub.kwargs["message"] + assert "attemptId" not in mock_hub.kwargs["data"] + assert mock_hub.kwargs["data"]["name"] == "run-job" + + @pytest.fixture def get_mock_stage_completed(): def _inner(failure_reason): @@ -282,71 +342,3 @@ def mock_main(): "partitionId": "2", "taskAttemptId": "3", } - - -def test_sentry_listener_on_stage_submitted_no_attempt_id(sentry_listener): - listener = sentry_listener - with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: - - class StageInfo: - def stageId(self): # noqa: N802 - return "sample-stage-id-submit" - - def name(self): - return "run-job" - - def attemptId(self): # noqa: N802 - raise Py4JJavaError - - def attemptNumber(self): # noqa: N802 - return 14 - - class MockStageSubmitted: - def stageInfo(self): # noqa: N802 - stageinf = StageInfo() - return stageinf - - mock_stage_submitted = MockStageSubmitted() - listener.onStageSubmitted(mock_stage_submitted) - - mock_add_breadcrumb.assert_called_once() - mock_hub = mock_add_breadcrumb.call_args - - assert mock_hub.kwargs["level"] == "info" - assert "sample-stage-id-submit" in mock_hub.kwargs["message"] - assert mock_hub.kwargs["data"]["attemptId"] == 14 - assert mock_hub.kwargs["data"]["name"] == "run-job" - - -def test_sentry_listener_on_stage_submitted_no_attempt_id_or_number(sentry_listener): - listener = sentry_listener - with patch.object(listener, "_add_breadcrumb") as mock_add_breadcrumb: - - class StageInfo: - def stageId(self): # noqa: N802 - return "sample-stage-id-submit" - - def name(self): - return "run-job" - - def attemptId(self): # noqa: N802 - raise Py4JJavaError - - def attemptNumber(self): # noqa: N802 - raise Py4JJavaError - - class MockStageSubmitted: - def stageInfo(self): # noqa: N802 - stageinf = StageInfo() - return stageinf - - mock_stage_submitted = MockStageSubmitted() - listener.onStageSubmitted(mock_stage_submitted) - - mock_add_breadcrumb.assert_called_once() - mock_hub = mock_add_breadcrumb.call_args - - assert mock_hub.kwargs["level"] == "info" - assert "sample-stage-id-submit" in mock_hub.kwargs["message"] - assert "attemptId" not in mock_hub.kwargs["data"] - assert mock_hub.kwargs["data"]["name"] == "run-job"