2525# Original connect connect function
2626_connect = cassandra .cluster .Cluster .connect
2727
28+
2829def patch ():
2930 """ patch will add tracing to the cassandra library. """
3031 setattr (cassandra .cluster .Cluster , 'connect' ,
3132 wrapt .FunctionWrapper (_connect , traced_connect ))
3233 Pin (service = SERVICE , app = SERVICE , app_type = "db" ).onto (cassandra .cluster .Cluster )
3334
35+
3436def unpatch ():
3537 cassandra .cluster .Cluster .connect = _connect
3638
39+
3740def traced_connect (func , instance , args , kwargs ):
3841 session = func (* args , ** kwargs )
3942 if not isinstance (session .execute , wrapt .FunctionWrapper ):
4043 # FIXME[matt] this should probably be private.
4144 setattr (session , 'execute_async' , wrapt .FunctionWrapper (session .execute_async , traced_execute_async ))
4245 return session
4346
47+
4448def _close_span_on_success (result , future ):
4549 span = getattr (future , CURRENT_SPAN , None )
4650 if not span :
@@ -54,11 +58,13 @@ def _close_span_on_success(result, future):
5458 span .finish ()
5559 delattr (future , CURRENT_SPAN )
5660
61+
5762def traced_set_final_result (func , instance , args , kwargs ):
5863 result = args [0 ]
5964 _close_span_on_success (result , instance )
6065 return func (* args , ** kwargs )
6166
67+
6268def _close_span_on_error (exc , future ):
6369 span = getattr (future , CURRENT_SPAN , None )
6470 if not span :
@@ -76,11 +82,13 @@ def _close_span_on_error(exc, future):
7682 span .finish ()
7783 delattr (future , CURRENT_SPAN )
7884
85+
7986def traced_set_final_exception (func , instance , args , kwargs ):
8087 exc = args [0 ]
8188 _close_span_on_error (exc , instance )
8289 return func (* args , ** kwargs )
8390
91+
8492def traced_start_fetching_next_page (func , instance , args , kwargs ):
8593 has_more_pages = getattr (instance , 'has_more_pages' , True )
8694 if not has_more_pages :
@@ -106,11 +114,12 @@ def traced_start_fetching_next_page(func, instance, args, kwargs):
106114 setattr (instance , CURRENT_SPAN , span )
107115 try :
108116 return func (* args , ** kwargs )
109- except :
117+ except Exception :
110118 with span :
111119 span .set_exc_info (* sys .exc_info ())
112120 raise
113121
122+
114123def traced_execute_async (func , instance , args , kwargs ):
115124 cluster = getattr (instance , 'cluster' , None )
116125 pin = Pin .get_from (cluster )
@@ -161,11 +170,12 @@ def traced_execute_async(func, instance, args, kwargs):
161170 )
162171 result .clear_callbacks ()
163172 return result
164- except :
173+ except Exception :
165174 with span :
166175 span .set_exc_info (* sys .exc_info ())
167176 raise
168177
178+
169179def _start_span_and_set_tags (pin , query , session , cluster ):
170180 service = pin .service
171181 tracer = pin .tracer
@@ -175,6 +185,7 @@ def _start_span_and_set_tags(pin, query, session, cluster):
175185 span .set_tags (_extract_cluster_metas (cluster ))
176186 return span
177187
188+
178189def _extract_session_metas (session ):
179190 metas = {}
180191
@@ -185,6 +196,7 @@ def _extract_session_metas(session):
185196
186197 return metas
187198
199+
188200def _extract_cluster_metas (cluster ):
189201 metas = {}
190202 if deep_getattr (cluster , "metadata.cluster_name" ):
@@ -194,6 +206,7 @@ def _extract_cluster_metas(cluster):
194206
195207 return metas
196208
209+
197210def _extract_result_metas (result ):
198211 metas = {}
199212 if result is None :
@@ -230,6 +243,7 @@ def _extract_result_metas(result):
230243
231244 return metas
232245
246+
233247def _sanitize_query (span , query ):
234248 # TODO (aaditya): fix this hacky type check. we need it to avoid circular imports
235249 t = type (query ).__name__
@@ -250,7 +264,7 @@ def _sanitize_query(span, query):
250264 elif t == 'str' :
251265 resource = query
252266 else :
253- resource = 'unknown-query-type' # FIXME[matt] what else do to here?
267+ resource = 'unknown-query-type' # FIXME[matt] what else do to here?
254268
255269 span .resource = stringify (resource )[:RESOURCE_MAX_LENGTH ]
256270
0 commit comments