|
| 1 | +""" |
| 2 | +Regenerate internal ssl certificates for tests |
| 3 | +""" |
| 4 | + |
| 5 | +# PEP 773 dependencies |
| 6 | +# /// script |
| 7 | +# dependencies = [ |
| 8 | +# "certipy", |
| 9 | +# ] |
| 10 | +# /// |
| 11 | + |
| 12 | +import asyncio |
| 13 | +import shutil |
| 14 | +import ssl |
| 15 | +from pathlib import Path |
| 16 | + |
| 17 | +from certipy import Certipy |
| 18 | + |
| 19 | +ssl_dir = Path(__file__).parent.resolve() / "ssl" |
| 20 | +port = 12345 |
| 21 | + |
| 22 | + |
| 23 | +def make_certs(): |
| 24 | + """Create certificates for proxy client and ssl backend""" |
| 25 | + # start fresh |
| 26 | + shutil.rmtree(ssl_dir) |
| 27 | + alt_names = [ |
| 28 | + "IP:127.0.0.1", |
| 29 | + "IP:0:0:0:0:0:0:0:1", |
| 30 | + "DNS:localhost", |
| 31 | + ] |
| 32 | + certipy = Certipy(store_dir=ssl_dir) |
| 33 | + _trust_bundles = certipy.trust_from_graph({ |
| 34 | + "backend-ca": ["proxy-client-ca"], |
| 35 | + "proxy-client-ca": ["backend-ca"], |
| 36 | + }) |
| 37 | + for name in ("backend", "proxy-client"): |
| 38 | + certipy.create_signed_pair( |
| 39 | + name, f"{name}-ca", alt_names=alt_names |
| 40 | + ) |
| 41 | + |
| 42 | + |
| 43 | + |
| 44 | +async def client_connected(reader, writer): |
| 45 | + """Callback for ssl server""" |
| 46 | + print("client connected") |
| 47 | + msg = await reader.read(5) |
| 48 | + print("server received", msg.decode()) |
| 49 | + writer.write(b"pong") |
| 50 | + |
| 51 | + |
| 52 | +async def ssl_backend(): |
| 53 | + """Run a test ssl server""" |
| 54 | + ssl_context = ssl.create_default_context( |
| 55 | + ssl.Purpose.CLIENT_AUTH, cafile=ssl_dir / "backend-ca_trust.crt" |
| 56 | + ) |
| 57 | + ssl_context.verify_mode = ssl.CERT_REQUIRED |
| 58 | + ssl_context.load_default_certs() |
| 59 | + ssl_context.load_cert_chain( |
| 60 | + ssl_dir / "backend/backend.crt", ssl_dir / "backend/backend.key" |
| 61 | + ) |
| 62 | + await asyncio.start_server( |
| 63 | + client_connected, host="localhost", port=port, ssl=ssl_context |
| 64 | + ) |
| 65 | + |
| 66 | + |
| 67 | +async def ssl_client(): |
| 68 | + """Run a test ssl client""" |
| 69 | + ssl_context = ssl.create_default_context( |
| 70 | + ssl.Purpose.SERVER_AUTH, cafile=ssl_dir / "proxy-client-ca_trust.crt" |
| 71 | + ) |
| 72 | + ssl_context.verify_mode = ssl.CERT_REQUIRED |
| 73 | + ssl_context.load_default_certs() |
| 74 | + ssl_context.check_hostname = True |
| 75 | + ssl_context.load_cert_chain( |
| 76 | + ssl_dir / "proxy-client/proxy-client.crt", |
| 77 | + ssl_dir / "proxy-client/proxy-client.key", |
| 78 | + ) |
| 79 | + reader, writer = await asyncio.open_connection("localhost", port, ssl=ssl_context) |
| 80 | + writer.write(b"ping") |
| 81 | + msg = await reader.read(5) |
| 82 | + print("client received", msg.decode()) |
| 83 | + |
| 84 | + |
| 85 | +async def main(): |
| 86 | + # make the certs |
| 87 | + print(f"Making internal ssl certificates in {ssl_dir}") |
| 88 | + make_certs() |
| 89 | + print("Testing internal ssl setup") |
| 90 | + await ssl_backend() |
| 91 | + await ssl_client() |
| 92 | + print("OK") |
| 93 | + |
| 94 | + |
| 95 | +if __name__ == "__main__": |
| 96 | + asyncio.run(main()) |
0 commit comments