Skip to content

Commit a7e5cf8

Browse files
authored
[NDR-284] Add E2E test for invalid common name (#876)
1 parent 60b18ac commit a7e5cf8

File tree

11 files changed

+141
-102
lines changed

11 files changed

+141
-102
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ share/python-wheels/
2727
MANIFEST
2828
.pip_cache/
2929
package/
30+
lambdas/mtls_env_certs/
3031

3132
# PyInstaller
3233
# Usually these files are written by a python script from a template

lambdas/tests/e2e/api/fhir/conftest.py

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,9 @@
1414
PDM_SNOMED = 717391000000106
1515
PDM_METADATA_TABLE = os.environ.get("PDM_METADATA_TABLE")
1616
PDM_S3_BUCKET = os.environ.get("PDM_S3_BUCKET") or ""
17-
MTLS_ENDPOINT = os.environ["MTLS_ENDPOINT"]
18-
CLIENT_CERT_PATH = os.environ["CLIENT_CERT_PATH"]
19-
CLIENT_KEY_PATH = os.environ["CLIENT_KEY_PATH"]
17+
MTLS_ENDPOINT = os.environ.get("MTLS_ENDPOINT")
18+
CLIENT_CERT_PATH = os.environ.get("CLIENT_CERT_PATH")
19+
CLIENT_KEY_PATH = os.environ.get("CLIENT_KEY_PATH")
2020

2121

2222
@pytest.fixture
@@ -47,9 +47,11 @@ def fetch_with_retry_mtls(
4747
raise Exception("Condition not met within retry limit")
4848

4949

50-
def create_mtls_session():
50+
def create_mtls_session(
51+
client_cert_path=CLIENT_CERT_PATH, client_key_path=CLIENT_KEY_PATH
52+
):
5153
session = requests.Session()
52-
session.cert = (CLIENT_CERT_PATH, CLIENT_KEY_PATH)
54+
session.cert = (client_cert_path, client_key_path)
5355
return session
5456

5557

@@ -83,3 +85,36 @@ def temp_cert_and_key():
8385
yield cert_path, key_path
8486
finally:
8587
shutil.rmtree(temp_dir)
88+
89+
90+
def get_pdm_document_reference(record_id, client_cert_path=None, client_key_path=None):
91+
url = f"https://{MTLS_ENDPOINT}/DocumentReference/{PDM_SNOMED}~{record_id}"
92+
headers = {
93+
"X-Correlation-Id": "1234",
94+
}
95+
96+
# Call with invalid or unauthorised certs
97+
if client_cert_path and client_key_path:
98+
session = create_mtls_session(client_cert_path, client_key_path)
99+
else:
100+
# Call with default valid certs
101+
session = create_mtls_session()
102+
103+
response = session.get(url, headers=headers)
104+
return response
105+
106+
107+
def create_and_store_pdm_record(
108+
test_data,
109+
nhs_number: str = "9912003071",
110+
doc_status: str | None = None,
111+
size: int | None = None,
112+
):
113+
"""Helper to create metadata and resource for a record."""
114+
record = pdm_data_helper.build_record(
115+
nhs_number=nhs_number, doc_status=doc_status, size=size
116+
)
117+
test_data.append(record)
118+
pdm_data_helper.create_metadata(record)
119+
pdm_data_helper.create_resource(record)
120+
return record
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
import os
2+
import uuid
3+
4+
from lambdas.tests.e2e.api.fhir.conftest import get_pdm_document_reference
5+
6+
UNAUTHORISED_CLIENT_CERT_PATH = os.environ.get("UNAUTHORISED_CLIENT_CERT_PATH")
7+
UNAUTHORISED_CLIENT_KEY_PATH = os.environ.get("UNAUTHORISED_CLIENT_KEY_PATH")
8+
9+
10+
def test_mtls_invalid_common_name():
11+
record_id = str(uuid.uuid4())
12+
response = get_pdm_document_reference(
13+
record_id, UNAUTHORISED_CLIENT_CERT_PATH, UNAUTHORISED_CLIENT_KEY_PATH
14+
)
15+
assert response.status_code == 400
16+
17+
data = response.json()
18+
diagnostics = data.get("issue", [{}])[0].get("diagnostics", "")
19+
assert diagnostics == "Invalid document type requested"
Lines changed: 17 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,43 +1,25 @@
11
import base64
2-
import io
2+
import uuid
33

44
import pytest
5-
import requests
65
from tests.e2e.helpers.data_helper import PdmDataHelper
76

87
from lambdas.tests.e2e.api.fhir.conftest import (
9-
MTLS_ENDPOINT,
108
PDM_S3_BUCKET,
11-
create_mtls_session,
9+
create_and_store_pdm_record,
10+
get_pdm_document_reference,
1211
)
13-
from lambdas.tests.e2e.conftest import PDM_SNOMED
1412

1513
pdm_data_helper = PdmDataHelper()
1614

1715

18-
def get_document_reference(record_id):
19-
"""Helper to perform GET request for DocumentReference."""
20-
url = f"https://{MTLS_ENDPOINT}/DocumentReference/{PDM_SNOMED}~{record_id}"
21-
headers = {
22-
"X-Correlation-Id": "1234",
23-
}
24-
session = create_mtls_session()
25-
return session.get(url, headers=headers)
26-
27-
2816
@pytest.mark.parametrize("file_size", [None, 10 * 1024 * 1024])
2917
def test_file_retrieval(test_data, file_size):
3018
"""Test retrieval for small and large files."""
31-
pdm_record = pdm_data_helper.build_record(
32-
data=io.BytesIO(b"A" * file_size) if file_size else None,
33-
size=file_size * 1024 if file_size else None,
19+
pdm_record = create_and_store_pdm_record(
20+
test_data, size=file_size if file_size else None
3421
)
35-
test_data.append(pdm_record)
36-
37-
pdm_data_helper.create_metadata(pdm_record)
38-
pdm_data_helper.create_resource(pdm_record)
39-
40-
response = get_document_reference(pdm_record["id"])
22+
response = get_pdm_document_reference(pdm_record["id"])
4123
assert response.status_code == 200
4224

4325
json = response.json()
@@ -56,15 +38,15 @@ def test_file_retrieval(test_data, file_size):
5638

5739

5840
@pytest.mark.parametrize(
59-
"nhs_id,expected_status,expected_code,expected_diagnostics",
41+
"record_id,expected_status,expected_code,expected_diagnostics",
6042
[
61-
("9912003071", 404, "RESOURCE_NOT_FOUND", "Document reference not found"),
43+
(str(uuid.uuid4()), 404, "RESOURCE_NOT_FOUND", "Document reference not found"),
6244
],
6345
)
6446
def test_retrieve_edge_cases(
65-
nhs_id, expected_status, expected_code, expected_diagnostics
47+
record_id, expected_status, expected_code, expected_diagnostics
6648
):
67-
response = get_document_reference(nhs_id)
49+
response = get_pdm_document_reference(record_id)
6850
assert response.status_code == expected_status
6951

7052
body = response.json()
@@ -78,30 +60,25 @@ def test_retrieve_edge_cases(
7860

7961

8062
def test_preliminary_file(test_data):
81-
pdm_record = pdm_data_helper.build_record(doc_status="preliminary")
82-
test_data.append(pdm_record)
83-
pdm_data_helper.create_metadata(pdm_record)
84-
pdm_data_helper.create_resource(pdm_record)
63+
pdm_record = create_and_store_pdm_record(test_data, doc_status="preliminary")
8564

86-
response = get_document_reference(pdm_record["id"])
65+
response = get_pdm_document_reference(pdm_record["id"])
8766
assert response.status_code == 200
8867

8968
response_json = response.json()
9069
assert response_json.get("docStatus") == "preliminary"
9170

9271

9372
def test_forbidden_with_invalid_cert(test_data, temp_cert_and_key):
94-
pdm_record = pdm_data_helper.build_record()
95-
test_data.append(pdm_record)
96-
pdm_data_helper.create_metadata(pdm_record)
97-
pdm_data_helper.create_resource(pdm_record)
73+
pdm_record = create_and_store_pdm_record(test_data)
9874

9975
# Use an invalid cert that is trusted by TLS but fails truststore validation
10076
cert_path, key_path = temp_cert_and_key
101-
url = f"https://{MTLS_ENDPOINT}/DocumentReference/{PDM_SNOMED}~{pdm_record['id']}"
102-
headers = {"Authorization": "Bearer 123", "X-Correlation-Id": "1234"}
10377

104-
response = requests.get(url, headers=headers, cert=(cert_path, key_path))
78+
response = get_pdm_document_reference(
79+
pdm_record["id"], client_cert_path=cert_path, client_key_path=key_path
80+
)
81+
10582
body = response.json()
10683
assert response.status_code == 403
10784
assert body["message"] == "Forbidden"

lambdas/tests/e2e/api/fhir/test_search_patient_fhir_api.py

Lines changed: 21 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,40 +1,37 @@
11
import pytest
2-
import requests
32

4-
from lambdas.tests.e2e.api.fhir.conftest import MTLS_ENDPOINT, create_mtls_session
3+
from lambdas.tests.e2e.api.fhir.conftest import (
4+
MTLS_ENDPOINT,
5+
create_and_store_pdm_record,
6+
create_mtls_session,
7+
)
58
from lambdas.tests.e2e.conftest import APIM_ENDPOINT, PDM_SNOMED
69
from lambdas.tests.e2e.helpers.data_helper import PdmDataHelper
710

811
pdm_data_helper = PdmDataHelper()
912

1013

11-
def search_document_reference(nhs_number):
12-
"""Helper to perform search by NHS number."""
14+
def search_document_reference(nhs_number, client_cert_path=None, client_key_path=None):
15+
"""Helper to perform search by NHS number with optional mTLS certs."""
1316
url = (
1417
f"https://{MTLS_ENDPOINT}/DocumentReference?"
1518
f"subject:identifier=https://fhir.nhs.uk/Id/nhs-number|{nhs_number}"
1619
)
1720
headers = {
18-
"Authorization": "Bearer 123",
1921
"X-Correlation-Id": "1234",
2022
}
21-
session = create_mtls_session()
22-
return session.get(url, headers=headers)
2323

24+
# Use provided certs if available, else defaults
25+
if client_cert_path and client_key_path:
26+
session = create_mtls_session(client_cert_path, client_key_path)
27+
else:
28+
session = create_mtls_session()
2429

25-
def create_and_store_record(
26-
test_data, nhs_number="9912003071", doc_status: str | None = None
27-
):
28-
"""Helper to create metadata and resource for a record."""
29-
record = pdm_data_helper.build_record(nhs_number=nhs_number, doc_status=doc_status)
30-
test_data.append(record)
31-
pdm_data_helper.create_metadata(record)
32-
pdm_data_helper.create_resource(record)
33-
return record
30+
return session.get(url, headers=headers)
3431

3532

3633
def test_search_patient_details(test_data):
37-
create_and_store_record(test_data)
34+
create_and_store_pdm_record(test_data)
3835

3936
response = search_document_reference("9912003071")
4037
assert response.status_code == 200
@@ -50,8 +47,8 @@ def test_search_patient_details(test_data):
5047

5148

5249
def test_multiple_cancelled_search_patient_details(test_data):
53-
create_and_store_record(test_data, doc_status="cancelled")
54-
create_and_store_record(test_data, doc_status="cancelled")
50+
create_and_store_pdm_record(test_data, doc_status="cancelled")
51+
create_and_store_pdm_record(test_data, doc_status="cancelled")
5552

5653
response = search_document_reference("9912003071")
5754
assert response.status_code == 200
@@ -90,20 +87,15 @@ def test_search_edge_cases(
9087

9188
def test_search_patient_unauthorized_mtls(test_data, temp_cert_and_key):
9289
"""Search should return 403 when mTLS certificate is invalid or missing."""
93-
create_and_store_record(test_data)
94-
url = (
95-
f"https://{MTLS_ENDPOINT}/DocumentReference?"
96-
f"subject:identifier=https://fhir.nhs.uk/Id/nhs-number|9912003071"
97-
)
98-
headers = {
99-
"Authorization": "Bearer 123",
100-
"X-Correlation-Id": "unauthorized-test",
101-
}
90+
create_and_store_pdm_record(test_data)
10291

10392
# Use an invalid cert that is trusted by TLS but fails truststore validation
10493
cert_path, key_path = temp_cert_and_key
10594

106-
response = requests.get(url, headers=headers, cert=(cert_path, key_path))
95+
response = search_document_reference(
96+
"9912003071", client_cert_path=cert_path, client_key_path=key_path
97+
)
98+
10799
body = response.json()
108100
assert response.status_code == 403
109101
assert body["message"] == "Forbidden"

0 commit comments

Comments
 (0)