Skip to content
This repository was archived by the owner on Jan 13, 2021. It is now read-only.

Commit 6a12f2d

Browse files
committed
Add some tests for client certs.
1 parent 6d37bb3 commit 6a12f2d

File tree

2 files changed

+98
-27
lines changed

2 files changed

+98
-27
lines changed

test/test_integration.py

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import requests
1010
import threading
1111
import time
12-
import socket
1312
import hyper
1413
import hyper.http11.connection
1514
import pytest
@@ -26,7 +25,6 @@
2625
REQUEST_CODES, REQUEST_CODES_LENGTH
2726
)
2827
from hyper.http20.exceptions import ConnectionError, StreamResetError
29-
from hyper.tls import wrap_socket
3028
from server import SocketLevelTest
3129

3230
# Turn off certificate verification for the tests.
@@ -76,31 +74,6 @@ def receive_preamble(sock):
7674
return
7775

7876

79-
class TestBasicSocketManipulation(SocketLevelTest):
80-
# These aren't HTTP/2 tests, but it doesn't hurt to leave it.
81-
h2 = True
82-
83-
def test_connection_string(self):
84-
self.set_up()
85-
evt = threading.Event()
86-
87-
def socket_handler(listener):
88-
sock = listener.accept()[0]
89-
90-
evt.wait(5)
91-
sock.close()
92-
93-
self._start_server(socket_handler)
94-
s = socket.create_connection((self.host, self.port))
95-
s, proto = wrap_socket(s, "localhost", force_proto=b"test")
96-
s.close()
97-
evt.set()
98-
99-
assert proto == b"test"
100-
101-
self.tear_down()
102-
103-
10477
class TestHyperIntegration(SocketLevelTest):
10578
# These are HTTP/2 tests.
10679
h2 = True

test/test_ssl_socket.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# -*- coding: utf-8 -*-
2+
"""
3+
test/test_ssl_socket
4+
~~~~~~~~~~~~~~~~~~~~
5+
6+
This file defines tests for hyper that validate our TLS handling.
7+
"""
8+
import os
9+
import socket
10+
import ssl
11+
import threading
12+
13+
import pytest
14+
15+
from hyper.tls import wrap_socket, init_context
16+
17+
from server import SocketLevelTest
18+
19+
20+
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
21+
TEST_CERTS_DIR = os.path.join(TEST_DIR, "certs")
22+
CLIENT_CERT_FILE = os.path.join(TEST_CERTS_DIR, 'client.crt')
23+
CLIENT_KEY_FILE = os.path.join(TEST_CERTS_DIR, 'client.key')
24+
CLIENT_PEM_FILE = os.path.join(TEST_CERTS_DIR, 'nopassword.pem')
25+
SERVER_CERT_FILE = os.path.join(TEST_CERTS_DIR, 'server.crt')
26+
SERVER_KEY_FILE = os.path.join(TEST_CERTS_DIR, 'server.key')
27+
28+
29+
class TestBasicSocketManipulation(SocketLevelTest):
30+
# These aren't HTTP/2 tests, but it doesn't hurt to leave it.
31+
h2 = True
32+
33+
def test_connection_string(self):
34+
self.set_up()
35+
evt = threading.Event()
36+
37+
def socket_handler(listener):
38+
sock = listener.accept()[0]
39+
40+
evt.wait(5)
41+
sock.close()
42+
43+
self._start_server(socket_handler)
44+
s = socket.create_connection((self.host, self.port))
45+
s, proto = wrap_socket(s, "localhost", force_proto=b"test")
46+
s.close()
47+
evt.set()
48+
49+
assert proto == b"test"
50+
51+
self.tear_down()
52+
53+
@pytest.mark.parametrize(
54+
'context_kwargs',
55+
[
56+
{'cert': CLIENT_PEM_FILE},
57+
{
58+
'cert': (CLIENT_CERT_FILE, CLIENT_KEY_FILE),
59+
'cert_password': b'abc123'
60+
},
61+
]
62+
)
63+
def test_client_certificate(self, context_kwargs):
64+
# Don't have the server thread do TLS: we'll do it ourselves.
65+
self.set_up(secure=False)
66+
certs = []
67+
evt = threading.Event()
68+
69+
def socket_handler(listener):
70+
sock = listener.accept()[0]
71+
sock = ssl.wrap_socket(
72+
sock,
73+
ssl_version=ssl.PROTOCOL_SSLv23,
74+
certfile=SERVER_CERT_FILE,
75+
keyfile=SERVER_KEY_FILE,
76+
cert_reqs=ssl.CERT_REQUIRED,
77+
ca_certs=CLIENT_PEM_FILE,
78+
server_side=True
79+
)
80+
certs.append(sock.getpeercert())
81+
evt.wait(5)
82+
sock.close()
83+
84+
self._start_server(socket_handler)
85+
86+
# Set up the client context. Don't validate the server cert though.
87+
context = init_context(**context_kwargs)
88+
context.check_hostname = False
89+
context.verify_mode = ssl.CERT_NONE
90+
91+
s = socket.create_connection((self.host, self.port))
92+
s, proto = wrap_socket(s, "localhost", ssl_context=context)
93+
s.close()
94+
evt.set()
95+
96+
assert len(certs) == 1
97+
98+
self.tear_down()

0 commit comments

Comments
 (0)