Skip to content

Commit 0fd7416

Browse files
authored
Merge pull request #49 from DataDog/matt/mango
Matt/mango
2 parents 9df15cd + 966f128 commit 0fd7416

File tree

8 files changed

+246
-72
lines changed

8 files changed

+246
-72
lines changed

Rakefile

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,17 @@
11

2-
require "shellwords"

desc "run all tests"
task :test do
  sh "tox"
end

desc "Run tests with envs matching the given pattern."
task :"test:envs", [:grep] do |_t, args|
  pattern = args[:grep]
  unless pattern
    puts 'specify a pattern like rake test:envs["py27.*mongo"]'
    next
  end

  # Escape the pattern so quotes or shell metacharacters in it cannot break
  # (or extend) the command line; -e keeps a leading dash from being read as
  # a grep option.
  envs = `tox -l | grep -e #{Shellwords.escape(pattern)}`.split
  if envs.empty?
    puts "no tox envs match #{pattern}"
  else
    # tox -e takes a single comma-separated list of env names, so join the
    # matches instead of passing them as separate arguments.
    sh "tox", "-e", envs.join(",")
  end
end
616

717
desc "install the library in dev mode"

ddtrace/compat.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,13 @@ def iteritems(obj, **kwargs):
3434
func = obj.items
3535
return func(**kwargs)
3636

37+
def to_unicode(s):
    """ Return a unicode string for the given bytes or string instance.

    Byte strings (``bytes`` / ``bytearray``) are decoded as UTF-8; any other
    value is handed to ``stringify``.
    """
    # Check the type rather than the presence of a ``decode`` attribute:
    # Python 2 ``unicode`` objects also have ``decode`` and calling it on
    # non-ascii text triggers an implicit ascii encode that fails.
    if isinstance(s, (bytes, bytearray)):
        return s.decode("utf-8")
    return stringify(s)
43+
3744
if PY2:
3845
numeric_types = (int, long, float)
3946
else:

ddtrace/contrib/pymongo/parse.py

Lines changed: 133 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,140 @@
11

2+
import ctypes
3+
import logging
4+
import struct
5+
6+
# 3p
7+
import bson
8+
from bson.codec_options import CodecOptions
9+
from bson.son import SON
10+
11+
# project
12+
from ...compat import to_unicode
13+
from ...ext import net as netx
14+
15+
16+
log = logging.getLogger(__name__)
17+
18+
19+
# MongoDB wire protocol op codes, mapped to the names used in this module.
# http://docs.mongodb.com/manual/reference/mongodb-wire-protocol
OP_CODES = {
    1: "reply",
    1000: "msg",
    2001: "update",
    2002: "insert",
    2003: "reserved",
    2004: "query",
    2005: "get_more",
    2006: "delete",
    2007: "kill_cursors",
    2010: "command",
    2011: "command_reply",
}

# The maximum message length we'll try to parse (1 MiB)
MAX_MSG_PARSE_LEN = 1 << 20

# Message header: four little-endian int32 values, unpacked by parse_msg as
# (length, req_id, response_to, op_code).
header_struct = struct.Struct("<iiii")
39+
240

341
class Command(object):
    """ Command stores information about a pymongo network command.

    Instances carry the command name, the database and collection it
    targets, plus ``tags`` / ``metrics`` dicts and an optional ``query``.
    """

    __slots__ = ['name', 'coll', 'db', 'tags', 'metrics', 'query']

    def __init__(self, name, db, coll):
        self.name, self.coll, self.db = name, coll, db
        # start out empty; callers fill these in after construction
        self.tags, self.metrics = {}, {}
        self.query = None

    def __repr__(self):
        fields = (self.name, self.db, self.coll)
        return "Command(name=%s,db=%s,coll=%s)" % fields
61+
62+
63+
def parse_msg(msg_bytes):
    """ Return a command from a binary mongo db message or None if we shouldn't
    trace it. The protocol is documented here:
    http://docs.mongodb.com/manual/reference/mongodb-wire-protocol

    :param msg_bytes: the raw wire message, starting with the 16 byte header.
    :returns: a Command with its ``BYTES_OUT`` metric set to the message
        length, or None when the message is too short, has an unknown op
        code, is not a "query" op, or contains no parseable spec.
    """
    # NOTE[matt] this is used for queries in pymongo <= 3.0.0 and for inserts
    # in up to date versions.
    msg_len = len(msg_bytes)
    # anything shorter than the fixed-size header cannot be unpacked.
    if msg_len < header_struct.size:
        return None

    # only the op code is needed from the header.
    _length, _req_id, _response_to, op_code = header_struct.unpack_from(msg_bytes, 0)

    op = OP_CODES.get(op_code)
    if not op:
        log.debug("unknown op code: %s", op_code)
        return None

    # Only "query" messages are parsed; every other op has no command to
    # report, so return None rather than failing further down.
    if op != "query":
        return None

    # NOTE[matt] inserts, updates and queries can all use this opcode
    offset = header_struct.size
    offset += 4  # skip flags
    ns = _cstring(msg_bytes[offset:])
    offset += len(ns) + 1  # include null terminator

    # note: here coll could be '$cmd' because it can be overridden in the
    # query itself (like {"insert":"songs"})
    db, coll = _split_namespace(ns)

    offset += 8  # skip numberToSkip & numberToReturn
    if msg_len <= MAX_MSG_PARSE_LEN:
        # FIXME[matt] don't try to parse large messages for performance
        # reasons. ideally we'd just peek at the first bytes to get
        # the critical info (op type, collection, query, # of docs)
        # rather than parse the whole thing. i suspect only massive
        # inserts will be affected.
        codec = CodecOptions(SON)
        # only the first document is needed; None if there is none at all.
        spec = next(bson.decode_iter(msg_bytes[offset:], codec_options=codec), None)
        cmd = parse_spec(spec, db) if spec is not None else None
    else:
        # let's still note that a command happened.
        cmd = Command("command", db, "untraced_message_too_large")

    # parse_spec returns None for an empty spec; nothing to trace then.
    if cmd is None:
        return None

    # If the command didn't contain namespace info, set it here.
    if not cmd.coll:
        cmd.coll = coll

    cmd.metrics[netx.BYTES_OUT] = msg_len
    return cmd
15118

16119
def parse_query(query):
    """ Return a command parsed from the given mongo db query.

    The database and collection come from ``query.ns`` when that attribute is
    set, otherwise from ``query.coll`` and ``query.db``; the query's ``spec``
    is stored on the returned Command.
    """
    namespace = getattr(query, "ns", None)
    if namespace:
        # version < 3.1 stores the full namespace
        db, coll = _split_namespace(namespace)
    else:
        # version >= 3.1 stores the db and coll separately
        coll = getattr(query, "coll", None)
        db = getattr(query, "db", None)

    # FIXME[matt] mongo < 3.1 _Query doesn't have a name field,
    # so hardcode to query.
    parsed = Command("query", db, coll)
    parsed.query = query.spec
    return parsed
21136

22-
def parse_spec(spec):
137+
def parse_spec(spec, db=None):
23138
""" Return a Command that has parsed the relevant detail for the given
24139
pymongo SON spec.
25140
"""
@@ -29,7 +144,7 @@ def parse_spec(spec):
29144
if not items:
30145
return None
31146
name, coll = items[0]
32-
cmd = Command(name, coll)
147+
cmd = Command(name, db, coll)
33148

34149
if 'ordered' in spec: # in insert and update
35150
cmd.tags['mongodb.ordered'] = spec['ordered']
@@ -52,4 +167,17 @@ def parse_spec(spec):
52167

53168
return cmd
54169

170+
def _cstring(raw):
171+
""" Return the first null terminated cstring from the bufffer. """
172+
return ctypes.create_string_buffer(raw).value
55173

174+
def _split_namespace(ns):
175+
""" Return a tuple of (db, collecton) from the "db.coll" string. """
176+
if ns:
177+
# NOTE[matt] ns is unicode or bytes depending on the client version
178+
# so force cast to unicode
179+
split = to_unicode(ns).split(".", 1)
180+
if len(split) == 1:
181+
raise Exception("namespace doesn't contain period: %s" % ns)
182+
return split
183+
return (None, None)

ddtrace/contrib/pymongo/trace.py

Lines changed: 48 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@
77
from wrapt import ObjectProxy
88

99
# project
10-
from ...compat import iteritems
10+
from ...compat import iteritems, json
1111
from ...ext import AppTypes
1212
from ...ext import mongo as mongox
1313
from ...ext import net as netx
14-
from .parse import parse_spec, parse_query, Command
14+
from .parse import parse_spec, parse_query, parse_msg
1515

1616

1717
log = logging.getLogger(__name__)
@@ -39,38 +39,44 @@ def __init__(self, tracer, service, sock):
3939
def command(self, dbname, spec, *args, **kwargs):
    """ Run the wrapped socket's ``command``, traced when the spec parses.

    If the spec cannot be parsed, or ``dbname`` is empty, the call is
    forwarded to the wrapped object without a span.
    """
    try:
        cmd = parse_spec(spec, dbname)
    except Exception:
        log.exception("error parsing spec. skipping trace")
        cmd = None

    if dbname and cmd:
        cmd.db = dbname
        with self.__trace(cmd):
            return self.__wrapped__.command(dbname, spec, *args, **kwargs)

    # skip tracing if we don't have a piece of data we need
    return self.__wrapped__.command(dbname, spec, *args, **kwargs)
5253

53-
def write_command(self, request_id, msg):
    """ Run the wrapped socket's ``write_command``, traced when ``msg`` parses.

    When a span is opened its resource is derived from the parsed command
    and, for a truthy result, the result's ``n`` value (-1 if absent) is
    recorded as the ROWS metric.
    """
    try:
        parsed = parse_msg(msg)
    except Exception:
        log.exception("error parsing msg")
        parsed = None

    # if we couldn't parse it, don't try to trace it.
    if not parsed:
        return self.__wrapped__.write_command(request_id, msg)

    with self.__trace(parsed) as span:
        span.resource = _resource_from_cmd(parsed)
        reply = self.__wrapped__.write_command(request_id, msg)
        if reply:
            span.set_metric(mongox.ROWS, reply.get("n", -1))
        return reply
6571

66-
def __trace(self, db, cmd):
72+
def __trace(self, cmd):
6773
s = self._tracer.trace(
6874
"pymongo.cmd",
6975
span_type=mongox.TYPE,
7076
service=self._srv)
7177

72-
if db:
73-
s.set_tag(mongox.DB, db)
78+
if cmd.db:
79+
s.set_tag(mongox.DB, cmd.db)
7480
if cmd:
7581
s.set_tag(mongox.COLLECTION, cmd.coll)
7682
s.set_tags(cmd.tags)
@@ -93,31 +99,35 @@ def __init__(self, tracer, service, topology):
9399
self._srv = service
94100

95101
def send_message_with_response(self, operation, *args, **kwargs):
96-
97-
# if we're processing something unexpected, just skip tracing.
98-
if getattr(operation, 'name', None) != 'find':
102+
cmd = None
103+
# Only try to parse something we think is a query.
104+
if self._is_query(operation):
105+
try:
106+
cmd = parse_query(operation)
107+
except Exception:
108+
log.exception("error parsing query")
109+
110+
# if we couldn't parse or shouldn't trace the message, just go.
111+
if not cmd:
99112
return self.__wrapped__.send_message_with_response(
100113
operation,
101114
*args,
102115
**kwargs)
103116

104-
# trace the given query.
105-
cmd = parse_query(operation)
106117
with self._tracer.trace(
107118
"pymongo.cmd",
108119
span_type=mongox.TYPE,
109120
service=self._srv) as span:
110121

111122
span.resource = _resource_from_cmd(cmd)
112-
span.set_tag(mongox.DB, operation.db)
123+
span.set_tag(mongox.DB, cmd.db)
113124
span.set_tag(mongox.COLLECTION, cmd.coll)
114125
span.set_tags(cmd.tags)
115126

116127
result = self.__wrapped__.send_message_with_response(
117128
operation,
118129
*args,
119-
**kwargs
120-
)
130+
**kwargs)
121131

122132
if result and result.address:
123133
_set_address_tags(span, result.address)
@@ -131,6 +141,12 @@ def get_socket(self, *args, **kwargs):
131141
else:
132142
yield TracedSocket(self._tracer, self._srv, s)
133143

144+
@staticmethod
145+
def _is_query(op):
146+
# NOTE: _Query should alwyas have a spec field
147+
return hasattr(op, 'spec')
148+
149+
134150
class TracedTopology(ObjectProxy):
135151

136152
_tracer = None
@@ -155,6 +171,10 @@ class TracedMongoClient(ObjectProxy):
155171
_srv = None
156172

157173
def __init__(self, tracer, service, client):
174+
# NOTE[matt] the TracedMongoClient attempts to trace all of the network
175+
# calls in the trace library. This is good because it measures the
176+
# actual network time. It's bad because it uses a private API which
177+
# could change. We'll see how this goes.
158178
client._topology = TracedTopology(tracer, service, client._topology)
159179
super(TracedMongoClient, self).__init__(client)
160180
self._tracer = tracer
@@ -189,6 +209,9 @@ def _set_address_tags(span, address):
189209
def _resource_from_cmd(cmd):
190210
if cmd.query is not None:
191211
nq = normalize_filter(cmd.query)
192-
return "%s %s %s" % (cmd.name, cmd.coll, nq)
212+
# needed to dump json so we don't get unicode
213+
# dict keys like {u'foo':'bar'}
214+
q = json.dumps(nq)
215+
return "%s %s %s" % (cmd.name, cmd.coll, q)
193216
else:
194217
return "%s %s" % (cmd.name, cmd.coll)

ddtrace/ext/net.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@
66
TARGET_HOST = "out.host"
77
TARGET_PORT = "out.port"
88

9+
BYTES_OUT = "net.out.bytes"

tests/contrib/mongoengine/test.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ def test_insert_update_delete_query():
8686
spans = tracer.writer.pop()
8787
eq_(len(spans), 1)
8888
span = spans[0]
89-
eq_(span.resource, "query artist {'first_name': '?'}")
89+
eq_(span.resource, 'query artist {"first_name": "?"}')
9090
eq_(span.span_type, 'mongodb')
9191
eq_(span.service, 'my-mongo')
9292
_assert_timing(span, start, end)
@@ -100,7 +100,7 @@ def test_insert_update_delete_query():
100100
spans = tracer.writer.pop()
101101
eq_(len(spans), 1)
102102
span = spans[0]
103-
eq_(span.resource, "update artist {'_id': '?'}")
103+
eq_(span.resource, 'update artist {"_id": "?"}')
104104
eq_(span.span_type, 'mongodb')
105105
eq_(span.service, 'my-mongo')
106106
_assert_timing(span, start, end)
@@ -113,7 +113,7 @@ def test_insert_update_delete_query():
113113
spans = tracer.writer.pop()
114114
eq_(len(spans), 1)
115115
span = spans[0]
116-
eq_(span.resource, "delete artist {'_id': '?'}")
116+
eq_(span.resource, 'delete artist {"_id": "?"}')
117117
eq_(span.span_type, 'mongodb')
118118
eq_(span.service, 'my-mongo')
119119
_assert_timing(span, start, end)

0 commit comments

Comments
 (0)