@@ -17,7 +17,7 @@ class SparkIntegration(Integration):
     @staticmethod
     def setup_once():
         # type: () -> None
-        patch_spark_context_init()
+        _setup_sentry_tracing()
 
 
 def _set_app_properties():
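
Context for this one-line change: `setup_once()` previously only patched `SparkContext._do_init`, so a `SparkContext` constructed *before* `sentry_sdk.init()` was never instrumented (common in managed notebook environments such as Databricks, where the context is created for you). The new `_setup_sentry_tracing()` entry point handles both orders. A minimal usage sketch, with a placeholder DSN and app name:

```python
import sentry_sdk
from pyspark import SparkContext
from sentry_sdk.integrations.spark import SparkIntegration

# The context already exists before init() -- with this change,
# _setup_sentry_tracing() sees the active context and activates the
# integration on it immediately, instead of only patching _do_init
# (which would never run again for an already-built context).
sc = SparkContext(appName="example-app")  # placeholder app name

sentry_sdk.init(
    dsn="<your-dsn>",  # placeholder
    integrations=[SparkIntegration()],
)
```
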
@@ -36,20 +36,63 @@ def _set_app_properties():
         )
 
 
-def _start_sentry_listener(sc):
-    # type: (Any) -> None
+def _start_sentry_listener():
+    # type: () -> None
4141 """
4242 Start java gateway server to add custom `SparkListener`
4343 """
44+ from pyspark import SparkContext
4445 from pyspark .java_gateway import ensure_callback_server_started
4546
47+ sc = SparkContext ._active_spark_context
4648 gw = sc ._gateway
4749 ensure_callback_server_started (gw )
4850 listener = SentryListener ()
4951 sc ._jsc .sc ().addSparkListener (listener )
5052
5153
52- def patch_spark_context_init ():
54+ def _activate_integration (sc ):
55+ # type: (SparkContext) -> None
56+ from pyspark import SparkContext
57+
58+ _start_sentry_listener ()
59+ _set_app_properties ()
60+
61+ scope = sentry_sdk .get_isolation_scope ()
62+
63+ @scope .add_event_processor
64+ def process_event (event , hint ):
65+ # type: (Event, Hint) -> Optional[Event]
66+ with capture_internal_exceptions ():
67+ if sentry_sdk .get_client ().get_integration (SparkIntegration ) is None :
68+ return event
69+
70+ if sc ._active_spark_context is None :
71+ return event
72+
73+ event .setdefault ("user" , {}).setdefault ("id" , sc .sparkUser ())
74+
75+ event .setdefault ("tags" , {}).setdefault (
76+ "executor.id" , sc ._conf .get ("spark.executor.id" )
77+ )
78+ event ["tags" ].setdefault (
79+ "spark-submit.deployMode" ,
80+ sc ._conf .get ("spark.submit.deployMode" ),
81+ )
82+ event ["tags" ].setdefault ("driver.host" , sc ._conf .get ("spark.driver.host" ))
83+ event ["tags" ].setdefault ("driver.port" , sc ._conf .get ("spark.driver.port" ))
84+ event ["tags" ].setdefault ("spark_version" , sc .version )
85+ event ["tags" ].setdefault ("app_name" , sc .appName )
86+ event ["tags" ].setdefault ("application_id" , sc .applicationId )
87+ event ["tags" ].setdefault ("master" , sc .master )
88+ event ["tags" ].setdefault ("spark_home" , sc .sparkHome )
89+
90+ event .setdefault ("extra" , {}).setdefault ("web_url" , sc .uiWebUrl )
91+
92+ return event
93+
94+
95+ def _patch_spark_context_init ():
5396 # type: () -> None
5497 from pyspark import SparkContext
5598
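
A note on the event processor that `_activate_integration()` registers above: a processor added to the isolation scope runs on every event captured while that scope is active, and may mutate the event or drop it by returning `None`. The consistent use of `setdefault` means the Spark metadata never overwrites a user id or tag the application already set. The pattern in isolation (the tag name is illustrative, not from the integration):

```python
import sentry_sdk

def add_example_tag(event, hint):
    # setdefault keeps any value the application already placed on the event.
    event.setdefault("tags", {}).setdefault("example.tag", "example-value")
    return event  # returning None instead would drop the event entirely

sentry_sdk.get_isolation_scope().add_event_processor(add_example_tag)
```
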
@@ -59,51 +102,22 @@ def patch_spark_context_init():
     def _sentry_patched_spark_context_init(self, *args, **kwargs):
         # type: (SparkContext, *Any, **Any) -> Optional[Any]
         rv = spark_context_init(self, *args, **kwargs)
-        _start_sentry_listener(self)
-        _set_app_properties()
-
-        scope = sentry_sdk.get_isolation_scope()
-
-        @scope.add_event_processor
-        def process_event(event, hint):
-            # type: (Event, Hint) -> Optional[Event]
-            with capture_internal_exceptions():
-                if sentry_sdk.get_client().get_integration(SparkIntegration) is None:
-                    return event
-
-                if self._active_spark_context is None:
-                    return event
-
-                event.setdefault("user", {}).setdefault("id", self.sparkUser())
-
-                event.setdefault("tags", {}).setdefault(
-                    "executor.id", self._conf.get("spark.executor.id")
-                )
-                event["tags"].setdefault(
-                    "spark-submit.deployMode",
-                    self._conf.get("spark.submit.deployMode"),
-                )
-                event["tags"].setdefault(
-                    "driver.host", self._conf.get("spark.driver.host")
-                )
-                event["tags"].setdefault(
-                    "driver.port", self._conf.get("spark.driver.port")
-                )
-                event["tags"].setdefault("spark_version", self.version)
-                event["tags"].setdefault("app_name", self.appName)
-                event["tags"].setdefault("application_id", self.applicationId)
-                event["tags"].setdefault("master", self.master)
-                event["tags"].setdefault("spark_home", self.sparkHome)
-
-                event.setdefault("extra", {}).setdefault("web_url", self.uiWebUrl)
-
-                return event
-
+        _activate_integration(self)
         return rv
 
     SparkContext._do_init = _sentry_patched_spark_context_init
 
 
+def _setup_sentry_tracing():
+    # type: () -> None
+    from pyspark import SparkContext
+
+    if SparkContext._active_spark_context is not None:
+        _activate_integration(SparkContext._active_spark_context)
+        return
+    _patch_spark_context_init()
+
+
 class SparkListener:
     def onApplicationEnd(self, applicationEnd):  # noqa: N802,N803
         # type: (Any) -> None
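
For readers new to the mechanics here: `SentryListener` extends the `SparkListener` class that begins above, and it is a plain Python object. The JVM can deliver scheduler events to it because py4j's callback server is running (that is why `_start_sentry_listener()` calls `ensure_callback_server_started(gw)` before `addSparkListener`), and because the class declares which Java interface it implements, following the standard py4j convention. A schematic of that declaration (the real class, further down this file, implements the interface's `onXxx` callbacks in full):

```python
class SparkListener:
    # ... onApplicationEnd, onJobStart, and the other callbacks ...

    # py4j convention: this inner class lets the JVM treat the Python object
    # as an org.apache.spark.scheduler.SparkListenerInterface implementation.
    class Java:
        implements = ["org.apache.spark.scheduler.SparkListenerInterface"]
```
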