Skip to content

Commit e071c7a

Browse files
committed
initial cassandra instrumentation (#205)
This instruments `Cluster.connect` and `Session.execute`. The test matrix only includes versions 3.4 and 3.14+ of cassandra-driver, as earlier versions of the client don't work on Python 2.7.10+. Closes #205
1 parent ab80891 commit e071c7a

File tree

14 files changed

+185
-1
lines changed

14 files changed

+185
-1
lines changed

CHANGELOG.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
https://github.com/elastic/apm-agent-python/compare/v2.1.1\...master[Check the HEAD diff]
1414

1515
* introduced consistent logger name scheme for all elasticapm internal log messages ({pull}212[#212])
16+
* added instrumentation of cassandra-driver ({pull}205[#205])
1617
* added instrumentation of elasticsearch-py ({pull}191[#191])
1718
* added Flask 1.0 to the test matrix ({pull}207[#207])
1819
* fixed an issue with our minimalistic SQL parser and "fully qualified" table names ({pull}206[#206])
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
from elasticapm.instrumentation.packages.base import AbstractInstrumentedModule
2+
from elasticapm.instrumentation.packages.dbapi2 import extract_signature
3+
from elasticapm.traces import capture_span
4+
from elasticapm.utils import compat
5+
6+
7+
class CassandraInstrumentation(AbstractInstrumentedModule):
    """Record cassandra-driver connects and queries as spans.

    Wraps ``Cluster.connect`` and ``Session.execute`` from
    ``cassandra.cluster``. Connects are reported with the span type
    ``db.cassandra.connect``; everything else with ``db.cassandra.query``.
    """
    name = 'cassandra'

    instrument_list = [
        ("cassandra.cluster", "Session.execute"),
        ("cassandra.cluster", "Cluster.connect"),
    ]

    @staticmethod
    def _get_query_string(query):
        """Return the CQL text behind ``query``, or ``None`` if it is unknown.

        ``query`` may be a plain string, an object exposing ``query_string``,
        or an object exposing ``prepared_statement`` (a bound statement).
        """
        if hasattr(query, 'query_string'):
            return query.query_string

        prepared = getattr(query, 'prepared_statement', None)
        if prepared is not None:
            # ``query`` is checked first to keep the previous behaviour;
            # ``query_string`` is the attribute cassandra-driver documents
            # on prepared statements, so fall back to it instead of
            # silently dropping the statement.
            for attr in ('query', 'query_string'):
                text = getattr(prepared, attr, None)
                if text:
                    return text

        if isinstance(query, compat.string_types):
            return query
        return None

    def call(self, module, method, wrapped, instance, args, kwargs):
        """Run ``wrapped`` inside a span describing the connect or query.

        For queries with a recoverable CQL string, the span is named by
        ``extract_signature`` and carries the statement in its ``db``
        context. Otherwise the span keeps the wrapped method's name and
        has no context.

        Returns whatever ``wrapped(*args, **kwargs)`` returns.
        """
        name = self.get_wrapped_name(wrapped, instance, method)
        context = None
        if method == "Cluster.connect":
            span_type = 'db.cassandra.connect'
        else:
            span_type = 'db.cassandra.query'
            # The query is the first positional argument or the ``query`` kwarg.
            query = args[0] if args else kwargs.get('query')
            query_str = self._get_query_string(query)
            if query_str:
                name = extract_signature(query_str)
                context = {'db': {"type": "sql", "statement": query_str}}

        with capture_span(name, span_type, context):
            return wrapped(*args, **kwargs)

elasticapm/instrumentation/register.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
'elasticapm.instrumentation.packages.urllib3.Urllib3Instrumentation',
1919
'elasticapm.instrumentation.packages.elasticsearch.ElasticsearchConnectionInstrumentation',
2020
'elasticapm.instrumentation.packages.elasticsearch.ElasticsearchInstrumentation',
21+
'elasticapm.instrumentation.packages.cassandra.CassandraInstrumentation',
2122

2223
'elasticapm.instrumentation.packages.django.template.DjangoTemplateInstrumentation',
2324
'elasticapm.instrumentation.packages.django.template.DjangoTemplateSourceInstrumentation',

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ multi_line_output=0
2828
known_standard_library=importlib,types,asyncio
2929
known_django=django
3030
known_first_party=elasticapm,tests
31-
known_third_party=pytest,flask,aiohttp,urllib3_mock,webob,memcache,pymongo,boto3,logbook,twisted,celery,zope,urllib3,redis,jinja2,requests,certifi,mock,jsonschema,werkzeug,pytest_localserver,psycopg2,async_timeout
31+
known_third_party=pytest,flask,aiohttp,urllib3_mock,webob,memcache,pymongo,boto3,cassandra,logbook,twisted,celery,zope,urllib3,redis,jinja2,requests,certifi,mock,jsonschema,werkzeug,pytest_localserver,psycopg2,async_timeout
3232
default_section=FIRSTPARTY
3333
sections=FUTURE,STDLIB,DJANGO,THIRDPARTY,FIRSTPARTY,LOCALFOLDER
3434

tests/.jenkins_framework.yml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,5 @@ FRAMEWORK:
1919
- elasticsearch-2
2020
- elasticsearch-5
2121
- elasticsearch-6
22+
- cassandra-newest
23+

tests/.jenkins_framework_full.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,7 @@ FRAMEWORK:
4444
- psycopg2-newest
4545
- memcached-1.51
4646
- memcached-newest
47+
- cassandra-3.4
48+
- cassandra-newest
49+
4750

tests/docker-compose.yml

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,14 @@ services:
1717
timeout: 30s
1818
retries: 3
1919

20+
cassandra3:
21+
image: cassandra:3
22+
volumes:
23+
- cassandradata3:/var/lib/cassandra
24+
environment:
25+
MAX_HEAP_SIZE: "1G"
26+
HEAP_NEWSIZE: 400m
27+
2028
mongodb30:
2129
image: mongo:3.0
2230
volumes:
@@ -119,3 +127,5 @@ volumes:
119127
driver: local
120128
pyesdata2:
121129
driver: local
130+
cassandradata3:
131+
driver: local
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
import pytest # isort:skip
2+
pytest.importorskip("cassandra") # isort:skip
3+
4+
import os
5+
6+
from cassandra.cluster import Cluster
7+
from cassandra.query import SimpleStatement
8+
9+
from elasticapm.instrumentation.packages.dbapi2 import extract_signature
10+
11+
# Module-level pytest marker: tags every test in this file as ``cassandra``.
pytestmark = pytest.mark.cassandra
12+
13+
14+
@pytest.fixture()
def cassandra_cluster():
    """Yield a ``Cluster`` for the host named by ``CASSANDRA_HOST``.

    Falls back to ``localhost`` when the environment variable is unset.
    The cluster is shut down on teardown. ``del`` would only unbind the
    local name and leave the driver's connections open.
    """
    cluster = Cluster(
        [os.environ.get('CASSANDRA_HOST', 'localhost')]
    )
    yield cluster
    cluster.shutdown()
21+
22+
23+
@pytest.fixture()
def cassandra_session(cassandra_cluster):
    """Yield a session bound to a fresh ``testkeyspace`` with a ``users`` table.

    The keyspace is dropped again on teardown.
    """
    session = cassandra_cluster.connect()
    # An earlier run that aborted before teardown can leave the keyspace
    # behind (the test container's data may persist between runs); remove
    # it so CREATE KEYSPACE below cannot fail on an existing keyspace.
    session.execute('DROP KEYSPACE IF EXISTS testkeyspace;')
    session.execute("""
CREATE KEYSPACE testkeyspace
WITH REPLICATION = { 'class' : 'SimpleStrategy' ,'replication_factor' : 1 }
""")
    session.execute('USE testkeyspace;')
    session.execute('CREATE TABLE testkeyspace.users ( id UUID PRIMARY KEY, name text);')
    yield session
    session.execute('DROP KEYSPACE testkeyspace;')
34+
35+
36+
def test_cassandra_connect(instrument, elasticapm_client, cassandra_cluster):
    """Connecting inside a transaction records a ``Cluster.connect`` span."""
    elasticapm_client.begin_transaction("transaction.test")
    sess = cassandra_cluster.connect()
    elasticapm_client.end_transaction("test")

    recorded = elasticapm_client.instrumentation_store.get_all()
    first_transaction = recorded[0]
    connect_span = first_transaction['spans'][0]

    assert connect_span['type'] == 'db.cassandra.connect'
    assert connect_span['duration'] > 0
    assert connect_span['name'] == 'Cluster.connect'
47+
48+
49+
def test_select_query_string(instrument, cassandra_session, elasticapm_client):
    """Executing a plain CQL string yields a named query span with context."""
    expected_context = {'db': {'statement': 'SELECT name from users', 'type': 'sql'}}

    elasticapm_client.begin_transaction("transaction.test")
    cassandra_session.execute('SELECT name from users')
    elasticapm_client.end_transaction("test")

    recorded = elasticapm_client.instrumentation_store.get_all()
    query_span = recorded[0]['spans'][0]

    assert query_span['type'] == 'db.cassandra.query'
    assert query_span['name'] == 'SELECT FROM users'
    assert query_span['context'] == expected_context
58+
59+
60+
def test_select_simple_statement(instrument, cassandra_session, elasticapm_client):
    """Executing a ``SimpleStatement`` yields a named query span with context."""
    expected_context = {'db': {'statement': 'SELECT name from users', 'type': 'sql'}}
    simple = SimpleStatement('SELECT name from users')

    elasticapm_client.begin_transaction("transaction.test")
    cassandra_session.execute(simple)
    elasticapm_client.end_transaction("test")

    recorded = elasticapm_client.instrumentation_store.get_all()
    query_span = recorded[0]['spans'][0]

    assert query_span['type'] == 'db.cassandra.query'
    assert query_span['name'] == 'SELECT FROM users'
    assert query_span['context'] == expected_context
70+
71+
72+
def test_select_prepared_statement(instrument, cassandra_session, elasticapm_client):
    """Executing a prepared statement yields a named query span with context."""
    expected_context = {'db': {'statement': 'SELECT name from users', 'type': 'sql'}}
    # Prepare outside the transaction, as the original test does.
    prepared = cassandra_session.prepare('SELECT name from users')

    elasticapm_client.begin_transaction("transaction.test")
    cassandra_session.execute(prepared)
    elasticapm_client.end_transaction("test")

    recorded = elasticapm_client.instrumentation_store.get_all()
    query_span = recorded[0]['spans'][0]

    assert query_span['type'] == 'db.cassandra.query'
    assert query_span['name'] == 'SELECT FROM users'
    assert query_span['context'] == expected_context
82+
83+
84+
def test_signature_create_keyspace():
    """``CREATE KEYSPACE ...`` is summarised as ``CREATE KEYSPACE``."""
    cql = "CREATE KEYSPACE testkeyspace WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };"
    signature = extract_signature(cql)
    assert signature == 'CREATE KEYSPACE'
88+
89+
90+
def test_signature_create_columnfamily():
    # A multi-line CREATE COLUMNFAMILY statement, including collection
    # column types (set/list/map), is expected to be summarised as
    # 'CREATE COLUMNFAMILY'.
    assert extract_signature(
        """CREATE COLUMNFAMILY users (
userid text PRIMARY KEY,
first_name text,
last_name text,
emails set<text>,
top_scores list<int>,
todo map<timestamp, text>
);"""
    ) == 'CREATE COLUMNFAMILY'
101+
102+
103+
def test_select_from_collection():
    """A dotted ``keyspace.table`` name is kept whole in the signature."""
    cql = "SELECT first, last FROM a.b WHERE id = 1;"
    signature = extract_signature(cql)
    assert signature == "SELECT FROM a.b"
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
cassandra-driver>=3.4.0,<3.5.0
2+
-r requirements-base.txt
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
cassandra-driver>=3.14.0
2+
-r requirements-base.txt

0 commit comments

Comments
 (0)