Skip to content

Commit 2d4c9ca

Browse files
authored
W3c trace context (#331)
* adding traceparent,tracestate forwarding * added validation of traceparent/tracestate headers and update methods * send updated w3c context headers * adding span ctx calculation based on w3c trace context * adding the w3c trace context relevant span properties * apply several fixes discovered through testing * performing some optimisations * adding unittests for traceparent and tarcestate * add fixes for adjusting behaviour to the testing tracer suite * moving the logic of the extra span attributes relative to w3c to only be applied for entry spans * adding asgi in the registered spans * Update instana/version.py * changes based on the review, propagators got merged, a parameter is defining the w3c trace context propagation
1 parent c3f7fd7 commit 2d4c9ca

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

45 files changed

+1495
-412
lines changed

instana/instrumentation/aiohttp/client.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,11 @@
1010
from ...singletons import agent, async_tracer
1111
from ...util.secrets import strip_secrets_from_query
1212

13-
1413
try:
1514
import aiohttp
1615
import asyncio
1716

17+
1818
async def stan_request_start(session, trace_config_ctx, params):
1919
try:
2020
parent_span = async_tracer.active_span
@@ -31,13 +31,15 @@ async def stan_request_start(session, trace_config_ctx, params):
3131

3232
parts = str(params.url).split('?')
3333
if len(parts) > 1:
34-
cleaned_qp = strip_secrets_from_query(parts[1], agent.options.secrets_matcher, agent.options.secrets_list)
34+
cleaned_qp = strip_secrets_from_query(parts[1], agent.options.secrets_matcher,
35+
agent.options.secrets_list)
3536
scope.span.set_tag("http.params", cleaned_qp)
3637
scope.span.set_tag("http.url", parts[0])
3738
scope.span.set_tag('http.method', params.method)
3839
except Exception:
3940
logger.debug("stan_request_start", exc_info=True)
4041

42+
4143
async def stan_request_end(session, trace_config_ctx, params):
4244
try:
4345
scope = trace_config_ctx.scope
@@ -56,6 +58,7 @@ async def stan_request_end(session, trace_config_ctx, params):
5658
except Exception:
5759
logger.debug("stan_request_end", exc_info=True)
5860

61+
5962
async def stan_request_exception(session, trace_config_ctx, params):
6063
try:
6164
scope = trace_config_ctx.scope
@@ -66,7 +69,8 @@ async def stan_request_exception(session, trace_config_ctx, params):
6669
except Exception:
6770
logger.debug("stan_request_exception", exc_info=True)
6871

69-
@wrapt.patch_function_wrapper('aiohttp.client','ClientSession.__init__')
72+
73+
@wrapt.patch_function_wrapper('aiohttp.client', 'ClientSession.__init__')
7074
def init_with_instana(wrapped, instance, argv, kwargs):
7175
instana_trace_config = aiohttp.TraceConfig()
7276
instana_trace_config.on_request_start.append(stan_request_start)
@@ -79,7 +83,7 @@ def init_with_instana(wrapped, instance, argv, kwargs):
7983

8084
return wrapped(*argv, **kwargs)
8185

86+
8287
logger.debug("Instrumenting aiohttp client")
8388
except ImportError:
8489
pass
85-

instana/instrumentation/aiohttp/server.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,13 +10,13 @@
1010
from ...singletons import agent, async_tracer
1111
from ...util.secrets import strip_secrets_from_query
1212

13-
1413
try:
1514
import aiohttp
1615
import asyncio
1716

1817
from aiohttp.web import middleware
1918

19+
2020
@middleware
2121
async def stan_middleware(request, handler):
2222
try:
@@ -80,6 +80,7 @@ def init_with_instana(wrapped, instance, argv, kwargs):
8080

8181
return wrapped(*argv, **kwargs)
8282

83+
8384
logger.debug("Instrumenting aiohttp server")
8485
except ImportError:
8586
pass

instana/instrumentation/asgi.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,12 @@
1010
from ..singletons import async_tracer, agent
1111
from ..util.secrets import strip_secrets_from_query
1212

13+
1314
class InstanaASGIMiddleware:
1415
"""
1516
Instana ASGI Middleware
1617
"""
18+
1719
def __init__(self, app):
1820
self.app = app
1921

@@ -41,7 +43,8 @@ def _collect_kvs(self, scope, span):
4143
if isinstance(query, (str, bytes)) and len(query):
4244
if isinstance(query, bytes):
4345
query = query.decode('utf-8')
44-
scrubbed_params = strip_secrets_from_query(query, agent.options.secrets_matcher, agent.options.secrets_list)
46+
scrubbed_params = strip_secrets_from_query(query, agent.options.secrets_matcher,
47+
agent.options.secrets_list)
4548
span.set_tag("http.params", scrubbed_params)
4649

4750
app = scope.get('app')
@@ -55,7 +58,6 @@ def _collect_kvs(self, scope, span):
5558
except Exception:
5659
logger.debug("ASGI collect_kvs: ", exc_info=True)
5760

58-
5961
async def __call__(self, scope, receive, send):
6062
request_context = None
6163

instana/instrumentation/asynqp.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@
1313
import asynqp
1414
import asyncio
1515

16-
@wrapt.patch_function_wrapper('asynqp.exchange','Exchange.publish')
16+
17+
@wrapt.patch_function_wrapper('asynqp.exchange', 'Exchange.publish')
1718
def publish_with_instana(wrapped, instance, argv, kwargs):
1819
parent_span = async_tracer.active_span
1920

@@ -27,12 +28,13 @@ def publish_with_instana(wrapped, instance, argv, kwargs):
2728
msg = argv[0]
2829
if msg.headers is None:
2930
msg.headers = {}
30-
async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers)
31+
async_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, msg.headers,
32+
disable_w3c_trace_context=True)
3133

3234
try:
3335
scope.span.set_tag("exchange", instance.name)
3436
scope.span.set_tag("sort", "publish")
35-
scope.span.set_tag("address", host + ":" + str(port) )
37+
scope.span.set_tag("address", host + ":" + str(port))
3638

3739
if 'routing_key' in kwargs:
3840
scope.span.set_tag("key", kwargs['routing_key'])
@@ -46,8 +48,9 @@ def publish_with_instana(wrapped, instance, argv, kwargs):
4648
else:
4749
return rv
4850

51+
4952
@asyncio.coroutine
50-
@wrapt.patch_function_wrapper('asynqp.queue','Queue.get')
53+
@wrapt.patch_function_wrapper('asynqp.queue', 'Queue.get')
5154
def get_with_instana(wrapped, instance, argv, kwargs):
5255
parent_span = async_tracer.active_span
5356

@@ -59,7 +62,7 @@ def get_with_instana(wrapped, instance, argv, kwargs):
5962
host, port = instance.sender.protocol.transport._sock.getsockname()
6063

6164
scope.span.set_tag("sort", "consume")
62-
scope.span.set_tag("address", host + ":" + str(port) )
65+
scope.span.set_tag("address", host + ":" + str(port))
6366

6467
msg = yield from wrapped(*argv, **kwargs)
6568

@@ -69,23 +72,25 @@ def get_with_instana(wrapped, instance, argv, kwargs):
6972

7073
return msg
7174

75+
7276
@asyncio.coroutine
73-
@wrapt.patch_function_wrapper('asynqp.queue','Queue.consume')
77+
@wrapt.patch_function_wrapper('asynqp.queue', 'Queue.consume')
7478
def consume_with_instana(wrapped, instance, argv, kwargs):
7579
def callback_generator(original_callback):
7680
def callback_with_instana(*argv, **kwargs):
7781
ctx = None
7882
msg = argv[0]
7983
if msg.headers is not None:
80-
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers))
84+
ctx = async_tracer.extract(opentracing.Format.HTTP_HEADERS, dict(msg.headers),
85+
disable_w3c_trace_context=True)
8186

8287
with async_tracer.start_active_span("rabbitmq", child_of=ctx) as scope:
8388
host, port = msg.sender.protocol.transport._sock.getsockname()
8489

8590
try:
8691
scope.span.set_tag("exchange", msg.exchange_name)
8792
scope.span.set_tag("sort", "consume")
88-
scope.span.set_tag("address", host + ":" + str(port) )
93+
scope.span.set_tag("address", host + ":" + str(port))
8994
scope.span.set_tag("key", msg.routing_key)
9095

9196
original_callback(*argv, **kwargs)
@@ -99,6 +104,7 @@ def callback_with_instana(*argv, **kwargs):
99104
argv = (callback_generator(cb),)
100105
return wrapped(*argv, **kwargs)
101106

107+
102108
logger.debug("Instrumenting asynqp")
103109
except ImportError:
104110
pass

instana/instrumentation/aws/triggers.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
import json
99
import base64
1010
from io import BytesIO
11+
import opentracing as ot
1112

1213
from ...log import logger
1314

@@ -21,9 +22,9 @@ def get_context(tracer, event):
2122
is_application_load_balancer_trigger(event)
2223

2324
if is_proxy_event:
24-
return tracer.extract('http_headers', event.get('headers', {}))
25+
return tracer.extract(ot.Format.HTTP_HEADERS, event.get('headers', {}), disable_w3c_trace_context=True)
2526

26-
return tracer.extract('http_headers', event)
27+
return tracer.extract(ot.Format.HTTP_HEADERS, event, disable_w3c_trace_context=True)
2728

2829

2930
def is_api_gateway_proxy_trigger(event):

instana/instrumentation/boto3_inst.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
from ..util.traceutils import get_active_tracer
1313

1414
try:
15+
import opentracing as ot
1516
import boto3
1617
from boto3.s3 import inject
1718

@@ -28,7 +29,7 @@ def lambda_inject_context(payload, scope):
2829
if not isinstance(invoke_payload, dict):
2930
invoke_payload = json.loads(invoke_payload)
3031

31-
tracer.inject(scope.span.context, 'http_headers', invoke_payload)
32+
tracer.inject(scope.span.context, ot.Format.HTTP_HEADERS, invoke_payload)
3233
payload['Payload'] = json.dumps(invoke_payload)
3334
except Exception:
3435
logger.debug("non-fatal lambda_inject_context: ", exc_info=True)

instana/instrumentation/celery/hooks.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import urlparse as parse
2020
import urllib
2121

22+
2223
def add_broker_tags(span, broker_url):
2324
try:
2425
url = parse.urlparse(broker_url)
@@ -45,6 +46,7 @@ def add_broker_tags(span, broker_url):
4546
except Exception:
4647
logger.debug("Error parsing broker URL: %s" % broker_url, exc_info=True)
4748

49+
4850
@signals.task_prerun.connect
4951
def task_prerun(*args, **kwargs):
5052
try:
@@ -55,7 +57,7 @@ def task_prerun(*args, **kwargs):
5557

5658
headers = task.request.get('headers', {})
5759
if headers is not None:
58-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers)
60+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, headers, disable_w3c_trace_context=True)
5961

6062
scope = tracer.start_active_span("celery-worker", child_of=ctx)
6163
scope.span.set_tag("task", task.name)
@@ -67,6 +69,7 @@ def task_prerun(*args, **kwargs):
6769
except:
6870
logger.debug("task_prerun: ", exc_info=True)
6971

72+
7073
@signals.task_postrun.connect
7174
def task_postrun(*args, **kwargs):
7275
try:
@@ -78,6 +81,7 @@ def task_postrun(*args, **kwargs):
7881
except:
7982
logger.debug("after_task_publish: ", exc_info=True)
8083

84+
8185
@signals.task_failure.connect
8286
def task_failure(*args, **kwargs):
8387
try:
@@ -95,6 +99,7 @@ def task_failure(*args, **kwargs):
9599
except:
96100
logger.debug("task_failure: ", exc_info=True)
97101

102+
98103
@signals.task_retry.connect
99104
def task_retry(*args, **kwargs):
100105
try:
@@ -109,6 +114,7 @@ def task_retry(*args, **kwargs):
109114
except:
110115
logger.debug("task_failure: ", exc_info=True)
111116

117+
112118
@signals.before_task_publish.connect
113119
def before_task_publish(*args, **kwargs):
114120
try:
@@ -127,7 +133,8 @@ def before_task_publish(*args, **kwargs):
127133

128134
# Context propagation
129135
context_headers = {}
130-
active_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, context_headers)
136+
active_tracer.inject(scope.span.context, opentracing.Format.HTTP_HEADERS, context_headers,
137+
disable_w3c_trace_context=True)
131138

132139
# Fix for broken header propagation
133140
# https://github.com/celery/celery/issues/4875
@@ -141,6 +148,7 @@ def before_task_publish(*args, **kwargs):
141148
except:
142149
logger.debug("before_task_publish: ", exc_info=True)
143150

151+
144152
@signals.after_task_publish.connect
145153
def after_task_publish(*args, **kwargs):
146154
try:
@@ -152,6 +160,7 @@ def after_task_publish(*args, **kwargs):
152160
except:
153161
logger.debug("after_task_publish: ", exc_info=True)
154162

163+
155164
logger.debug("Instrumenting celery")
156165
except ImportError:
157166
pass

instana/instrumentation/flask/vanilla.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,7 @@ def before_request_with_instana(*argv, **kwargs):
2222
env = flask.request.environ
2323
ctx = None
2424

25-
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
26-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
25+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
2726

2827
flask.g.scope = tracer.start_active_span('wsgi', child_of=ctx)
2928
span = flask.g.scope.span
@@ -39,7 +38,8 @@ def before_request_with_instana(*argv, **kwargs):
3938
if 'PATH_INFO' in env:
4039
span.set_tag(ext.HTTP_URL, env['PATH_INFO'])
4140
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
42-
scrubbed_params = strip_secrets_from_query(env['QUERY_STRING'], agent.options.secrets_matcher, agent.options.secrets_list)
41+
scrubbed_params = strip_secrets_from_query(env['QUERY_STRING'], agent.options.secrets_matcher,
42+
agent.options.secrets_list)
4343
span.set_tag("http.params", scrubbed_params)
4444
if 'HTTP_HOST' in env:
4545
span.set_tag("http.host", env['HTTP_HOST'])

instana/instrumentation/flask/with_blinker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,7 @@ def request_started_with_instana(sender, **extra):
2323
env = flask.request.environ
2424
ctx = None
2525

26-
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
27-
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
26+
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
2827

2928
flask.g.scope = tracer.start_active_span('wsgi', child_of=ctx)
3029
span = flask.g.scope.span
@@ -40,7 +39,8 @@ def request_started_with_instana(sender, **extra):
4039
if 'PATH_INFO' in env:
4140
span.set_tag(ext.HTTP_URL, env['PATH_INFO'])
4241
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
43-
scrubbed_params = strip_secrets_from_query(env['QUERY_STRING'], agent.options.secrets_matcher, agent.options.secrets_list)
42+
scrubbed_params = strip_secrets_from_query(env['QUERY_STRING'], agent.options.secrets_matcher,
43+
agent.options.secrets_list)
4444
span.set_tag("http.params", scrubbed_params)
4545
if 'HTTP_HOST' in env:
4646
span.set_tag("http.host", env['HTTP_HOST'])

instana/instrumentation/google/cloud/pubsub.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ def publish_with_instana(wrapped, instance, args, kwargs):
4747
with tracer.start_active_span('gcps-producer', child_of=parent_span) as scope:
4848
# trace continuity, inject to the span context
4949
headers = dict()
50-
tracer.inject(scope.span.context, Format.TEXT_MAP, headers)
50+
tracer.inject(scope.span.context, Format.TEXT_MAP, headers, disable_w3c_trace_context=True)
5151

5252
# update the metadata dict with instana trace attributes
5353
kwargs.update(headers)
@@ -73,7 +73,7 @@ def subscribe_with_instana(wrapped, instance, args, kwargs):
7373

7474
def callback_with_instana(message):
7575
if message.attributes:
76-
parent_span = tracer.extract(Format.TEXT_MAP, message.attributes)
76+
parent_span = tracer.extract(Format.TEXT_MAP, message.attributes, disable_w3c_trace_context=True)
7777
else:
7878
parent_span = None
7979

0 commit comments

Comments
 (0)