Skip to content

Commit 10e7d6c

Browse files
authored
VED-746: E2E Test suite for search by identifier (#800)
* VED-746: E2E Teat suite for search by identifier * VED-746: Test for identifier values * VED-746: search by identifier backward compatibility * VED-746: test for search by identifier * VED-746: test for search by identifier2 * VED-746: e2e test for identifier and _elements * VED-746: e2e for search for identifier and _elements * VED-746: e2e for post path * VED-746: OAS specification for meta.versionId * VED-746: change variable names * VED-746: rename e2e test files
1 parent f737a78 commit 10e7d6c

File tree

6 files changed

+393
-10
lines changed

6 files changed

+393
-10
lines changed
Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,214 @@
1+
2+
from decimal import Decimal
3+
import pprint
4+
from typing import NamedTuple, Literal, Optional
5+
import uuid
6+
from utils.base_test import ImmunizationBaseTest
7+
from utils.constants import valid_nhs_number1
8+
9+
from utils.resource import generate_imms_resource, generate_filtered_imms_resource
10+
from utils.mappings import VaccineTypes
11+
from lib.env import get_service_base_path
12+
13+
14+
class TestSearchImmunizationByIdentifier(ImmunizationBaseTest):
15+
16+
def store_records(self, *resources):
17+
ids = []
18+
for res in resources:
19+
imms_id = self.default_imms_api.create_immunization_resource(res)
20+
ids.append(imms_id)
21+
return ids[0] if len(ids) == 1 else tuple(ids)
22+
23+
def test_search_imms(self):
24+
for imms_api in self.imms_apis:
25+
with self.subTest(imms_api):
26+
covid19_imms_data = generate_imms_resource()
27+
rsv_imms_data = generate_imms_resource()
28+
covid_ids = self.store_records(covid19_imms_data)
29+
rsv_ids = self.store_records(rsv_imms_data)
30+
31+
# Retrieve the resources to get the identifier system and value via read API
32+
covid_resource = imms_api.get_immunization_by_id(covid_ids).json()
33+
rsv_resource = imms_api.get_immunization_by_id(rsv_ids).json()
34+
35+
# Extract identifier components safely for covid resource
36+
identifiers = covid_resource.get("identifier", [])
37+
identifier_system = identifiers[0].get("system")
38+
identifier_value = identifiers[0].get("value")
39+
40+
# Extract identifier components safely for rsv resource
41+
rsv_identifiers = rsv_resource.get("identifier", [])
42+
rsv_identifier_system = rsv_identifiers[0].get("system")
43+
rsv_identifier_value = rsv_identifiers[0].get("value")
44+
45+
# When
46+
search_response = imms_api.search_immunization_by_identifier(identifier_system, identifier_value)
47+
self.assertEqual(search_response.status_code, 200, search_response.text)
48+
bundle = search_response.json()
49+
self.assertEqual(bundle.get("resourceType"), "Bundle", bundle)
50+
entries = bundle.get("entry", [])
51+
self.assertTrue(entries, "Expected at least one match in Bundle.entry")
52+
self.assertEqual(len(entries), 1, f"Expected exactly one match, got {len(entries)}")
53+
54+
# When
55+
rsv_search_response = imms_api.search_immunization_by_identifier(
56+
rsv_identifier_system,
57+
rsv_identifier_value
58+
)
59+
self.assertEqual(rsv_search_response.status_code, 200, search_response.text)
60+
rsv_bundle = rsv_search_response.json()
61+
self.assertEqual(bundle.get("resourceType"), "Bundle", rsv_bundle)
62+
entries = rsv_bundle.get("entry", [])
63+
self.assertTrue(entries, "Expected at least one match in Bundle.entry")
64+
self.assertEqual(len(entries), 1, f"Expected exactly one match, got {len(entries)}")
65+
66+
def test_search_backwards_compatible(self):
67+
"""Test that SEARCH 200 response body is backwards compatible with Immunisation History FHIR API.
68+
This test proves that the search endpoint’s response is still shaped exactly like the
69+
Immunisation History FHIR API expects (“backwards compatible”), not just that it returns a 200
70+
"""
71+
for imms_api in self.imms_apis:
72+
with self.subTest(imms_api):
73+
74+
stored_imms_resource = generate_imms_resource()
75+
imms_identifier_value = stored_imms_resource["identifier"][0]["value"]
76+
imms_id = self.store_records(stored_imms_resource)
77+
78+
# Prepare the imms resource expected from the response. Note that id and identifier_value need to be
79+
# updated to match those assigned by the create_an_imms_obj and store_records functions.
80+
expected_imms_resource = generate_filtered_imms_resource(
81+
crud_operation_to_filter_for="SEARCH",
82+
imms_identifier_value=imms_identifier_value,
83+
nhs_number=valid_nhs_number1,
84+
vaccine_type=VaccineTypes.covid_19,
85+
)
86+
expected_imms_resource["id"] = imms_id
87+
expected_imms_resource["meta"] = {"versionId": "1"}
88+
89+
# Retrieve the resource to get the identifier system and value via READ API
90+
imms_resource = imms_api.get_immunization_by_id(imms_id).json()
91+
identifiers = imms_resource.get("identifier", [])
92+
identifier_system = identifiers[0].get("system")
93+
identifier_value = identifiers[0].get("value")
94+
self.assertIsNotNone(identifier_system, "Identifier system is None")
95+
self.assertIsNotNone(identifier_value, "Identifier value is None")
96+
97+
# When
98+
response = imms_api.search_immunization_by_identifier(identifier_system, identifier_value)
99+
100+
# Then
101+
self.assertEqual(response.status_code, 200, response.text)
102+
body = response.json(parse_float=Decimal)
103+
entries = body["entry"]
104+
response_imms = [item for item in entries if item["resource"]["resourceType"] == "Immunization"]
105+
response_patients = [item for item in entries if item["resource"]["resourceType"] == "Patient"]
106+
response_other_entries = [
107+
item for item in entries if item["resource"]["resourceType"] not in ("Patient", "Immunization")
108+
]
109+
110+
# Check bundle structure apart from entry
111+
self.assertEqual(body["resourceType"], "Bundle")
112+
self.assertEqual(body["type"], "searchset")
113+
self.assertEqual(body["total"], len(response_imms))
114+
115+
# Check that entry only contains a patient and immunizations
116+
self.assertEqual(len(response_other_entries), 0)
117+
self.assertEqual(len(response_patients), 0)
118+
119+
# Check Immunization structure
120+
for entry in response_imms:
121+
self.assertEqual(entry["search"], {"mode": "match"})
122+
self.assertTrue(entry["fullUrl"].startswith("https://"))
123+
self.assertEqual(entry["resource"]["resourceType"], "Immunization")
124+
imms_identifier = entry["resource"]["identifier"]
125+
self.assertEqual(len(imms_identifier), 1, "Immunization did not have exactly 1 identifier")
126+
self.assertEqual(imms_identifier[0]["system"], identifier_system)
127+
self.assertEqual(imms_identifier[0]["value"], identifier_value)
128+
129+
# Check structure of one of the imms resources
130+
response_imm = next(item for item in entries if item["resource"]["id"] == imms_id)
131+
self.assertEqual(
132+
response_imm["fullUrl"], f"{get_service_base_path()}/Immunization/{imms_id}"
133+
)
134+
self.assertEqual(response_imm["search"], {"mode": "match"})
135+
expected_imms_resource["patient"]["reference"] = response_imm["resource"]["patient"]["reference"]
136+
self.assertEqual(response_imm["resource"], expected_imms_resource)
137+
138+
def test_search_immunization_parameter_smoke_tests(self):
139+
stored_records = generate_imms_resource(
140+
valid_nhs_number1, VaccineTypes.covid_19,
141+
imms_identifier_value=str(uuid.uuid4()))
142+
143+
imms_id = self.store_records(stored_records)
144+
# Retrieve the resources to get the identifier system and value via read API
145+
covid_resource = self.default_imms_api.get_immunization_by_id(imms_id).json()
146+
147+
# Extract identifier components safely for covid resource
148+
identifiers = covid_resource.get("identifier", [])
149+
identifier_system = identifiers[0].get("system")
150+
identifier_value = identifiers[0].get("value")
151+
152+
# created_resource_ids = [result["id"] for result in stored_records]
153+
154+
class SearchTestParams(NamedTuple):
155+
method: Literal["POST", "GET"]
156+
query_string: Optional[str]
157+
body: Optional[str]
158+
should_be_success: bool
159+
expected_status_code: int = 200
160+
161+
searches = [
162+
SearchTestParams(
163+
"GET",
164+
"",
165+
None,
166+
False,
167+
400
168+
),
169+
# No results.
170+
SearchTestParams(
171+
"GET",
172+
f"identifier={identifier_system}|{identifier_value}",
173+
None,
174+
True,
175+
200
176+
),
177+
SearchTestParams(
178+
"POST",
179+
"",
180+
f"identifier={identifier_system}|{identifier_value}",
181+
True,
182+
200
183+
),
184+
SearchTestParams(
185+
"POST",
186+
f"identifier={identifier_system}|{identifier_value}",
187+
f"identifier={identifier_system}|{identifier_value}",
188+
False,
189+
400
190+
),
191+
]
192+
for search in searches:
193+
pprint.pprint(search)
194+
response = self.default_imms_api.search_immunizations_full(
195+
search.method,
196+
search.query_string,
197+
body=search.body,
198+
expected_status_code=search.expected_status_code)
199+
200+
# Then
201+
assert response.ok == search.should_be_success, response.text
202+
203+
results: dict = response.json()
204+
if search.should_be_success:
205+
assert "entry" in results.keys()
206+
assert response.status_code == 200
207+
assert results["resourceType"] == "Bundle"
208+
assert results["type"] == "searchset"
209+
assert results["total"] == 1
210+
assert isinstance(results["entry"], list)
211+
else:
212+
assert "entry" not in results.keys()
213+
assert response.status_code != 200
214+
assert results["resourceType"] == "OperationOutcome"
Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,140 @@
1+
from utils.base_test import ImmunizationBaseTest
2+
from lib.env import get_service_base_path
3+
import pprint
4+
import uuid
5+
from utils.constants import valid_nhs_number1
6+
from utils.resource import generate_imms_resource
7+
from utils.mappings import VaccineTypes
8+
from typing import NamedTuple, Literal, Optional
9+
10+
11+
class TestSearchImmunizationByIdentifier(ImmunizationBaseTest):
12+
13+
def store_records(self, *resources):
14+
ids = []
15+
for res in resources:
16+
imms_id = self.default_imms_api.create_immunization_resource(res)
17+
ids.append(imms_id)
18+
return ids[0] if len(ids) == 1 else tuple(ids)
19+
20+
def test_search_imms(self):
21+
for imms_api in self.imms_apis:
22+
with self.subTest(imms_api):
23+
covid19_imms_data = generate_imms_resource()
24+
covid_ids = self.store_records(covid19_imms_data)
25+
26+
# Retrieve the resources to get the identifier system and value via read API
27+
covid_resource = imms_api.get_immunization_by_id(covid_ids).json()
28+
29+
# Extract identifier components safely for covid resource
30+
identifiers = covid_resource.get("identifier", [])
31+
identifier_system = identifiers[0].get("system")
32+
identifier_value = identifiers[0].get("value")
33+
34+
# When
35+
search_response = imms_api.search_immunization_by_identifier_and_elements(
36+
identifier_system, identifier_value)
37+
self.assertEqual(search_response.status_code, 200, search_response.text)
38+
bundle = search_response.json()
39+
self.assertEqual(bundle.get("resourceType"), "Bundle", bundle)
40+
entries = bundle.get("entry", [])
41+
self.assertTrue(entries, "Expected at least one match in Bundle.entry")
42+
self.assertEqual(len(entries), 1, f"Expected exactly one match, got {len(entries)}")
43+
self.assertIn("meta", entries[0]["resource"])
44+
self.assertEqual(entries[0]["resource"]["id"], covid_ids)
45+
self.assertEqual(entries[0]["resource"]["meta"]["versionId"], 1)
46+
self.assertTrue(entries[0]["fullUrl"].startswith("https://"))
47+
self.assertEqual(
48+
entries[0]["fullUrl"], f"{get_service_base_path()}/Immunization/{covid_ids}"
49+
)
50+
51+
def test_search_imms_no_match_returns_empty_bundle(self):
52+
for imms_api in self.imms_apis:
53+
with self.subTest(imms_api):
54+
resp = imms_api.search_immunization_by_identifier_and_elements(
55+
"http://example.org/sys", "does-not-exist-123"
56+
)
57+
self.assertEqual(resp.status_code, 200, resp.text)
58+
bundle = resp.json()
59+
self.assertEqual(bundle.get("resourceType"), "Bundle", bundle)
60+
self.assertEqual(bundle.get("type"), "searchset")
61+
self.assertEqual(bundle.get("total", 0), 0)
62+
self.assertFalse(bundle.get("entry"))
63+
64+
def test_search_by_identifier_parameter_smoke_tests(self):
65+
stored_records = generate_imms_resource(
66+
valid_nhs_number1, VaccineTypes.covid_19,
67+
imms_identifier_value=str(uuid.uuid4()))
68+
69+
imms_id = self.store_records(stored_records)
70+
# Retrieve the resources to get the identifier system and value via read API
71+
covid_resource = self.default_imms_api.get_immunization_by_id(imms_id).json()
72+
73+
# Extract identifier components safely for covid resource
74+
identifiers = covid_resource.get("identifier", [])
75+
identifier_system = identifiers[0].get("system")
76+
identifier_value = identifiers[0].get("value")
77+
78+
# created_resource_ids = [result["id"] for result in stored_records]
79+
80+
class SearchTestParams(NamedTuple):
81+
method: Literal["POST", "GET"]
82+
query_string: Optional[str]
83+
body: Optional[str]
84+
should_be_success: bool
85+
expected_status_code: int = 200
86+
87+
searches = [
88+
SearchTestParams(
89+
"GET",
90+
"",
91+
None,
92+
False,
93+
400
94+
),
95+
# No results.
96+
SearchTestParams(
97+
"GET",
98+
f"identifier={identifier_system}|{identifier_value}",
99+
None,
100+
True,
101+
200
102+
),
103+
SearchTestParams(
104+
"POST",
105+
"",
106+
f"identifier={identifier_system}|{identifier_value}",
107+
True,
108+
200
109+
),
110+
SearchTestParams(
111+
"POST",
112+
f"identifier={identifier_system}|{identifier_value}",
113+
f"identifier={identifier_system}|{identifier_value}",
114+
False,
115+
400
116+
),
117+
]
118+
for search in searches:
119+
pprint.pprint(search)
120+
response = self.default_imms_api.search_immunizations_full(
121+
search.method,
122+
search.query_string,
123+
body=search.body,
124+
expected_status_code=search.expected_status_code)
125+
126+
# Then
127+
assert response.ok == search.should_be_success, response.text
128+
129+
results: dict = response.json()
130+
if search.should_be_success:
131+
assert "entry" in results.keys()
132+
assert response.status_code == 200
133+
assert results["resourceType"] == "Bundle"
134+
assert results["type"] == "searchset"
135+
assert results["total"] == 1
136+
assert isinstance(results["entry"], list)
137+
else:
138+
assert "entry" not in results.keys()
139+
assert response.status_code != 200
140+
assert results["resourceType"] == "OperationOutcome"

e2e/test_search_immunization.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -335,13 +335,6 @@ class SearchTestParams(NamedTuple):
335335

336336
result_ids = [result["resource"]["id"] for result in results["entry"]]
337337
created_and_returned_ids = list(set(result_ids) & set(created_resource_ids))
338-
print("\n Search Test Debug Info:")
339-
print("Search method:", search.method)
340-
print("Search query string:", search.query_string)
341-
print("Expected indexes:", search.expected_indexes)
342-
print("Expected IDs:", [created_resource_ids[i] for i in search.expected_indexes])
343-
print("Actual returned IDs:", result_ids)
344-
print("Matched IDs:", created_and_returned_ids)
345338
assert len(created_and_returned_ids) == len(search.expected_indexes)
346339
for expected_index in search.expected_indexes:
347340
assert created_resource_ids[expected_index] in result_ids

e2e/utils/constants.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,5 +8,7 @@
88
patient_identifier_system = "https://fhir.nhs.uk/Id/nhs-number"
99
valid_patient_identifier1 = f"{patient_identifier_system}|{valid_nhs_number1}"
1010
valid_patient_identifier2 = f"{patient_identifier_system}|{valid_nhs_number2}"
11+
identifier_system = "https://supplierABC/identifiers/vacc"
12+
identifier_value = "f10b59b3-fc73-4616-99c9-9e882ab31184"
1113

1214
env_internal_dev = os.environ.get("ENVIRONMENT", "") == "internal-dev"

0 commit comments

Comments
 (0)