Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 72 additions & 31 deletions hsm_orchestrator/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ def __init__(self, orchestrator_config_filename: Path, **kwargs: Any) -> None:
self.usb_path = None
# The path on the workstation to the certificate authorities directory
self.local_ca_path = None
self.ca_cert_files_in_repo = []

def get_openssl_cnf_config(self) -> None:
"""Parse the OpenSSL configuration (.cnf) file.
Expand Down Expand Up @@ -116,6 +117,43 @@ def get_openssl_cnf_config(self) -> None:
"", inline_comment
)

def get_ca_cert_filename_on_usb(self) -> str:
"""Identify which file on USB stick is the CA cert

Compare files on the USB stick with CA certificate files in the repo
to identify which one is the CA certificate

:returns: The filename of the CA certificate on the USB stick
"""
for private_key_directory in [
x
for x in Path(self.repo_dir / "certificate-authorities").iterdir()
if x.is_dir()
]:
for certificate_authority_directory in private_key_directory.iterdir():
for certificate_file in certificate_authority_directory.iterdir():
self.ca_cert_files_in_repo.append(certificate_file)
crt_filenames_on_usb = set(x.name for x in self.usb_path.glob("*.crt"))
ca_cert_filenames_on_usb = (
set(x.name for x in self.ca_cert_files_in_repo) & crt_filenames_on_usb
)
if len(ca_cert_filenames_on_usb) == 0:
print(
"No .crt files were found on the USB stick with names that match CA"
" certificate files.We would expect there to be one CA certificate file"
" on the USB stick."
)
exit(1)
if len(ca_cert_filenames_on_usb) > 1:
print(
"There are multiple .crt files on the USB stick with names that match"
" CA certificate files.We would only expect there to be one CA"
" certificate file on the USB stick."
)
exit(1)
ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb))
return ca_cert_filename_on_usb

def check_update_private_key(self) -> None:
"""Validate and update the OpenSSL 'private_key' value.

Expand Down Expand Up @@ -329,6 +367,27 @@ def check_ca_files(self) -> None:
)
exit(1)

def check_for_matching_issued_cert(self):
"""Check to see if there is an existing cert which matches the .cnf/.csr

To prevent issuing a duplicate cert with the same name as an existing
issued cert, compare the .cnf/.csr filenames with the existing certs
in the certs_issued directory.
"""
issued_cert_map = {
x.name: x for x in Path(self.repo_dir / "certs_issued").glob("**/*.crt")
}
if self.csr_file.with_suffix(".crt").name in issued_cert_map.keys():
print(
f"The .csr file {self.csr_file}, were it to be used to create a .crt"
f" file, would create {self.csr_file.with_suffix('.crt').name} which"
" has the same name as the existing issued cert"
f" {issued_cert_map[self.csr_file.with_suffix('.crt').name]}. This"
f" could cause a collision. Please change the name of {self.csr_file}"
f" and {self.cnf_file} to something distinct."
)
exit(1)

def check_update_cnf_file(self) -> None:
"""Perform validation and update of the OpenSSL .cnf file.

Expand All @@ -355,7 +414,6 @@ def check_update_cnf_file(self) -> None:
self.check_update_ca_crt()
self.check_update_unique_subject()
self.check_ca_files()
print("Check completed.")

def check_environment(self, skip_git_fetch: bool) -> None:
"""Validate the git environment and prepare for CSR processing.
Expand Down Expand Up @@ -642,7 +700,8 @@ def process_usb_files(self, actions):
elif issubclass(type(actions[filename]), PurePath):
destination = Path(actions[filename] / filename.name)
shutil.copy2(filename, destination)
# We only need to ensure that the execute bit isn't set because that's all that git records
# We only need to ensure that the execute bit isn't set because
# that's all that git records
# https://github.com/git/git/commit/e447947
destination.chmod(
destination.stat().st_mode
Expand Down Expand Up @@ -814,36 +873,16 @@ def pull_from_stick(
"""
orchestrator = HsmOrchestrator(orchestrator_config_filename, repo_dir=repo_dir)
orchestrator.choose_usb_disk()

actions = {} # Key is the file and value is the action to perform on that file
ca_cert_files_in_repo = {}
for private_key_directory in [
x
for x in Path(orchestrator.repo_dir / "certificate-authorities").iterdir()
if x.is_dir()
]:
for certificate_authority_directory in private_key_directory.iterdir():
for certificate_file in certificate_authority_directory.iterdir():
ca_cert_files_in_repo[certificate_file.name] = certificate_file
crt_filenames_on_usb = set(x.name for x in orchestrator.usb_path.glob("*.crt"))
ca_cert_filenames_on_usb = ca_cert_files_in_repo.keys() & crt_filenames_on_usb
if len(ca_cert_filenames_on_usb) == 0:
print(
"No .crt files were found on the USB stick with names that match CA"
" certificate files.We would expect there to be one CA certificate file on"
" the USB stick."
)
exit(1)
if len(ca_cert_filenames_on_usb) > 1:
print(
"There are multiple .crt files on the USB stick with names that match CA"
" certificate files.We would only expect there to be one CA certificate"
" file on the USB stick."
)
exit(1)
ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb))
certificate_authority_directory = ca_cert_files_in_repo[
ca_cert_filename_on_usb
].parent
ca_cert_filename_on_usb = orchestrator.get_ca_cert_filename_on_usb()
certificate_authority_directory = next(
iter([
x
for x in orchestrator.ca_cert_files_in_repo
if x.name == ca_cert_filename_on_usb
])
).parent
# Delete the CA crt file on the USB stick
actions[Path(orchestrator.usb_path / ca_cert_filename_on_usb)] = "delete"

Expand Down Expand Up @@ -925,6 +964,8 @@ def check(
orchestrator.check_environment(skip_git_fetch)
orchestrator.get_openssl_cnf_config()
orchestrator.check_update_cnf_file()
orchestrator.check_for_matching_issued_cert()
print("Check completed.")


if __name__ == "__main__":
Expand Down
25 changes: 24 additions & 1 deletion tests/test_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,29 @@ def test_missing_openssl_cnf(tmp_path, datafiles):
re_search(r"The CSR .* has no associated \.cnf file\.", result.output)


@pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf")
def test_cnf_csr_match_existing_certs_issued(tmp_path, datafiles, monkeypatch):
runner = CliRunner()
with runner.isolated_filesystem(tmp_path):
env = set_up_environment(tmp_path, datafiles, monkeypatch)
Path(datafiles / "example.cnf").rename(env["cnf_file"])
Path(env["repo_dir"] / "certs_issued" / "test" / "example.cnf").touch()
Path(env["repo_dir"] / "certs_issued" / "test" / "example.csr").touch()
Path(env["repo_dir"] / "certs_issued" / "test" / "example.crt").touch()
result = runner.invoke(
main,
["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]],
input="",
)
re_search(
r"The \.csr file .*, were it to be used to create a \.crt file, would"
r" create example\.crt which has the same name as the existing issued cert"
r" .*\. This could cause a collision\. Please change the name of"
r" .*example\.csr and .*example\.cnf to something distinct\.",
result.output,
)


@pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf")
def test_missing_default_ca_setting(tmp_path, datafiles, monkeypatch):
runner = CliRunner()
Expand Down Expand Up @@ -783,7 +806,7 @@ def test_check(tmp_path, datafiles, monkeypatch):
runner = CliRunner()
with runner.isolated_filesystem(tmp_path):
env = set_up_environment(tmp_path, datafiles, monkeypatch)
shutil.copy2(Path(datafiles / "example.cnf"), env["cnf_file"])
Path(datafiles / "example.cnf").rename(env["cnf_file"])
result = runner.invoke(
main,
["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]],
Expand Down
3 changes: 2 additions & 1 deletion tests/test_pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,8 @@ def test_file_actions(tmp_path, datafiles, monkeypatch):
set_up_usb(env["usb_mount_point"])
# Remove the certs_issued directory to make sure it gets created
Path(env["repo_dir"] / "certs_issued" / "test").rmdir()
# Set the execute bits on the file so that we can test that they are cleared during the pull
# Set the execute bits on the file so that we can test that they are
# cleared during the pull
Path(env["usb_mount_point"] / "AUT-123-testing.crt").chmod(0o755)
keyboard_input = f"{env['usb_mount_point']}\nn\ny\n"
result = runner.invoke(
Expand Down