Skip to content

Commit 615e490

Browse files
authored
Merge pull request datastax#933 from datastax/python-343
PYTHON-343 SSL support for Twisted
2 parents 4827711 + ff2acde commit 615e490

File tree

7 files changed

+120
-31
lines changed

7 files changed

+120
-31
lines changed

CHANGELOG.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ Features
66
* Add one() function to the ResultSet API (PYTHON-947)
77
* Create an utility function to fetch concurrently many keys from the same replica (PYTHON-647)
88
* Allow filter queries with fields that have an index managed outside of cqlengine (PYTHON-966)
9+
* Twisted SSL Support (PYTHON-343)
910
* Support IS NOT NULL operator in cqlengine (PYTHON-968)
1011

1112
Other

build.yaml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -118,13 +118,19 @@ build:
118118
export PATH=$JAVA_HOME/bin:$PATH
119119
export PYTHONPATH=""
120120
121+
# Install latest setuptools
122+
pip install --upgrade pip
123+
pip install -U setuptools
124+
121125
pip install git+https://github.com/pcmanus/ccm.git
122126
# Install dependencies
123-
sudo apt-get install -y libev4 libev-dev
127+
sudo apt-get install -y libev4 libev-dev libssl-dev
124128
125129
pip install -r test-requirements.txt
126130
pip install nose-ignore-docstring
127131
pip install nose-exclude
132+
pip install service_identity
133+
128134
FORCE_CYTHON=False
129135
if [[ $CYTHON == 'CYTHON' ]]; then
130136
FORCE_CYTHON=True

cassandra/io/twistedreactor.py

Lines changed: 81 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ class TwistedConnectionProtocol(protocol.Protocol):
4242
made events.
4343
"""
4444

45+
def __init__(self):
46+
self.connection = None
47+
4548
def dataReceived(self, data):
4649
"""
4750
Callback function that is called when data has been received
@@ -50,21 +53,27 @@ def dataReceived(self, data):
5053
Reaches back to the Connection object and queues the data for
5154
processing.
5255
"""
53-
self.transport.connector.factory.conn._iobuf.write(data)
54-
self.transport.connector.factory.conn.handle_read()
55-
56+
self.connection._iobuf.write(data)
57+
self.connection.handle_read()
5658
def connectionMade(self):
5759
"""
5860
Callback function that is called when a connection has succeeded.
5961
6062
Reaches back to the Connection object and confirms that the connection
6163
is ready.
6264
"""
63-
self.transport.connector.factory.conn.client_connection_made()
65+
try:
66+
# Non SSL connection
67+
self.connection = self.transport.connector.factory.conn
68+
except AttributeError:
69+
# SSL connection
70+
self.connection = self.transport.connector.factory.wrappedFactory.conn
71+
72+
self.connection.client_connection_made(self.transport)
6473

6574
def connectionLost(self, reason):
6675
# reason is a Failure instance
67-
self.transport.connector.factory.conn.defunct(reason.value)
76+
self.connection.defunct(reason.value)
6877

6978

7079
class TwistedConnectionClientFactory(protocol.ClientFactory):
@@ -157,6 +166,49 @@ def _on_loop_timer(self):
157166
self._schedule_timeout(self._timers.next_timeout)
158167

159168

169+
try:
170+
from twisted.internet import ssl
171+
import OpenSSL.crypto
172+
from OpenSSL.crypto import load_certificate, FILETYPE_PEM
173+
174+
class _SSLContextFactory(ssl.ClientContextFactory):
175+
def __init__(self, ssl_options, check_hostname, host):
176+
self.ssl_options = ssl_options
177+
self.check_hostname = check_hostname
178+
self.host = host
179+
180+
def getContext(self):
181+
# This version has to be OpenSSL.SSL.DESIRED_VERSION
182+
# instead of ssl.DESIRED_VERSION as in other loops
183+
self.method = self.ssl_options["ssl_version"]
184+
context = ssl.ClientContextFactory.getContext(self)
185+
if "certfile" in self.ssl_options:
186+
context.use_certificate_file(self.ssl_options["certfile"])
187+
if "keyfile" in self.ssl_options:
188+
context.use_privatekey_file(self.ssl_options["keyfile"])
189+
if "ca_certs" in self.ssl_options:
190+
x509 = load_certificate(FILETYPE_PEM, open(self.ssl_options["ca_certs"]).read())
191+
store = context.get_cert_store()
192+
store.add_cert(x509)
193+
if "cert_reqs" in self.ssl_options:
194+
# This expects OpenSSL.SSL.VERIFY_NONE/OpenSSL.SSL.VERIFY_PEER
195+
# or OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT
196+
context.set_verify(self.ssl_options["cert_reqs"],
197+
callback=self.verify_callback)
198+
return context
199+
200+
def verify_callback(self, connection, x509, errnum, errdepth, ok):
201+
if ok:
202+
if self.check_hostname and self.host != x509.get_subject().commonName:
203+
return False
204+
return ok
205+
206+
_HAS_SSL = True
207+
208+
except ImportError as e:
209+
_HAS_SSL = False
210+
211+
160212
class TwistedConnection(Connection):
161213
"""
162214
An implementation of :class:`.Connection` that utilizes the
@@ -189,6 +241,7 @@ def __init__(self, *args, **kwargs):
189241

190242
self.is_closed = True
191243
self.connector = None
244+
self.transport = None
192245

193246
reactor.callFromThread(self.add_connection)
194247
self._loop.maybe_start()
@@ -198,18 +251,33 @@ def add_connection(self):
198251
Convenience function to connect and store the resulting
199252
connector.
200253
"""
201-
self.connector = reactor.connectTCP(
202-
host=self.host, port=self.port,
203-
factory=TwistedConnectionClientFactory(self),
204-
timeout=self.connect_timeout)
205-
206-
def client_connection_made(self):
254+
if self.ssl_options:
255+
256+
if not _HAS_SSL:
257+
raise ImportError(
258+
str(e) +
259+
', pyOpenSSL must be installed to enable SSL support with the Twisted event loop'
260+
)
261+
262+
self.connector = reactor.connectSSL(
263+
host=self.host, port=self.port,
264+
factory=TwistedConnectionClientFactory(self),
265+
contextFactory=_SSLContextFactory(self.ssl_options, self._check_hostname, self.host),
266+
timeout=self.connect_timeout)
267+
else:
268+
self.connector = reactor.connectTCP(
269+
host=self.host, port=self.port,
270+
factory=TwistedConnectionClientFactory(self),
271+
timeout=self.connect_timeout)
272+
273+
def client_connection_made(self, transport):
207274
"""
208275
Called by twisted protocol when a connection attempt has
209276
succeeded.
210277
"""
211278
with self.lock:
212279
self.is_closed = False
280+
self.transport = transport
213281
self._send_options_message()
214282

215283
def close(self):
@@ -222,7 +290,7 @@ def close(self):
222290
self.is_closed = True
223291

224292
log.debug("Closing connection (%s) to %s", id(self), self.host)
225-
self.connector.disconnect()
293+
reactor.callFromThread(self.connector.disconnect)
226294
log.debug("Closed socket to %s", self.host)
227295

228296
if not self.is_defunct:
@@ -246,4 +314,4 @@ def push(self, data):
246314
it is not thread-safe, so we schedule it to run from within
247315
the event loop when it gets the chance.
248316
"""
249-
reactor.callFromThread(self.connector.transport.write, data)
317+
reactor.callFromThread(self.transport.write, data)

docs/security.rst

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,3 +87,6 @@ This is only an example to show how to pass the ssl parameters. Consider reading
8787
the `python ssl documentation <https://docs.python.org/2/library/ssl.html#ssl.wrap_socket>`_ for
8888
your configuration. For further reading, Andrew Mussey has published a thorough guide on
8989
`Using SSL with the DataStax Python driver <http://blog.amussey.com/post/64036730812/cassandra-2-0-client-server-ssl-with-datastax-python>`_.
90+
91+
*Note*: In case the twisted event loop is used pyOpenSSL must be installed or an exception will be risen. Also
92+
to set the ``ssl_version`` and ``cert_reqs`` in ``ssl_opts`` the appropriate constants from pyOpenSSL are expected.

test-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ PyYAML
88
pytz
99
sure
1010
pure-sasl
11-
twisted
11+
twisted[tls]
1212
gevent>=1.0
1313
eventlet
1414
cython>=0.20,<0.28

tests/integration/long/test_ssl.py

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
from cassandra.cluster import Cluster, NoHostAvailable
2222
from cassandra import ConsistencyLevel
2323
from cassandra.query import SimpleStatement
24-
from tests.integration import PROTOCOL_VERSION, get_cluster, remove_cluster, use_single_node
24+
from tests.integration import PROTOCOL_VERSION, get_cluster, remove_cluster, use_single_node, EVENT_LOOP_MANAGER
2525

2626
log = logging.getLogger(__name__)
2727

@@ -37,6 +37,16 @@
3737
DRIVER_CERTFILE = "tests/integration/long/ssl/driver.pem"
3838
DRIVER_CERTFILE_BAD = "tests/integration/long/ssl/python_driver_bad.pem"
3939

40+
if "twisted" in EVENT_LOOP_MANAGER:
41+
import OpenSSL
42+
ssl_version = OpenSSL.SSL.TLSv1_METHOD
43+
verify_certs = {'cert_reqs': OpenSSL.SSL.VERIFY_PEER,
44+
'check_hostname': True}
45+
46+
else:
47+
ssl_version = ssl.PROTOCOL_TLSv1
48+
verify_certs = {'cert_reqs': ssl.CERT_REQUIRED,
49+
'check_hostname': True}
4050

4151
def setup_cluster_ssl(client_auth=False):
4252
"""
@@ -130,7 +140,7 @@ def test_can_connect_with_ssl_ca(self):
130140

131141
# find absolute path to client CA_CERTS
132142
abs_path_ca_cert_path = os.path.abspath(CLIENT_CA_CERTS)
133-
ssl_options = {'ca_certs': abs_path_ca_cert_path,'ssl_version': ssl.PROTOCOL_TLSv1}
143+
ssl_options = {'ca_certs': abs_path_ca_cert_path,'ssl_version': ssl_version}
134144
validate_ssl_options(ssl_options=ssl_options)
135145

136146
def test_can_connect_with_ssl_long_running(self):
@@ -147,7 +157,7 @@ def test_can_connect_with_ssl_long_running(self):
147157
# find absolute path to client CA_CERTS
148158
abs_path_ca_cert_path = os.path.abspath(CLIENT_CA_CERTS)
149159
ssl_options = {'ca_certs': abs_path_ca_cert_path,
150-
'ssl_version': ssl.PROTOCOL_TLSv1}
160+
'ssl_version': ssl_version}
151161
tries = 0
152162
while True:
153163
if tries > 5:
@@ -187,9 +197,8 @@ def test_can_connect_with_ssl_ca_host_match(self):
187197
# find absolute path to client CA_CERTS
188198
abs_path_ca_cert_path = os.path.abspath(CLIENT_CA_CERTS)
189199
ssl_options = {'ca_certs': abs_path_ca_cert_path,
190-
'ssl_version': ssl.PROTOCOL_TLSv1,
191-
'cert_reqs': ssl.CERT_REQUIRED,
192-
'check_hostname': True}
200+
'ssl_version': ssl_version}
201+
ssl_options.update(verify_certs)
193202

194203
validate_ssl_options(ssl_options=ssl_options)
195204

@@ -225,7 +234,7 @@ def test_can_connect_with_ssl_client_auth(self):
225234
abs_driver_keyfile = os.path.abspath(DRIVER_KEYFILE)
226235
abs_driver_certfile = os.path.abspath(DRIVER_CERTFILE)
227236
ssl_options = {'ca_certs': abs_path_ca_cert_path,
228-
'ssl_version': ssl.PROTOCOL_TLSv1,
237+
'ssl_version': ssl_version,
229238
'keyfile': abs_driver_keyfile,
230239
'certfile': abs_driver_certfile}
231240
validate_ssl_options(ssl_options)
@@ -251,11 +260,11 @@ def test_can_connect_with_ssl_client_auth_host_name(self):
251260
abs_driver_certfile = os.path.abspath(DRIVER_CERTFILE)
252261

253262
ssl_options = {'ca_certs': abs_path_ca_cert_path,
254-
'ssl_version': ssl.PROTOCOL_TLSv1,
263+
'ssl_version': ssl_version,
255264
'keyfile': abs_driver_keyfile,
256-
'certfile': abs_driver_certfile,
257-
'cert_reqs': ssl.CERT_REQUIRED,
258-
'check_hostname': True}
265+
'certfile': abs_driver_certfile}
266+
ssl_options.update(verify_certs)
267+
259268
validate_ssl_options(ssl_options)
260269

261270
def test_cannot_connect_without_client_auth(self):
@@ -273,7 +282,7 @@ def test_cannot_connect_without_client_auth(self):
273282

274283
abs_path_ca_cert_path = os.path.abspath(CLIENT_CA_CERTS)
275284
cluster = Cluster(protocol_version=PROTOCOL_VERSION, ssl_options={'ca_certs': abs_path_ca_cert_path,
276-
'ssl_version': ssl.PROTOCOL_TLSv1})
285+
'ssl_version': ssl_version})
277286
# attempt to connect and expect an exception
278287

279288
with self.assertRaises(NoHostAvailable) as context:
@@ -300,7 +309,7 @@ def test_cannot_connect_with_bad_client_auth(self):
300309
abs_driver_certfile = os.path.abspath(DRIVER_CERTFILE_BAD)
301310

302311
cluster = Cluster(protocol_version=PROTOCOL_VERSION, ssl_options={'ca_certs': abs_path_ca_cert_path,
303-
'ssl_version': ssl.PROTOCOL_TLSv1,
312+
'ssl_version': ssl_version,
304313
'keyfile': abs_driver_keyfile,
305314
'certfile': abs_driver_certfile})
306315
with self.assertRaises(NoHostAvailable) as context:

tests/unit/io/test_twistedreactor.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,7 @@ def test_client_connection_made(self):
156156
client_connection_made()
157157
"""
158158
self.obj_ut._send_options_message = Mock()
159-
self.obj_ut.client_connection_made()
159+
self.obj_ut.client_connection_made(Mock())
160160
self.obj_ut._send_options_message.assert_called_with()
161161

162162
@patch('twisted.internet.reactor.connectTCP')
@@ -168,7 +168,7 @@ def test_close(self, mock_connectTCP):
168168
self.obj_ut.add_connection()
169169
self.obj_ut.is_closed = False
170170
self.obj_ut.close()
171-
self.obj_ut.connector.disconnect.assert_called_with()
171+
172172
self.assertTrue(self.obj_ut.connected_event.is_set())
173173
self.assertTrue(self.obj_ut.error_all_requests.called)
174174

@@ -217,6 +217,8 @@ def test_push(self, mock_connectTCP):
217217
Verifiy that push() calls transport.write(data).
218218
"""
219219
self.obj_ut.add_connection()
220+
transport_mock = Mock()
221+
self.obj_ut.transport = transport_mock
220222
self.obj_ut.push('123 pickup')
221223
self.mock_reactor_cft.assert_called_with(
222-
self.obj_ut.connector.transport.write, '123 pickup')
224+
transport_mock.write, '123 pickup')

0 commit comments

Comments
 (0)