diff --git a/hsm_orchestrator/__init__.py b/hsm_orchestrator/__init__.py index f2399ad..669b177 100644 --- a/hsm_orchestrator/__init__.py +++ b/hsm_orchestrator/__init__.py @@ -68,6 +68,7 @@ def __init__(self, orchestrator_config_filename: Path, **kwargs: Any) -> None: self.usb_path = None # The path on the workstation to the certificate authorities directory self.local_ca_path = None + self.ca_cert_files_in_repo = [] def get_openssl_cnf_config(self) -> None: """Parse the OpenSSL configuration (.cnf) file. @@ -116,6 +117,43 @@ def get_openssl_cnf_config(self) -> None: "", inline_comment ) + def get_ca_cert_filename_on_usb(self) -> str: + """Identify which file on USB stick is the CA cert + + Compare files on the USB stick with CA certificate files in the repo + to identify which one is the CA certificate + + :returns: The filename of the CA certificate on the USB stick + """ + for private_key_directory in [ + x + for x in Path(self.repo_dir / "certificate-authorities").iterdir() + if x.is_dir() + ]: + for certificate_authority_directory in private_key_directory.iterdir(): + for certificate_file in certificate_authority_directory.iterdir(): + self.ca_cert_files_in_repo.append(certificate_file) + crt_filenames_on_usb = set(x.name for x in self.usb_path.glob("*.crt")) + ca_cert_filenames_on_usb = ( + set(x.name for x in self.ca_cert_files_in_repo) & crt_filenames_on_usb + ) + if len(ca_cert_filenames_on_usb) == 0: + print( + "No .crt files were found on the USB stick with names that match CA" + " certificate files.We would expect there to be one CA certificate file" + " on the USB stick." + ) + exit(1) + if len(ca_cert_filenames_on_usb) > 1: + print( + "There are multiple .crt files on the USB stick with names that match" + " CA certificate files.We would only expect there to be one CA" + " certificate file on the USB stick." + ) + exit(1) + ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb)) + return ca_cert_filename_on_usb + def check_update_private_key(self) -> None: """Validate and update the OpenSSL 'private_key' value. @@ -329,6 +367,27 @@ def check_ca_files(self) -> None: ) exit(1) + def check_for_matching_issued_cert(self): + """Check to see if there is an existing cert which matches the .cnf/.csr + + To prevent issuing a duplicate cert with the same name as an existing + issued cert, compare the .cnf/.csr filenames with the existing certs + in the certs_issued directory. + """ + issued_cert_map = { + x.name: x for x in Path(self.repo_dir / "certs_issued").glob("**/*.crt") + } + if self.csr_file.with_suffix(".crt").name in issued_cert_map.keys(): + print( + f"The .csr file {self.csr_file}, were it to be used to create a .crt" + f" file, would create {self.csr_file.with_suffix('.crt').name} which" + " has the same name as the existing issued cert" + f" {issued_cert_map[self.csr_file.with_suffix('.crt').name]}. This" + f" could cause a collision. Please change the name of {self.csr_file}" + f" and {self.cnf_file} to something distinct." + ) + exit(1) + def check_update_cnf_file(self) -> None: """Perform validation and update of the OpenSSL .cnf file. @@ -355,7 +414,6 @@ def check_update_cnf_file(self) -> None: self.check_update_ca_crt() self.check_update_unique_subject() self.check_ca_files() - print("Check completed.") def check_environment(self, skip_git_fetch: bool) -> None: """Validate the git environment and prepare for CSR processing. @@ -642,7 +700,8 @@ def process_usb_files(self, actions): elif issubclass(type(actions[filename]), PurePath): destination = Path(actions[filename] / filename.name) shutil.copy2(filename, destination) - # We only need to ensure that the execute bit isn't set because that's all that git records + # We only need to ensure that the execute bit isn't set because + # that's all that git records # https://github.com/git/git/commit/e447947 destination.chmod( destination.stat().st_mode @@ -814,36 +873,16 @@ def pull_from_stick( """ orchestrator = HsmOrchestrator(orchestrator_config_filename, repo_dir=repo_dir) orchestrator.choose_usb_disk() + actions = {} # Key is the file and value is the action to perform on that file - ca_cert_files_in_repo = {} - for private_key_directory in [ - x - for x in Path(orchestrator.repo_dir / "certificate-authorities").iterdir() - if x.is_dir() - ]: - for certificate_authority_directory in private_key_directory.iterdir(): - for certificate_file in certificate_authority_directory.iterdir(): - ca_cert_files_in_repo[certificate_file.name] = certificate_file - crt_filenames_on_usb = set(x.name for x in orchestrator.usb_path.glob("*.crt")) - ca_cert_filenames_on_usb = ca_cert_files_in_repo.keys() & crt_filenames_on_usb - if len(ca_cert_filenames_on_usb) == 0: - print( - "No .crt files were found on the USB stick with names that match CA" - " certificate files.We would expect there to be one CA certificate file on" - " the USB stick." - ) - exit(1) - if len(ca_cert_filenames_on_usb) > 1: - print( - "There are multiple .crt files on the USB stick with names that match CA" - " certificate files.We would only expect there to be one CA certificate" - " file on the USB stick." - ) - exit(1) - ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb)) - certificate_authority_directory = ca_cert_files_in_repo[ - ca_cert_filename_on_usb - ].parent + ca_cert_filename_on_usb = orchestrator.get_ca_cert_filename_on_usb() + certificate_authority_directory = next( + iter([ + x + for x in orchestrator.ca_cert_files_in_repo + if x.name == ca_cert_filename_on_usb + ]) + ).parent # Delete the CA crt file on the USB stick actions[Path(orchestrator.usb_path / ca_cert_filename_on_usb)] = "delete" @@ -925,6 +964,8 @@ def check( orchestrator.check_environment(skip_git_fetch) orchestrator.get_openssl_cnf_config() orchestrator.check_update_cnf_file() + orchestrator.check_for_matching_issued_cert() + print("Check completed.") if __name__ == "__main__": diff --git a/tests/test_check.py b/tests/test_check.py index 422c8cb..e4c180e 100644 --- a/tests/test_check.py +++ b/tests/test_check.py @@ -318,6 +318,29 @@ def test_missing_openssl_cnf(tmp_path, datafiles): re_search(r"The CSR .* has no associated \.cnf file\.", result.output) +@pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf") +def test_cnf_csr_match_existing_certs_issued(tmp_path, datafiles, monkeypatch): + runner = CliRunner() + with runner.isolated_filesystem(tmp_path): + env = set_up_environment(tmp_path, datafiles, monkeypatch) + Path(datafiles / "example.cnf").rename(env["cnf_file"]) + Path(env["repo_dir"] / "certs_issued" / "test" / "example.cnf").touch() + Path(env["repo_dir"] / "certs_issued" / "test" / "example.csr").touch() + Path(env["repo_dir"] / "certs_issued" / "test" / "example.crt").touch() + result = runner.invoke( + main, + ["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]], + input="", + ) + re_search( + r"The \.csr file .*, were it to be used to create a \.crt file, would" + r" create example\.crt which has the same name as the existing issued cert" + r" .*\. This could cause a collision\. Please change the name of" + r" .*example\.csr and .*example\.cnf to something distinct\.", + result.output, + ) + + @pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf") def test_missing_default_ca_setting(tmp_path, datafiles, monkeypatch): runner = CliRunner() @@ -783,7 +806,7 @@ def test_check(tmp_path, datafiles, monkeypatch): runner = CliRunner() with runner.isolated_filesystem(tmp_path): env = set_up_environment(tmp_path, datafiles, monkeypatch) - shutil.copy2(Path(datafiles / "example.cnf"), env["cnf_file"]) + Path(datafiles / "example.cnf").rename(env["cnf_file"]) result = runner.invoke( main, ["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]], diff --git a/tests/test_pull.py b/tests/test_pull.py index f80e823..7ae8972 100644 --- a/tests/test_pull.py +++ b/tests/test_pull.py @@ -204,7 +204,8 @@ def test_file_actions(tmp_path, datafiles, monkeypatch): set_up_usb(env["usb_mount_point"]) # Remove the certs_issued directory to make sure it gets created Path(env["repo_dir"] / "certs_issued" / "test").rmdir() - # Set the execute bits on the file so that we can test that they are cleared during the pull + # Set the execute bits on the file so that we can test that they are + # cleared during the pull Path(env["usb_mount_point"] / "AUT-123-testing.crt").chmod(0o755) keyboard_input = f"{env['usb_mount_point']}\nn\ny\n" result = runner.invoke(