-
Notifications
You must be signed in to change notification settings - Fork 571
Support SparkIntegration activation after SparkContext created #3411
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
74edf67
4c673d6
52559be
fd48489
2f0d7be
68aaed6
6af8cd4
f762f7b
33e22b5
da491d4
1828149
d536268
b4066ed
08d46ee
b2b0a92
2421099
7b99ba8
8fddd3c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,7 @@ | ||
| import sentry_sdk | ||
| from sentry_sdk.integrations import Integration | ||
| from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled | ||
| from sentry_sdk.scope import Scope | ||
|
|
||
| from sentry_sdk._types import TYPE_CHECKING | ||
|
|
||
|
|
@@ -17,7 +18,7 @@ class SparkIntegration(Integration): | |
| @staticmethod | ||
| def setup_once(): | ||
| # type: () -> None | ||
| patch_spark_context_init() | ||
| _setup_sentry_tracing() | ||
|
|
||
|
|
||
| def _set_app_properties(): | ||
|
|
@@ -49,7 +50,52 @@ def _start_sentry_listener(sc): | |
| sc._jsc.sc().addSparkListener(listener) | ||
|
|
||
|
|
||
| def patch_spark_context_init(): | ||
| def _add_event_processor(sc): | ||
| # type: (Any) -> None | ||
| scope = sentry_sdk.get_isolation_scope() | ||
|
|
||
| @scope.add_event_processor | ||
| def process_event(event, hint): | ||
| # type: (Event, Hint) -> Optional[Event] | ||
| with capture_internal_exceptions(): | ||
| if sentry_sdk.get_client().get_integration(SparkIntegration) is None: | ||
| return event | ||
|
|
||
| if sc._active_spark_context is None: | ||
| return event | ||
|
|
||
| event.setdefault("user", {}).setdefault("id", sc.sparkUser()) | ||
|
|
||
| event.setdefault("tags", {}).setdefault( | ||
| "executor.id", sc._conf.get("spark.executor.id") | ||
| ) | ||
| event["tags"].setdefault( | ||
| "spark-submit.deployMode", | ||
| sc._conf.get("spark.submit.deployMode"), | ||
| ) | ||
| event["tags"].setdefault("driver.host", sc._conf.get("spark.driver.host")) | ||
| event["tags"].setdefault("driver.port", sc._conf.get("spark.driver.port")) | ||
| event["tags"].setdefault("spark_version", sc.version) | ||
| event["tags"].setdefault("app_name", sc.appName) | ||
| event["tags"].setdefault("application_id", sc.applicationId) | ||
| event["tags"].setdefault("master", sc.master) | ||
| event["tags"].setdefault("spark_home", sc.sparkHome) | ||
|
|
||
| event.setdefault("extra", {}).setdefault("web_url", sc.uiWebUrl) | ||
|
|
||
| return event | ||
|
|
||
|
|
||
| def _activate_integration(sc): | ||
| # type: (SparkContext) -> None | ||
| from pyspark import SparkContext | ||
|
|
||
| _start_sentry_listener(sc) | ||
| _set_app_properties() | ||
| _add_event_processor(sc) | ||
|
|
||
|
|
||
| def _patch_spark_context_init(): | ||
| # type: () -> None | ||
| from pyspark import SparkContext | ||
|
|
||
|
|
@@ -59,51 +105,22 @@ def patch_spark_context_init(): | |
| def _sentry_patched_spark_context_init(self, *args, **kwargs): | ||
| # type: (SparkContext, *Any, **Any) -> Optional[Any] | ||
| rv = spark_context_init(self, *args, **kwargs) | ||
| _start_sentry_listener(self) | ||
| _set_app_properties() | ||
|
|
||
| scope = sentry_sdk.get_isolation_scope() | ||
|
|
||
| @scope.add_event_processor | ||
| def process_event(event, hint): | ||
| # type: (Event, Hint) -> Optional[Event] | ||
| with capture_internal_exceptions(): | ||
| if sentry_sdk.get_client().get_integration(SparkIntegration) is None: | ||
| return event | ||
|
|
||
| if self._active_spark_context is None: | ||
| return event | ||
|
|
||
| event.setdefault("user", {}).setdefault("id", self.sparkUser()) | ||
|
|
||
| event.setdefault("tags", {}).setdefault( | ||
| "executor.id", self._conf.get("spark.executor.id") | ||
| ) | ||
| event["tags"].setdefault( | ||
| "spark-submit.deployMode", | ||
| self._conf.get("spark.submit.deployMode"), | ||
| ) | ||
| event["tags"].setdefault( | ||
| "driver.host", self._conf.get("spark.driver.host") | ||
| ) | ||
| event["tags"].setdefault( | ||
| "driver.port", self._conf.get("spark.driver.port") | ||
| ) | ||
| event["tags"].setdefault("spark_version", self.version) | ||
| event["tags"].setdefault("app_name", self.appName) | ||
| event["tags"].setdefault("application_id", self.applicationId) | ||
| event["tags"].setdefault("master", self.master) | ||
| event["tags"].setdefault("spark_home", self.sparkHome) | ||
|
|
||
| event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl) | ||
|
|
||
| return event | ||
|
Comment on lines
-65
to
-100
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I have separated this part into a distinct function. |
||
|
|
||
| _activate_integration(self) | ||
| return rv | ||
|
|
||
| SparkContext._do_init = _sentry_patched_spark_context_init | ||
|
|
||
|
|
||
| def _setup_sentry_tracing(): | ||
| # type: () -> None | ||
| from pyspark import SparkContext | ||
|
|
||
| if SparkContext._active_spark_context is not None: | ||
| _activate_integration(SparkContext._active_spark_context) | ||
| return | ||
| _patch_spark_context_init() | ||
|
Comment on lines
+117
to
+120
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. When the Spark context already exists, |
||
|
|
||
|
|
||
| class SparkListener: | ||
| def onApplicationEnd(self, applicationEnd): # noqa: N802,N803 | ||
| # type: (Any) -> None | ||
|
|
@@ -208,10 +225,20 @@ class Java: | |
|
|
||
|
|
||
| class SentryListener(SparkListener): | ||
| def _add_breadcrumb( | ||
| self, | ||
| level, # type: str | ||
| message, # type: str | ||
| data=None, # type: Optional[dict[str, Any]] | ||
| ): | ||
| # type: (...) -> None | ||
| Scope.set_isolation_scope(Scope.get_global_scope()) | ||
|
||
| sentry_sdk.add_breadcrumb(level=level, message=message, data=data) | ||
antonpirker marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
|
|
||
| def onJobStart(self, jobStart): # noqa: N802,N803 | ||
| # type: (Any) -> None | ||
| message = "Job {} Started".format(jobStart.jobId()) | ||
| sentry_sdk.add_breadcrumb(level="info", message=message) | ||
| self._add_breadcrumb(level="info", message=message) | ||
| _set_app_properties() | ||
|
|
||
| def onJobEnd(self, jobEnd): # noqa: N802,N803 | ||
|
|
@@ -227,14 +254,14 @@ def onJobEnd(self, jobEnd): # noqa: N802,N803 | |
| level = "warning" | ||
| message = "Job {} Failed".format(jobEnd.jobId()) | ||
|
|
||
| sentry_sdk.add_breadcrumb(level=level, message=message, data=data) | ||
| self._add_breadcrumb(level=level, message=message, data=data) | ||
|
|
||
| def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803 | ||
| # type: (Any) -> None | ||
| stage_info = stageSubmitted.stageInfo() | ||
| message = "Stage {} Submitted".format(stage_info.stageId()) | ||
| data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()} | ||
| sentry_sdk.add_breadcrumb(level="info", message=message, data=data) | ||
| self._add_breadcrumb(level="info", message=message, data=data) | ||
| _set_app_properties() | ||
|
|
||
| def onStageCompleted(self, stageCompleted): # noqa: N802,N803 | ||
|
|
@@ -255,4 +282,4 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803 | |
| message = "Stage {} Completed".format(stage_info.stageId()) | ||
| level = "info" | ||
|
|
||
| sentry_sdk.add_breadcrumb(level=level, message=message, data=data) | ||
| self._add_breadcrumb(level=level, message=message, data=data) | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have bundled the functions that need to be executed when the integration is activated.