Skip to content

Commit 737a996

Browse files
bmermetEmanuele Palazzetti
authored andcommitted
[cassandra] trace execute_async operations (#333)
* [cassandra] patch execute_async instead of execute
1 parent 41fbcb7 commit 737a996

File tree

3 files changed

+231
-45
lines changed

3 files changed

+231
-45
lines changed

ddtrace/contrib/cassandra/session.py

Lines changed: 141 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
"""
22
Trace queries along a session to a cassandra cluster
33
"""
4+
import sys
5+
import logging
46
# 3p
57
import cassandra.cluster
68
import wrapt
@@ -9,11 +11,14 @@
911
from ddtrace import Pin
1012
from ddtrace.compat import stringify
1113
from ...util import deep_getattr, deprecated
12-
from ...ext import net, cassandra as cassx
14+
from ...ext import net, cassandra as cassx, errors
1315

16+
log = logging.getLogger(__name__)
1417

1518
RESOURCE_MAX_LENGTH = 5000
16-
SERVICE = "cassandra"
19+
SERVICE = 'cassandra'
20+
CURRENT_SPAN = '_ddtrace_current_span'
21+
PAGE_NUMBER = '_ddtrace_page_number'
1722

1823
# Original connect connect function
1924
_connect = cassandra.cluster.Cluster.connect
@@ -31,32 +36,142 @@ def traced_connect(func, instance, args, kwargs):
3136
session = func(*args, **kwargs)
3237
if not isinstance(session.execute, wrapt.FunctionWrapper):
3338
# FIXME[matt] this should probably be private.
34-
setattr(session, 'execute', wrapt.FunctionWrapper(session.execute, traced_execute))
39+
setattr(session, 'execute_async', wrapt.FunctionWrapper(session.execute_async, traced_execute_async))
3540
return session
3641

37-
def traced_execute(func, instance, args, kwargs):
38-
cluster = getattr(instance, 'cluster', None)
42+
def _close_span_on_success(result, future):
43+
span = getattr(future, CURRENT_SPAN, None)
44+
if not span:
45+
log.debug('traced_set_final_result was not able to get the current span from the ResponseFuture')
46+
return
47+
try:
48+
span.set_tags(_extract_result_metas(cassandra.cluster.ResultSet(future, result)))
49+
except Exception as e:
50+
log.debug('an exception occured while setting tags: %s', e)
51+
finally:
52+
span.finish()
53+
delattr(future, CURRENT_SPAN)
54+
55+
def traced_set_final_result(func, instance, args, kwargs):
56+
result = args[0]
57+
_close_span_on_success(result, instance)
58+
return func(*args, **kwargs)
59+
60+
def _close_span_on_error(exc, future):
61+
span = getattr(future, CURRENT_SPAN, None)
62+
if not span:
63+
log.debug('traced_set_final_exception was not able to get the current span from the ResponseFuture')
64+
return
65+
try:
66+
# handling the exception manually because we
67+
# don't have an ongoing exception here
68+
span.error = 1
69+
span.set_tag(errors.ERROR_MSG, exc.args[0])
70+
span.set_tag(errors.ERROR_TYPE, exc.__class__.__name__)
71+
except Exception as e:
72+
log.debug('traced_set_final_exception was not able to set the error, failed with error: %s', e)
73+
finally:
74+
span.finish()
75+
delattr(future, CURRENT_SPAN)
76+
77+
def traced_set_final_exception(func, instance, args, kwargs):
78+
exc = args[0]
79+
_close_span_on_error(exc, instance)
80+
return func(*args, **kwargs)
81+
82+
def traced_start_fetching_next_page(func, instance, args, kwargs):
83+
has_more_pages = getattr(instance, 'has_more_pages', True)
84+
if not has_more_pages:
85+
return func(*args, **kwargs)
86+
session = getattr(instance, 'session', None)
87+
cluster = getattr(session, 'cluster', None)
3988
pin = Pin.get_from(cluster)
4089
if not pin or not pin.enabled():
4190
return func(*args, **kwargs)
4291

43-
service = pin.service
44-
tracer = pin.tracer
92+
# In case the current span is not finished we make sure to finish it
93+
old_span = getattr(instance, CURRENT_SPAN, None)
94+
if old_span:
95+
log.debug('previous span was not finished before fetching next page')
96+
old_span.finish()
4597

46-
query = kwargs.get("kwargs") or args[0]
98+
query = getattr(instance, 'query', None)
4799

48-
with tracer.trace("cassandra.query", service=service, span_type=cassx.TYPE) as span:
49-
_sanitize_query(span, query)
50-
span.set_tags(_extract_session_metas(instance)) # FIXME[matt] do once?
51-
span.set_tags(_extract_cluster_metas(cluster))
52-
result = None
53-
try:
54-
result = func(*args, **kwargs)
55-
return result
56-
finally:
57-
if result:
58-
span.set_tags(_extract_result_metas(result))
100+
span = _start_span_and_set_tags(pin, query, session, cluster)
59101

102+
page_number = getattr(instance, PAGE_NUMBER, 1) + 1
103+
setattr(instance, PAGE_NUMBER, page_number)
104+
setattr(instance, CURRENT_SPAN, span)
105+
try:
106+
return func(*args, **kwargs)
107+
except:
108+
with span:
109+
span.set_exc_info(*sys.exc_info())
110+
raise
111+
112+
def traced_execute_async(func, instance, args, kwargs):
113+
cluster = getattr(instance, 'cluster', None)
114+
pin = Pin.get_from(cluster)
115+
if not pin or not pin.enabled():
116+
return func(*args, **kwargs)
117+
118+
query = kwargs.get("query") or args[0]
119+
120+
span = _start_span_and_set_tags(pin, query, instance, cluster)
121+
122+
try:
123+
result = func(*args, **kwargs)
124+
setattr(result, CURRENT_SPAN, span)
125+
setattr(result, PAGE_NUMBER, 1)
126+
setattr(
127+
result,
128+
'_set_final_result',
129+
wrapt.FunctionWrapper(
130+
result._set_final_result,
131+
traced_set_final_result
132+
)
133+
)
134+
setattr(
135+
result,
136+
'_set_final_exception',
137+
wrapt.FunctionWrapper(
138+
result._set_final_exception,
139+
traced_set_final_exception
140+
)
141+
)
142+
setattr(
143+
result,
144+
'start_fetching_next_page',
145+
wrapt.FunctionWrapper(
146+
result.start_fetching_next_page,
147+
traced_start_fetching_next_page
148+
)
149+
)
150+
# Since we cannot be sure that the previous methods were overwritten
151+
# before the call ended, we add callbacks that will be run
152+
# synchronously if the call already returned and we remove them right
153+
# after.
154+
result.add_callbacks(
155+
_close_span_on_success,
156+
_close_span_on_error,
157+
callback_args=(result,),
158+
errback_args=(result,)
159+
)
160+
result.clear_callbacks()
161+
return result
162+
except:
163+
with span:
164+
span.set_exc_info(*sys.exc_info())
165+
raise
166+
167+
def _start_span_and_set_tags(pin, query, session, cluster):
168+
service = pin.service
169+
tracer = pin.tracer
170+
span = tracer.trace("cassandra.query", service=service, span_type=cassx.TYPE)
171+
_sanitize_query(span, query)
172+
span.set_tags(_extract_session_metas(session)) # FIXME[matt] do once?
173+
span.set_tags(_extract_cluster_metas(cluster))
174+
return span
60175

61176
def _extract_session_metas(session):
62177
metas = {}
@@ -79,7 +194,7 @@ def _extract_cluster_metas(cluster):
79194

80195
def _extract_result_metas(result):
81196
metas = {}
82-
if not result:
197+
if result is None:
83198
return metas
84199

85200
future = getattr(result, "response_future", None)
@@ -100,12 +215,13 @@ def _extract_result_metas(result):
100215
if getattr(query, "keyspace", None):
101216
metas[cassx.KEYSPACE] = query.keyspace.lower()
102217

103-
if hasattr(result, "has_more_pages"):
104-
metas[cassx.PAGINATED] = bool(result.has_more_pages)
218+
page_number = getattr(future, PAGE_NUMBER, 1)
219+
has_more_pages = getattr(future, "has_more_pages")
220+
is_paginated = has_more_pages or page_number > 1
221+
metas[cassx.PAGINATED] = is_paginated
222+
if is_paginated:
223+
metas[cassx.PAGE_NUMBER] = page_number
105224

106-
# NOTE(aaditya): this number only reflects the first page of results
107-
# which could be misleading. But a true count would require iterating through
108-
# all pages which is expensive
109225
if hasattr(result, "current_rows"):
110226
result_rows = result.current_rows or []
111227
metas[cassx.ROW_COUNT] = len(result_rows)

ddtrace/ext/cassandra.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,3 +8,4 @@
88
CONSISTENCY_LEVEL = "cassandra.consistency_level"
99
PAGINATED = "cassandra.paginated"
1010
ROW_COUNT = "cassandra.row_count"
11+
PAGE_NUMBER = "cassandra.page_number"

0 commit comments

Comments
 (0)