Skip to content

Commit 5195960

Browse files
tebrielbrettlangdon
authored andcommitted
[kombu] add Kombu integration (#515)
Adding routing key to tags Initial tracing of Consumer.receive Using the Propagator for kombu publish Addressing first bit of feedback, more to come. Rolling out tests for python 2.7-3.6 and Kombu 4.0,4.1,4.2 Fixing flake8 issue, adding a second test, cleaning up utils. Actually tell CircleCI to run Kombu (oops!) Use the config settings for the service name Remove superflous function wrapper, remove extra patch call in example PR Feedback, documentation improvements, rebase from master Updating circleci config Fixing CircleCI Update ddtrace/contrib/kombu/patch.py Co-Authored-By: tebriel <[email protected]> Update ddtrace/contrib/kombu/patch.py Co-Authored-By: tebriel <[email protected]>
1 parent b0a4a22 commit 5195960

File tree

13 files changed

+364
-12
lines changed

13 files changed

+364
-12
lines changed

.circleci/config.yml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -695,6 +695,21 @@ jobs:
695695
- vertica.results
696696
- *save_cache_step
697697

698+
kombu:
699+
docker:
700+
- *test_runner
701+
- image: rabbitmq:3.7-alpine
702+
steps:
703+
- checkout
704+
- *restore_cache_step
705+
- run: tox -e wait rabbitmq
706+
- run: tox -e 'kombu_contrib-{py27,py34,py35,py36}-kombu{40,41,42}' --result-json /tmp/kombu.results
707+
- persist_to_workspace:
708+
root: /tmp
709+
paths:
710+
- kombu.results
711+
- *save_cache_step
712+
698713
sqlite3:
699714
docker:
700715
- *test_runner
@@ -860,6 +875,7 @@ workflows:
860875
- aiopg
861876
- redis
862877
- rediscluster
878+
- kombu
863879
- sqlite3
864880
- msgpack
865881
- vertica
@@ -906,6 +922,7 @@ workflows:
906922
- aiopg
907923
- redis
908924
- rediscluster
925+
- kombu
909926
- sqlite3
910927
- msgpack
911928
- vertica

ddtrace/contrib/kombu/__init__.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
"""Instrument kombu to report AMQP messaging.
2+
3+
``patch_all`` will not automatically patch your Kombu client to make it work, as this would conflict with the
4+
Celery integration. You must specifically request kombu be patched, as in the example below.
5+
6+
Note: To permit distributed tracing for the kombu integration you must enable the tracer with priority
7+
sampling. Refer to the documentation here:
8+
http://pypi.datadoghq.com/trace/docs/advanced_usage.html#priority-sampling
9+
10+
Without enabling distributed tracing, spans within a trace generated by the kombu integration might be dropped
11+
without the whole trace being dropped.
12+
::
13+
14+
from ddtrace import Pin, patch
15+
import kombu
16+
17+
# If not patched yet, you can patch kombu specifically
18+
patch(kombu=True)
19+
20+
# This will report a span with the default settings
21+
conn = kombu.Connection("amqp://guest:[email protected]:5672//")
22+
conn.connect()
23+
task_queue = kombu.Queue('tasks', kombu.Exchange('tasks'), routing_key='tasks')
24+
to_publish = {'hello': 'world'}
25+
producer = conn.Producer()
26+
producer.publish(to_publish,
27+
exchange=task_queue.exchange,
28+
routing_key=task_queue.routing_key,
29+
declare=[task_queue])
30+
31+
# Use a pin to specify metadata related to this client
32+
Pin.override(producer, service='kombu-consumer')
33+
"""
34+
35+
from ...utils.importlib import require_modules
36+
37+
required_modules = ['kombu', 'kombu.messaging']
38+
39+
with require_modules(required_modules) as missing_modules:
40+
if not missing_modules:
41+
from .patch import patch
42+
43+
__all__ = ['patch']

ddtrace/contrib/kombu/constants.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
DEFAULT_SERVICE = 'kombu'

ddtrace/contrib/kombu/patch.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
# 3p
2+
import kombu
3+
import wrapt
4+
5+
# project
6+
from ddtrace import config
7+
8+
from ...pin import Pin
9+
from ...utils.formats import get_env
10+
from .constants import DEFAULT_SERVICE
11+
from ...ext import kombu as kombux
12+
from ...ext import AppTypes
13+
from ...utils.wrappers import unwrap
14+
from ...propagation.http import HTTPPropagator
15+
from .utils import (
16+
get_exchange_from_args,
17+
get_body_length_from_args,
18+
get_routing_key_from_args,
19+
extract_conn_tags,
20+
HEADER_POS
21+
)
22+
23+
# kombu default settings
24+
config._add('kombu',{
25+
'service_name': get_env('kombu', 'service_name', DEFAULT_SERVICE)
26+
})
27+
28+
propagator = HTTPPropagator()
29+
30+
31+
def patch():
32+
"""Patch the instrumented methods
33+
34+
This duplicated doesn't look nice. The nicer alternative is to use an ObjectProxy on top
35+
of Kombu. However, it means that any "import kombu.Connection" won't be instrumented.
36+
"""
37+
if getattr(kombu, '_datadog_patch', False):
38+
return
39+
setattr(kombu, '_datadog_patch', True)
40+
41+
_w = wrapt.wrap_function_wrapper
42+
# We wrap the _publish method because the publish method:
43+
# * defines defaults in its kwargs
44+
# * potentially overrides kwargs with values from self
45+
# * extracts/normalizes things like exchange
46+
_w(kombux.TYPE, 'Producer._publish', traced_publish)
47+
_w(kombux.TYPE, 'Consumer.receive', traced_receive)
48+
Pin(
49+
service=config.kombu['service_name'],
50+
app='kombu',
51+
app_type=AppTypes.worker,
52+
).onto(kombu.messaging.Producer)
53+
54+
Pin(
55+
service=config.kombu['service_name'],
56+
app='kombu',
57+
app_type=AppTypes.worker,
58+
).onto(kombu.messaging.Consumer)
59+
60+
61+
def unpatch():
62+
if getattr(kombu, '_datadog_patch', False):
63+
setattr(kombu, '_datadog_patch', False)
64+
unwrap(kombu.Producer, '_publish')
65+
unwrap(kombu.Consumer, 'receive')
66+
67+
#
68+
# tracing functions
69+
#
70+
71+
72+
def traced_receive(func, instance, args, kwargs):
73+
pin = Pin.get_from(instance)
74+
if not pin or not pin.enabled():
75+
return func(*args, **kwargs)
76+
77+
# Signature only takes 2 args: (body, message)
78+
message = args[1]
79+
context = propagator.extract(message.headers)
80+
# only need to active the new context if something was propagated
81+
if context.trace_id:
82+
pin.tracer.context_provider.activate(context)
83+
with pin.tracer.trace(kombux.RECEIVE_NAME, service=pin.service, span_type='kombu') as s:
84+
# run the command
85+
exchange = message.delivery_info['exchange']
86+
s.resource = exchange
87+
s.set_tag(kombux.EXCHANGE, exchange)
88+
89+
s.set_tags(extract_conn_tags(message.channel.connection))
90+
s.set_tag(kombux.ROUTING_KEY, message.delivery_info['routing_key'])
91+
return func(*args, **kwargs)
92+
93+
94+
def traced_publish(func, instance, args, kwargs):
95+
pin = Pin.get_from(instance)
96+
if not pin or not pin.enabled():
97+
return func(*args, **kwargs)
98+
99+
with pin.tracer.trace(kombux.PUBLISH_NAME, service=pin.service, span_type='kombu') as s:
100+
exchange_name = get_exchange_from_args(args)
101+
s.resource = exchange_name
102+
s.set_tag(kombux.EXCHANGE, exchange_name)
103+
if pin.tags:
104+
s.set_tags(pin.tags)
105+
s.set_tag(kombux.ROUTING_KEY, get_routing_key_from_args(args))
106+
s.set_tags(extract_conn_tags(instance.channel.connection))
107+
s.set_metric(kombux.BODY_LEN, get_body_length_from_args(args))
108+
# run the command
109+
propagator.inject(s.context, args[HEADER_POS])
110+
return func(*args, **kwargs)

ddtrace/contrib/kombu/utils.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""
2+
Some utils used by the dogtrace kombu integration
3+
"""
4+
from ...ext import kombu as kombux, net
5+
6+
PUBLISH_BODY_IDX = 0
7+
PUBLISH_ROUTING_KEY = 6
8+
PUBLISH_EXCHANGE_IDX = 9
9+
10+
HEADER_POS = 4
11+
12+
13+
def extract_conn_tags(connection):
14+
""" Transform kombu conn info into dogtrace metas """
15+
try:
16+
host, port = connection.host.split(':')
17+
return {
18+
net.TARGET_HOST: host,
19+
net.TARGET_PORT: port,
20+
kombux.VHOST: connection.virtual_host,
21+
}
22+
except AttributeError:
23+
# Unlikely that we don't have .host or .virtual_host but let's not die over it
24+
return {}
25+
26+
27+
def get_exchange_from_args(args):
28+
"""Extract the exchange
29+
30+
The publish method extracts the name and hands that off to _publish (what we patch)
31+
"""
32+
33+
return args[PUBLISH_EXCHANGE_IDX]
34+
35+
36+
def get_routing_key_from_args(args):
37+
"""Extract the routing key"""
38+
39+
name = args[PUBLISH_ROUTING_KEY]
40+
return name
41+
42+
43+
def get_body_length_from_args(args):
44+
"""Extract the length of the body"""
45+
46+
length = len(args[PUBLISH_BODY_IDX])
47+
return length

ddtrace/ext/kombu.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
# type of the spans
2+
TYPE = 'kombu'
3+
4+
# net extension
5+
VHOST = 'out.vhost'
6+
7+
# standard tags
8+
EXCHANGE = 'kombu.exchange'
9+
BODY_LEN = 'kombu.body_length'
10+
ROUTING_KEY = 'kombu.routing_key'
11+
12+
PUBLISH_NAME = 'kombu.publish'
13+
RECEIVE_NAME = 'kombu.receive'

ddtrace/monkey.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
'vertica': True,
4747
'jinja2': True,
4848
'flask': True,
49+
'kombu': False,
4950

5051
# Ignore some web framework integrations that might be configured explicitly in code
5152
"django": False,

docker-compose.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ services:
6262
- "127.0.0.1:5003:5003"
6363
- "127.0.0.1:5004:5004"
6464
- "127.0.0.1:5005:5005"
65+
rabbitmq:
66+
image: rabbitmq:3.7-alpine
67+
ports:
68+
- "127.0.0.1:5672:5672"
6569
ddagent:
6670
image: datadog/docker-dd-agent
6771
environment:

tests/contrib/config.py

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,19 @@
1818
}
1919

2020
POSTGRES_CONFIG = {
21-
'host' : 'localhost',
21+
'host': 'localhost',
2222
'port': int(os.getenv("TEST_POSTGRES_PORT", 5432)),
23-
'user' : os.getenv("TEST_POSTGRES_USER", "postgres"),
24-
'password' : os.getenv("TEST_POSTGRES_PASSWORD", "postgres"),
25-
'dbname' : os.getenv("TEST_POSTGRES_DB", "postgres"),
23+
'user': os.getenv("TEST_POSTGRES_USER", "postgres"),
24+
'password': os.getenv("TEST_POSTGRES_PASSWORD", "postgres"),
25+
'dbname': os.getenv("TEST_POSTGRES_DB", "postgres"),
2626
}
2727

2828
MYSQL_CONFIG = {
29-
'host' : '127.0.0.1',
30-
'port' : int(os.getenv("TEST_MYSQL_PORT", 3306)),
31-
'user' : os.getenv("TEST_MYSQL_USER", 'test'),
32-
'password' : os.getenv("TEST_MYSQL_PASSWORD", 'test'),
33-
'database' : os.getenv("TEST_MYSQL_DATABASE", 'test'),
29+
'host': '127.0.0.1',
30+
'port': int(os.getenv("TEST_MYSQL_PORT", 3306)),
31+
'user': os.getenv("TEST_MYSQL_USER", 'test'),
32+
'password': os.getenv("TEST_MYSQL_PASSWORD", 'test'),
33+
'database': os.getenv("TEST_MYSQL_DATABASE", 'test'),
3434
}
3535

3636
REDIS_CONFIG = {
@@ -47,7 +47,7 @@
4747
}
4848

4949
MEMCACHED_CONFIG = {
50-
'host' : os.getenv('TEST_MEMCACHED_HOST', '127.0.0.1'),
50+
'host': os.getenv('TEST_MEMCACHED_HOST', '127.0.0.1'),
5151
'port': int(os.getenv("TEST_MEMCACHED_PORT", 11211)),
5252
}
5353

@@ -58,3 +58,10 @@
5858
'password': os.getenv('TEST_VERTICA_PASSWORD', 'abc123'),
5959
'database': os.getenv('TEST_VERTICA_DATABASE', 'docker'),
6060
}
61+
62+
RABBITMQ_CONFIG = {
63+
'host': os.getenv('TEST_RABBITMQ_HOST', '127.0.0.1'),
64+
'user': os.getenv('TEST_RABBITMQ_USER', 'guest'),
65+
'password': os.getenv('TEST_RABBITMQ_PASSWORD', 'guest'),
66+
'port': int(os.getenv("TEST_RABBITMQ_PORT", 5672)),
67+
}

tests/contrib/kombu/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)