Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 0 additions & 83 deletions Lib/test/test_httpservers.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import html
import http, http.client
import urllib.parse
import urllib.request
import tempfile
import time
import datetime
Expand All @@ -34,8 +33,6 @@
from test.support import (
is_apple, import_helper, os_helper, threading_helper
)
from test.support.script_helper import kill_python, spawn_python
from test.support.socket_helper import find_unused_port

try:
import ssl
Expand Down Expand Up @@ -1455,86 +1452,6 @@ def test_unknown_flag(self, _):
self.assertIn('error', stderr.getvalue())


class CommandLineRunTimeTestCase(unittest.TestCase):
served_data = os.urandom(32)
served_file_name = 'served_filename'
tls_cert = certdata_file('ssl_cert.pem')
tls_key = certdata_file('ssl_key.pem')
tls_password = 'somepass'

def setUp(self):
super().setUp()
with open(self.served_file_name, 'wb') as f:
f.write(self.served_data)
self.addCleanup(os_helper.unlink, self.served_file_name)
self.tls_password_file = tempfile.mktemp()
with open(self.tls_password_file, 'wb') as f:
f.write(self.tls_password.encode())
self.addCleanup(os_helper.unlink, self.tls_password_file)

def fetch_file(self, path):
context = ssl.create_default_context()
# allow self-signed certificates
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE
req = urllib.request.Request(path, method='GET')
with urllib.request.urlopen(req, context=context) as res:
return res.read()

def parse_cli_output(self, output):
matches = re.search(r'\((https?)://([^/:]+):(\d+)/?\)', output)
if matches is None:
return None, None, None
return matches.group(1), matches.group(2), int(matches.group(3))

def wait_for_server(self, proc, protocol, port, bind, timeout=50):
"""Check the server process output.

Return True if the server was successfully started
and is listening on the given port and bind address.
"""
while timeout > 0:
line = proc.stdout.readline()
if not line:
time.sleep(0.1)
timeout -= 1
continue
protocol_, host_, port_ = self.parse_cli_output(line)
if not protocol_ or not host_ or not port_:
time.sleep(0.1)
timeout -= 1
continue
if protocol_ == protocol and host_ == bind and port_ == port:
return True
break
return False

def test_http_client(self):
port = find_unused_port()
bind = '127.0.0.1'
proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind,
bufsize=1, text=True)
self.addCleanup(kill_python, proc)
self.addCleanup(proc.terminate)
self.assertTrue(self.wait_for_server(proc, 'http', port, bind))
res = self.fetch_file(f'http://{bind}:{port}/{self.served_file_name}')
self.assertEqual(res, self.served_data)

def test_https_client(self):
port = find_unused_port()
bind = '127.0.0.1'
proc = spawn_python('-u', '-m', 'http.server', str(port), '-b', bind,
'--tls-cert', self.tls_cert,
'--tls-key', self.tls_key,
'--tls-password-file', self.tls_password_file,
bufsize=1, text=True)
self.addCleanup(kill_python, proc)
self.addCleanup(proc.terminate)
self.assertTrue(self.wait_for_server(proc, 'https', port, bind))
res = self.fetch_file(f'https://{bind}:{port}/{self.served_file_name}')
self.assertEqual(res, self.served_data)


def setUpModule():
unittest.addModuleCleanup(os.chdir, os.getcwd())

Expand Down
Loading