Skip to content

Commit 4e5251c

Browse files
authored
ref(spark): Use new scopes API (#2888)
1 parent 22611a1 commit 4e5251c

File tree

2 files changed

+83
-96
lines changed

2 files changed

+83
-96
lines changed

sentry_sdk/integrations/spark/spark_driver.py

Lines changed: 43 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
from sentry_sdk import configure_scope
2-
from sentry_sdk.hub import Hub
1+
import sentry_sdk
32
from sentry_sdk.integrations import Integration
4-
from sentry_sdk.utils import capture_internal_exceptions
3+
from sentry_sdk.scope import Scope
4+
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
55

66
from sentry_sdk._types import TYPE_CHECKING
77

@@ -56,51 +56,47 @@ def patch_spark_context_init():
5656

5757
spark_context_init = SparkContext._do_init
5858

59+
@ensure_integration_enabled(SparkIntegration, spark_context_init)
5960
def _sentry_patched_spark_context_init(self, *args, **kwargs):
6061
# type: (SparkContext, *Any, **Any) -> Optional[Any]
61-
init = spark_context_init(self, *args, **kwargs)
62-
63-
if Hub.current.get_integration(SparkIntegration) is None:
64-
return init
65-
6662
_start_sentry_listener(self)
6763
_set_app_properties()
6864

69-
with configure_scope() as scope:
70-
71-
@scope.add_event_processor
72-
def process_event(event, hint):
73-
# type: (Event, Hint) -> Optional[Event]
74-
with capture_internal_exceptions():
75-
if Hub.current.get_integration(SparkIntegration) is None:
76-
return event
77-
78-
event.setdefault("user", {}).setdefault("id", self.sparkUser())
79-
80-
event.setdefault("tags", {}).setdefault(
81-
"executor.id", self._conf.get("spark.executor.id")
82-
)
83-
event["tags"].setdefault(
84-
"spark-submit.deployMode",
85-
self._conf.get("spark.submit.deployMode"),
86-
)
87-
event["tags"].setdefault(
88-
"driver.host", self._conf.get("spark.driver.host")
89-
)
90-
event["tags"].setdefault(
91-
"driver.port", self._conf.get("spark.driver.port")
92-
)
93-
event["tags"].setdefault("spark_version", self.version)
94-
event["tags"].setdefault("app_name", self.appName)
95-
event["tags"].setdefault("application_id", self.applicationId)
96-
event["tags"].setdefault("master", self.master)
97-
event["tags"].setdefault("spark_home", self.sparkHome)
98-
99-
event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
100-
101-
return event
102-
103-
return init
65+
scope = Scope.get_isolation_scope()
66+
67+
@scope.add_event_processor
68+
def process_event(event, hint):
69+
# type: (Event, Hint) -> Optional[Event]
70+
with capture_internal_exceptions():
71+
if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
72+
return event
73+
74+
event.setdefault("user", {}).setdefault("id", self.sparkUser())
75+
76+
event.setdefault("tags", {}).setdefault(
77+
"executor.id", self._conf.get("spark.executor.id")
78+
)
79+
event["tags"].setdefault(
80+
"spark-submit.deployMode",
81+
self._conf.get("spark.submit.deployMode"),
82+
)
83+
event["tags"].setdefault(
84+
"driver.host", self._conf.get("spark.driver.host")
85+
)
86+
event["tags"].setdefault(
87+
"driver.port", self._conf.get("spark.driver.port")
88+
)
89+
event["tags"].setdefault("spark_version", self.version)
90+
event["tags"].setdefault("app_name", self.appName)
91+
event["tags"].setdefault("application_id", self.applicationId)
92+
event["tags"].setdefault("master", self.master)
93+
event["tags"].setdefault("spark_home", self.sparkHome)
94+
95+
event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
96+
97+
return event
98+
99+
return spark_context_init(self, *args, **kwargs)
104100

105101
SparkContext._do_init = _sentry_patched_spark_context_init
106102

@@ -209,14 +205,10 @@ class Java:
209205

210206

211207
class SentryListener(SparkListener):
212-
def __init__(self):
213-
# type: () -> None
214-
self.hub = Hub.current
215-
216208
def onJobStart(self, jobStart): # noqa: N802,N803
217209
# type: (Any) -> None
218210
message = "Job {} Started".format(jobStart.jobId())
219-
self.hub.add_breadcrumb(level="info", message=message)
211+
sentry_sdk.add_breadcrumb(level="info", message=message)
220212
_set_app_properties()
221213

222214
def onJobEnd(self, jobEnd): # noqa: N802,N803
@@ -232,14 +224,14 @@ def onJobEnd(self, jobEnd): # noqa: N802,N803
232224
level = "warning"
233225
message = "Job {} Failed".format(jobEnd.jobId())
234226

235-
self.hub.add_breadcrumb(level=level, message=message, data=data)
227+
sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
236228

237229
def onStageSubmitted(self, stageSubmitted): # noqa: N802,N803
238230
# type: (Any) -> None
239231
stage_info = stageSubmitted.stageInfo()
240232
message = "Stage {} Submitted".format(stage_info.stageId())
241233
data = {"attemptId": stage_info.attemptId(), "name": stage_info.name()}
242-
self.hub.add_breadcrumb(level="info", message=message, data=data)
234+
sentry_sdk.add_breadcrumb(level="info", message=message, data=data)
243235
_set_app_properties()
244236

245237
def onStageCompleted(self, stageCompleted): # noqa: N802,N803
@@ -260,4 +252,4 @@ def onStageCompleted(self, stageCompleted): # noqa: N802,N803
260252
message = "Stage {} Completed".format(stage_info.stageId())
261253
level = "info"
262254

263-
self.hub.add_breadcrumb(level=level, message=message, data=data)
255+
sentry_sdk.add_breadcrumb(level=level, message=message, data=data)
Lines changed: 40 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import sys
22

3-
from sentry_sdk import configure_scope
4-
from sentry_sdk.hub import Hub
3+
import sentry_sdk
54
from sentry_sdk.integrations import Integration
5+
from sentry_sdk.scope import Scope
66
from sentry_sdk.utils import (
77
capture_internal_exceptions,
88
exc_info_from_error,
@@ -31,11 +31,9 @@ def setup_once():
3131
original_daemon.worker_main = _sentry_worker_main
3232

3333

34-
def _capture_exception(exc_info, hub):
35-
# type: (ExcInfo, Hub) -> None
36-
client = hub.client
37-
38-
client_options = client.options # type: ignore
34+
def _capture_exception(exc_info):
35+
# type: (ExcInfo) -> None
36+
client = sentry_sdk.get_client()
3937

4038
mechanism = {"type": "spark", "handled": False}
4139

@@ -49,7 +47,7 @@ def _capture_exception(exc_info, hub):
4947
if exc_type not in (SystemExit, EOFError, ConnectionResetError):
5048
rv.append(
5149
single_exception_from_error_tuple(
52-
exc_type, exc_value, tb, client_options, mechanism
50+
exc_type, exc_value, tb, client.options, mechanism
5351
)
5452
)
5553

@@ -60,52 +58,50 @@ def _capture_exception(exc_info, hub):
6058

6159
_tag_task_context()
6260

63-
hub.capture_event(event, hint=hint)
61+
sentry_sdk.capture_event(event, hint=hint)
6462

6563

6664
def _tag_task_context():
6765
# type: () -> None
6866
from pyspark.taskcontext import TaskContext
6967

70-
with configure_scope() as scope:
68+
scope = Scope.get_isolation_scope()
7169

72-
@scope.add_event_processor
73-
def process_event(event, hint):
74-
# type: (Event, Hint) -> Optional[Event]
75-
with capture_internal_exceptions():
76-
integration = Hub.current.get_integration(SparkWorkerIntegration)
77-
task_context = TaskContext.get()
70+
@scope.add_event_processor
71+
def process_event(event, hint):
72+
# type: (Event, Hint) -> Optional[Event]
73+
with capture_internal_exceptions():
74+
integration = sentry_sdk.get_client().get_integration(
75+
SparkWorkerIntegration
76+
)
77+
task_context = TaskContext.get()
7878

79-
if integration is None or task_context is None:
80-
return event
79+
if integration is None or task_context is None:
80+
return event
8181

82-
event.setdefault("tags", {}).setdefault(
83-
"stageId", str(task_context.stageId())
84-
)
85-
event["tags"].setdefault("partitionId", str(task_context.partitionId()))
86-
event["tags"].setdefault(
87-
"attemptNumber", str(task_context.attemptNumber())
88-
)
89-
event["tags"].setdefault(
90-
"taskAttemptId", str(task_context.taskAttemptId())
91-
)
82+
event.setdefault("tags", {}).setdefault(
83+
"stageId", str(task_context.stageId())
84+
)
85+
event["tags"].setdefault("partitionId", str(task_context.partitionId()))
86+
event["tags"].setdefault("attemptNumber", str(task_context.attemptNumber()))
87+
event["tags"].setdefault("taskAttemptId", str(task_context.taskAttemptId()))
9288

93-
if task_context._localProperties:
94-
if "sentry_app_name" in task_context._localProperties:
95-
event["tags"].setdefault(
96-
"app_name", task_context._localProperties["sentry_app_name"]
97-
)
98-
event["tags"].setdefault(
99-
"application_id",
100-
task_context._localProperties["sentry_application_id"],
101-
)
89+
if task_context._localProperties:
90+
if "sentry_app_name" in task_context._localProperties:
91+
event["tags"].setdefault(
92+
"app_name", task_context._localProperties["sentry_app_name"]
93+
)
94+
event["tags"].setdefault(
95+
"application_id",
96+
task_context._localProperties["sentry_application_id"],
97+
)
10298

103-
if "callSite.short" in task_context._localProperties:
104-
event.setdefault("extra", {}).setdefault(
105-
"callSite", task_context._localProperties["callSite.short"]
106-
)
99+
if "callSite.short" in task_context._localProperties:
100+
event.setdefault("extra", {}).setdefault(
101+
"callSite", task_context._localProperties["callSite.short"]
102+
)
107103

108-
return event
104+
return event
109105

110106

111107
def _sentry_worker_main(*args, **kwargs):
@@ -115,8 +111,7 @@ def _sentry_worker_main(*args, **kwargs):
115111
try:
116112
original_worker.main(*args, **kwargs)
117113
except SystemExit:
118-
if Hub.current.get_integration(SparkWorkerIntegration) is not None:
119-
hub = Hub.current
114+
if sentry_sdk.get_client().get_integration(SparkWorkerIntegration) is not None:
120115
exc_info = sys.exc_info()
121116
with capture_internal_exceptions():
122-
_capture_exception(exc_info, hub)
117+
_capture_exception(exc_info)

0 commit comments

Comments
 (0)