Skip to content

Commit 1a0fc4b

Browse files
authored
Merge pull request #27 from gene1wood/check_for_already_issued_duplicate_crt
Check if filenames of the .csr .cnf pair match existing cert
2 parents bdec062 + 1db4019 commit 1a0fc4b

File tree

3 files changed

+98
-33
lines changed

3 files changed

+98
-33
lines changed

hsm_orchestrator/__init__.py

Lines changed: 72 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ def __init__(self, orchestrator_config_filename: Path, **kwargs: Any) -> None:
6868
self.usb_path = None
6969
# The path on the workstation to the certificate authorities directory
7070
self.local_ca_path = None
71+
self.ca_cert_files_in_repo = []
7172

7273
def get_openssl_cnf_config(self) -> None:
7374
"""Parse the OpenSSL configuration (.cnf) file.
@@ -116,6 +117,43 @@ def get_openssl_cnf_config(self) -> None:
116117
"", inline_comment
117118
)
118119

120+
def get_ca_cert_filename_on_usb(self) -> str:
121+
"""Identify which file on USB stick is the CA cert
122+
123+
Compare files on the USB stick with CA certificate files in the repo
124+
to identify which one is the CA certificate
125+
126+
:returns: The filename of the CA certificate on the USB stick
127+
"""
128+
for private_key_directory in [
129+
x
130+
for x in Path(self.repo_dir / "certificate-authorities").iterdir()
131+
if x.is_dir()
132+
]:
133+
for certificate_authority_directory in private_key_directory.iterdir():
134+
for certificate_file in certificate_authority_directory.iterdir():
135+
self.ca_cert_files_in_repo.append(certificate_file)
136+
crt_filenames_on_usb = set(x.name for x in self.usb_path.glob("*.crt"))
137+
ca_cert_filenames_on_usb = (
138+
set(x.name for x in self.ca_cert_files_in_repo) & crt_filenames_on_usb
139+
)
140+
if len(ca_cert_filenames_on_usb) == 0:
141+
print(
142+
"No .crt files were found on the USB stick with names that match CA"
143+
" certificate files.We would expect there to be one CA certificate file"
144+
" on the USB stick."
145+
)
146+
exit(1)
147+
if len(ca_cert_filenames_on_usb) > 1:
148+
print(
149+
"There are multiple .crt files on the USB stick with names that match"
150+
" CA certificate files.We would only expect there to be one CA"
151+
" certificate file on the USB stick."
152+
)
153+
exit(1)
154+
ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb))
155+
return ca_cert_filename_on_usb
156+
119157
def check_update_private_key(self) -> None:
120158
"""Validate and update the OpenSSL 'private_key' value.
121159
@@ -329,6 +367,27 @@ def check_ca_files(self) -> None:
329367
)
330368
exit(1)
331369

370+
def check_for_matching_issued_cert(self):
371+
"""Check to see if there is an existing cert which matches the .cnf/.csr
372+
373+
To prevent issuing a duplicate cert with the same name as an existing
374+
issued cert, compare the .cnf/.csr filenames with the existing certs
375+
in the certs_issued directory.
376+
"""
377+
issued_cert_map = {
378+
x.name: x for x in Path(self.repo_dir / "certs_issued").glob("**/*.crt")
379+
}
380+
if self.csr_file.with_suffix(".crt").name in issued_cert_map.keys():
381+
print(
382+
f"The .csr file {self.csr_file}, were it to be used to create a .crt"
383+
f" file, would create {self.csr_file.with_suffix('.crt').name} which"
384+
" has the same name as the existing issued cert"
385+
f" {issued_cert_map[self.csr_file.with_suffix('.crt').name]}. This"
386+
f" could cause a collision. Please change the name of {self.csr_file}"
387+
f" and {self.cnf_file} to something distinct."
388+
)
389+
exit(1)
390+
332391
def check_update_cnf_file(self) -> None:
333392
"""Perform validation and update of the OpenSSL .cnf file.
334393
@@ -355,7 +414,6 @@ def check_update_cnf_file(self) -> None:
355414
self.check_update_ca_crt()
356415
self.check_update_unique_subject()
357416
self.check_ca_files()
358-
print("Check completed.")
359417

360418
def check_environment(self, skip_git_fetch: bool) -> None:
361419
"""Validate the git environment and prepare for CSR processing.
@@ -642,7 +700,8 @@ def process_usb_files(self, actions):
642700
elif issubclass(type(actions[filename]), PurePath):
643701
destination = Path(actions[filename] / filename.name)
644702
shutil.copy2(filename, destination)
645-
# We only need to ensure that the execute bit isn't set because that's all that git records
703+
# We only need to ensure that the execute bit isn't set because
704+
# that's all that git records
646705
# https://github.com/git/git/commit/e447947
647706
destination.chmod(
648707
destination.stat().st_mode
@@ -814,36 +873,16 @@ def pull_from_stick(
814873
"""
815874
orchestrator = HsmOrchestrator(orchestrator_config_filename, repo_dir=repo_dir)
816875
orchestrator.choose_usb_disk()
876+
817877
actions = {} # Key is the file and value is the action to perform on that file
818-
ca_cert_files_in_repo = {}
819-
for private_key_directory in [
820-
x
821-
for x in Path(orchestrator.repo_dir / "certificate-authorities").iterdir()
822-
if x.is_dir()
823-
]:
824-
for certificate_authority_directory in private_key_directory.iterdir():
825-
for certificate_file in certificate_authority_directory.iterdir():
826-
ca_cert_files_in_repo[certificate_file.name] = certificate_file
827-
crt_filenames_on_usb = set(x.name for x in orchestrator.usb_path.glob("*.crt"))
828-
ca_cert_filenames_on_usb = ca_cert_files_in_repo.keys() & crt_filenames_on_usb
829-
if len(ca_cert_filenames_on_usb) == 0:
830-
print(
831-
"No .crt files were found on the USB stick with names that match CA"
832-
" certificate files.We would expect there to be one CA certificate file on"
833-
" the USB stick."
834-
)
835-
exit(1)
836-
if len(ca_cert_filenames_on_usb) > 1:
837-
print(
838-
"There are multiple .crt files on the USB stick with names that match CA"
839-
" certificate files.We would only expect there to be one CA certificate"
840-
" file on the USB stick."
841-
)
842-
exit(1)
843-
ca_cert_filename_on_usb = next(iter(ca_cert_filenames_on_usb))
844-
certificate_authority_directory = ca_cert_files_in_repo[
845-
ca_cert_filename_on_usb
846-
].parent
878+
ca_cert_filename_on_usb = orchestrator.get_ca_cert_filename_on_usb()
879+
certificate_authority_directory = next(
880+
iter([
881+
x
882+
for x in orchestrator.ca_cert_files_in_repo
883+
if x.name == ca_cert_filename_on_usb
884+
])
885+
).parent
847886
# Delete the CA crt file on the USB stick
848887
actions[Path(orchestrator.usb_path / ca_cert_filename_on_usb)] = "delete"
849888

@@ -925,6 +964,8 @@ def check(
925964
orchestrator.check_environment(skip_git_fetch)
926965
orchestrator.get_openssl_cnf_config()
927966
orchestrator.check_update_cnf_file()
967+
orchestrator.check_for_matching_issued_cert()
968+
print("Check completed.")
928969

929970

930971
if __name__ == "__main__":

tests/test_check.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,29 @@ def test_missing_openssl_cnf(tmp_path, datafiles):
318318
re_search(r"The CSR .* has no associated \.cnf file\.", result.output)
319319

320320

321+
@pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf")
322+
def test_cnf_csr_match_existing_certs_issued(tmp_path, datafiles, monkeypatch):
323+
runner = CliRunner()
324+
with runner.isolated_filesystem(tmp_path):
325+
env = set_up_environment(tmp_path, datafiles, monkeypatch)
326+
Path(datafiles / "example.cnf").rename(env["cnf_file"])
327+
Path(env["repo_dir"] / "certs_issued" / "test" / "example.cnf").touch()
328+
Path(env["repo_dir"] / "certs_issued" / "test" / "example.csr").touch()
329+
Path(env["repo_dir"] / "certs_issued" / "test" / "example.crt").touch()
330+
result = runner.invoke(
331+
main,
332+
["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]],
333+
input="",
334+
)
335+
re_search(
336+
r"The \.csr file .*, were it to be used to create a \.crt file, would"
337+
r" create example\.crt which has the same name as the existing issued cert"
338+
r" .*\. This could cause a collision\. Please change the name of"
339+
r" .*example\.csr and .*example\.cnf to something distinct\.",
340+
result.output,
341+
)
342+
343+
321344
@pytest.mark.datafiles(FIXTURE_DIR / "example.csr", FIXTURE_DIR / "example.cnf")
322345
def test_missing_default_ca_setting(tmp_path, datafiles, monkeypatch):
323346
runner = CliRunner()
@@ -783,7 +806,7 @@ def test_check(tmp_path, datafiles, monkeypatch):
783806
runner = CliRunner()
784807
with runner.isolated_filesystem(tmp_path):
785808
env = set_up_environment(tmp_path, datafiles, monkeypatch)
786-
shutil.copy2(Path(datafiles / "example.cnf"), env["cnf_file"])
809+
Path(datafiles / "example.cnf").rename(env["cnf_file"])
787810
result = runner.invoke(
788811
main,
789812
["check", "--skip-git-fetch", "--config", env["orchestrator_config_file"]],

tests/test_pull.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,8 @@ def test_file_actions(tmp_path, datafiles, monkeypatch):
204204
set_up_usb(env["usb_mount_point"])
205205
# Remove the certs_issued directory to make sure it gets created
206206
Path(env["repo_dir"] / "certs_issued" / "test").rmdir()
207-
# Set the execute bits on the file so that we can test that they are cleared during the pull
207+
# Set the execute bits on the file so that we can test that they are
208+
# cleared during the pull
208209
Path(env["usb_mount_point"] / "AUT-123-testing.crt").chmod(0o755)
209210
keyboard_input = f"{env['usb_mount_point']}\nn\ny\n"
210211
result = runner.invoke(

0 commit comments

Comments
 (0)