|
9 | 9 | """
|
10 | 10 |
|
11 | 11 | import logging
|
| 12 | +import re |
12 | 13 | import secrets
|
13 | 14 | import string
|
14 | 15 | import subprocess
|
@@ -63,6 +64,10 @@ def __init__(self, *args):
|
63 | 64 | getattr(self.on, "get_session_ssl_cipher_action"), self._get_session_ssl_cipher
|
64 | 65 | )
|
65 | 66 |
|
| 67 | + self.framework.observe( |
| 68 | + getattr(self.on, "get_server_certificate_action"), self._get_server_certificate |
| 69 | + ) |
| 70 | + |
66 | 71 | # Database related events
|
67 | 72 | self.database = DatabaseRequires(self, "database", DATABASE_NAME)
|
68 | 73 | self.framework.observe(
|
@@ -319,6 +324,37 @@ def _get_session_ssl_cipher(self, event: ActionEvent) -> None:
|
319 | 324 |
|
320 | 325 | event.set_results({"cipher": cipher})
|
321 | 326 |
|
| 327 | + def _get_server_certificate(self, event: ActionEvent) -> None: |
| 328 | + """Get the server certificate.""" |
| 329 | + certificate = "error" |
| 330 | + if not self._database_config: |
| 331 | + event.fail() |
| 332 | + else: |
| 333 | + try: |
| 334 | + process = subprocess.run( |
| 335 | + [ |
| 336 | + "openssl", |
| 337 | + "s_client", |
| 338 | + "-starttls", |
| 339 | + "mysql", |
| 340 | + "-connect", |
| 341 | + f"{self._database_config['host']}:{self._database_config['port']}", |
| 342 | + ], |
| 343 | + capture_output=True, |
| 344 | + ) |
| 345 | + # butchered stdout due non utf chars after the certificate |
| 346 | + raw_output = process.stdout[:2800].decode("utf8") |
| 347 | + matches = re.search( |
| 348 | + r"^(-----BEGIN C.*END CERTIFICATE-----[,\s])", |
| 349 | + raw_output, |
| 350 | + re.MULTILINE | re.DOTALL, |
| 351 | + ) |
| 352 | + certificate = matches.group(0) |
| 353 | + except Exception: |
| 354 | + event.fail() |
| 355 | + |
| 356 | + event.set_results({"certificate": certificate}) |
| 357 | + |
322 | 358 |
|
323 | 359 | if __name__ == "__main__":
|
324 | 360 | main(MySQLTestApplication)
|
0 commit comments