Skip to content
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 58 additions & 42 deletions sentry_sdk/integrations/spark/spark_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ class SparkIntegration(Integration):
@staticmethod
def setup_once():
# type: () -> None
patch_spark_context_init()
_setup_sentry_tracing()


def _set_app_properties():
Expand Down Expand Up @@ -49,7 +49,52 @@ def _start_sentry_listener(sc):
sc._jsc.sc().addSparkListener(listener)


def patch_spark_context_init():
def _add_event_processor(sc):
    # type: (Any) -> None
    """Register an event processor on the isolation scope that enriches
    Sentry events with metadata from the given SparkContext (spark user,
    driver/executor configuration tags, and the Spark web UI URL).

    Existing values on the event are never overwritten (setdefault only).
    """
    scope = sentry_sdk.get_isolation_scope()

    @scope.add_event_processor
    def process_event(event, hint):
        # type: (Event, Hint) -> Optional[Event]
        with capture_internal_exceptions():
            # Leave the event untouched if the integration is disabled
            # or the context has already been stopped.
            if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
                return event

            if sc._active_spark_context is None:
                return event

            event.setdefault("user", {}).setdefault("id", sc.sparkUser())

            tags = event.setdefault("tags", {})
            for tag_key, tag_value in (
                ("executor.id", sc._conf.get("spark.executor.id")),
                ("spark-submit.deployMode", sc._conf.get("spark.submit.deployMode")),
                ("driver.host", sc._conf.get("spark.driver.host")),
                ("driver.port", sc._conf.get("spark.driver.port")),
                ("spark_version", sc.version),
                ("app_name", sc.appName),
                ("application_id", sc.applicationId),
                ("master", sc.master),
                ("spark_home", sc.sparkHome),
            ):
                tags.setdefault(tag_key, tag_value)

            event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl)

            return event


def _activate_integration(sc):
    # type: (SparkContext) -> None
    """Wire the Sentry integration onto an existing SparkContext: install
    the Java-side Spark listener, tag the app's local properties, and add
    the Sentry event processor.
    """
    # NOTE(review): SparkContext is not referenced at runtime here; the
    # local import presumably exists so the `# type:` comment above
    # resolves for mypy — confirm before removing.
    from pyspark import SparkContext

    _start_sentry_listener(sc)
    _set_app_properties()
    _add_event_processor(sc)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have bundled the functions that need to run when the integration is set up into a single helper.



def _patch_spark_context_init():
# type: () -> None
from pyspark import SparkContext

Expand All @@ -59,51 +104,22 @@ def patch_spark_context_init():
def _sentry_patched_spark_context_init(self, *args, **kwargs):
# type: (SparkContext, *Any, **Any) -> Optional[Any]
rv = spark_context_init(self, *args, **kwargs)
_start_sentry_listener(self)
_set_app_properties()

scope = sentry_sdk.get_isolation_scope()

@scope.add_event_processor
def process_event(event, hint):
# type: (Event, Hint) -> Optional[Event]
with capture_internal_exceptions():
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
return event

if self._active_spark_context is None:
return event

event.setdefault("user", {}).setdefault("id", self.sparkUser())

event.setdefault("tags", {}).setdefault(
"executor.id", self._conf.get("spark.executor.id")
)
event["tags"].setdefault(
"spark-submit.deployMode",
self._conf.get("spark.submit.deployMode"),
)
event["tags"].setdefault(
"driver.host", self._conf.get("spark.driver.host")
)
event["tags"].setdefault(
"driver.port", self._conf.get("spark.driver.port")
)
event["tags"].setdefault("spark_version", self.version)
event["tags"].setdefault("app_name", self.appName)
event["tags"].setdefault("application_id", self.applicationId)
event["tags"].setdefault("master", self.master)
event["tags"].setdefault("spark_home", self.sparkHome)

event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)

return event
Comment on lines -65 to -100
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have separated this part into a distinct function.


_activate_integration(self)
return rv

SparkContext._do_init = _sentry_patched_spark_context_init


def _setup_sentry_tracing():
    # type: () -> None
    """Install Sentry instrumentation for Spark.

    If a SparkContext is already live, activate the integration on it
    right away; otherwise patch SparkContext initialization so activation
    happens when a context is eventually created.
    """
    from pyspark import SparkContext

    active = SparkContext._active_spark_context
    if active is None:
        _patch_spark_context_init()
    else:
        _activate_integration(active)
Comment on lines +117 to +120
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When the Spark context already exists, _activate_integration is called instead of applying the patch.



class SparkListener:
def onApplicationEnd(self, applicationEnd): # noqa: N802,N803
# type: (Any) -> None
Expand Down
50 changes: 42 additions & 8 deletions tests/integrations/spark/test_spark.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import pytest
import sys
from unittest.mock import patch

from sentry_sdk.integrations.spark.spark_driver import (
_set_app_properties,
_start_sentry_listener,
Expand All @@ -18,8 +19,23 @@
################


def test_set_app_properties():
spark_context = SparkContext(appName="Testing123")
@pytest.fixture(scope="function")
def sentry_init_with_reset(sentry_init):
    """Yield a callable that initializes the SDK with SparkIntegration, and
    un-register the integration on teardown so setup_once() can run again
    in later tests.
    """
    from sentry_sdk.integrations import _processed_integrations

    yield lambda: sentry_init(integrations=[SparkIntegration()])
    # discard() instead of remove(): don't raise KeyError during teardown
    # when a test never invoked the yielded callable.
    _processed_integrations.discard("spark")


@pytest.fixture(scope="function")
def create_spark_context():
    """Yield a factory for a test SparkContext; stop whatever context is
    still active on teardown so tests don't leak contexts into each other.
    """
    yield lambda: SparkContext(appName="Testing123")

    active = SparkContext._active_spark_context
    if active:
        active.stop()


def test_set_app_properties(create_spark_context):
spark_context = create_spark_context()
_set_app_properties()

assert spark_context.getLocalProperty("sentry_app_name") == "Testing123"
Expand All @@ -30,9 +46,8 @@ def test_set_app_properties():
)


def test_start_sentry_listener():
spark_context = SparkContext.getOrCreate()

def test_start_sentry_listener(create_spark_context):
spark_context = create_spark_context()
gateway = spark_context._gateway
assert gateway._callback_server is None

Expand All @@ -41,9 +56,28 @@ def test_start_sentry_listener():
assert gateway._callback_server is not None


def test_initialize_spark_integration(sentry_init):
sentry_init(integrations=[SparkIntegration()])
SparkContext.getOrCreate()
@patch("sentry_sdk.integrations.spark.spark_driver._patch_spark_context_init")
def test_initialize_spark_integration_before_spark_context_init(
    patch_init_mock, sentry_init_with_reset, create_spark_context
):
    """SDK initialized before any SparkContext exists: the integration
    should patch SparkContext initialization (not activate directly)."""
    sentry_init_with_reset()
    create_spark_context()

    patch_init_mock.assert_called_once()


@patch("sentry_sdk.integrations.spark.spark_driver._activate_integration")
def test_initialize_spark_integration_after_spark_context_init(
    activate_mock, create_spark_context, sentry_init_with_reset
):
    """SDK initialized after a SparkContext already exists: the integration
    should activate on the live context instead of patching init."""
    create_spark_context()
    sentry_init_with_reset()

    activate_mock.assert_called_once()


@pytest.fixture
Expand Down