Skip to content

Commit da3d958

Browse files
committed
VED-79: fix idempotency issues
1 parent 37a42fd commit da3d958

File tree

3 files changed

+115
-33
lines changed

3 files changed

+115
-33
lines changed

mns_subscription/models/errors.py

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ class Severity(str, Enum):
1111
class Code(str, Enum):
1212
forbidden = "forbidden"
1313
not_found = "not-found"
14-
invalid = "invalid"
15-
server_error = "exception"
14+
invalid = "invalid or missing access token"
15+
server_error = "internal server error"
1616
invariant = "invariant"
1717
not_supported = "not-supported"
1818
duplicate = "duplicate"
@@ -34,21 +34,34 @@ def to_operation_outcome() -> dict:
3434

3535

3636
@dataclass
37-
class UnauthorizedVaxOnRecordError(RuntimeError):
37+
class TokenValidationError(RuntimeError):
3838
@staticmethod
3939
def to_operation_outcome() -> dict:
40-
msg = "Unauthorized request for vaccine type present in the stored immunization resource"
40+
msg = "Missing/Invalid Token"
4141
return create_operation_outcome(
4242
resource_id=str(uuid.uuid4()),
4343
severity=Severity.error,
44-
code=Code.forbidden,
44+
code=Code.invalid,
45+
diagnostics=msg,
46+
)
47+
48+
49+
@dataclass
50+
class ConflictError(RuntimeError):
51+
@staticmethod
52+
def to_operation_outcome() -> dict:
53+
msg = "Conflict"
54+
return create_operation_outcome(
55+
resource_id=str(uuid.uuid4()),
56+
severity=Severity.error,
57+
code=Code.duplicate,
4558
diagnostics=msg,
4659
)
4760

4861

4962
@dataclass
5063
class ResourceFoundError(RuntimeError):
51-
"""Return this error when the requested FHIR resource does exist"""
64+
"""Return this error when the requested resource does not exist or not complete"""
5265

5366
resource_type: str
5467
resource_id: str
@@ -67,7 +80,7 @@ def to_operation_outcome(self) -> dict:
6780

6881
@dataclass
6982
class UnhandledResponseError(RuntimeError):
70-
"""Use this error when the response from an external service (ex: dynamodb) can't be handled"""
83+
"""Use this unhandled errors"""
7184

7285
response: dict | str
7386
message: str
@@ -85,21 +98,21 @@ def to_operation_outcome(self) -> dict:
8598

8699

87100
@dataclass
88-
class IdentifierDuplicationError(RuntimeError):
89-
"""Fine grain validation"""
101+
class ServerError(RuntimeError):
102+
"""Use when there is a server error"""
90103

91-
identifier: str
104+
response: dict | str
105+
message: str
92106

93-
def __str__(self) -> str:
94-
return f"The provided identifier: {self.identifier} is duplicated"
107+
def __str__(self):
108+
return f"{self.message}\n{self.response}"
95109

96110
def to_operation_outcome(self) -> dict:
97-
msg = self.__str__()
98111
return create_operation_outcome(
99112
resource_id=str(uuid.uuid4()),
100113
severity=Severity.error,
101-
code=Code.duplicate,
102-
diagnostics=msg,
114+
code=Code.server_error,
115+
diagnostics=self.__str__(),
103116
)
104117

105118

mns_subscription/src/mns_service.py

Lines changed: 86 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@
44
import logging
55
import json
66
from authentication import AppRestrictedAuth
7-
from models.errors import UnhandledResponseError
7+
from models.errors import (
8+
UnhandledResponseError,
9+
ResourceFoundError,
10+
UnauthorizedError,
11+
ServerError,
12+
TokenValidationError
13+
)
814

915
SQS_ARN = os.getenv("SQS_ARN")
1016
MNS_URL = "https://int.api.service.nhs.uk/multicast-notification-service/subscriptions"
@@ -13,39 +19,102 @@
1319
class MnsService:
1420
def __init__(self, authenticator: AppRestrictedAuth):
1521
self.authenticator = authenticator
16-
17-
logging.info(f"Using SQS ARN for subscription: {SQS_ARN}")
18-
19-
def subscribe_notification(self) -> dict | None:
20-
access_token = self.authenticator.get_access_token()
21-
request_headers = {
22+
self.access_token = self.authenticator.get_access_token()
23+
self.request_headers = {
2224
'Content-Type': 'application/fhir+json',
23-
'Authorization': f'Bearer {access_token}',
25+
'Authorization': f'Bearer {self.access_token}',
2426
'X-Correlation-ID': str(uuid.uuid4())
2527
}
26-
27-
subscription_payload = {
28+
self.subscription_payload = {
2829
"resourceType": "Subscription",
2930
"status": "requested",
30-
"reason": "Subscribe SQS to MNS test-signal",
31+
"reason": "Subscribe SQS to NHS Number Change Events",
3132
"criteria": "eventType=nhs-number-change-2",
3233
"channel": {
3334
"type": "message",
3435
"endpoint": SQS_ARN,
3536
"payload": "application/json"
3637
}
3738
}
38-
response = requests.post(MNS_URL, headers=request_headers, data=json.dumps(subscription_payload))
3939

40-
print(f"Access Token: {access_token}")
40+
logging.info(f"Using SQS ARN for subscription: {SQS_ARN}")
41+
42+
def subscribe_notification(self) -> dict | None:
43+
44+
response = requests.post(MNS_URL, headers=self.request_headers, data=json.dumps(self.subscription_payload))
45+
46+
print(f"Access Token: {self.access_token}")
4147
print(f"SQS ARN: {SQS_ARN}")
42-
print(f"Headers: {request_headers}")
43-
print(f"Payload: {json.dumps(subscription_payload, indent=2)}")
48+
print(f"Headers: {self.request_headers}")
49+
print(f"Payload: {json.dumps(self.subscription_payload, indent=2)}")
4450

45-
if response.status_code == 201:
51+
if response.status_code == 200:
4652
return response.json()
53+
elif response.status_code == 409:
54+
msg = "SQS Queue Already Subscribed, can't re-subscribe"
55+
raise UnhandledResponseError(response=response.json(), message=msg)
56+
elif response.status_code == 401:
57+
msg = "SQS Queue Already Subscribed, can't re-subscribe"
58+
raise TokenValidationError(response=response.json(), message=msg)
59+
elif response.status_code == 400:
60+
msg = "Resource Type provided for this is not correct"
61+
raise ResourceFoundError(response=response.json(), message=msg)
62+
elif response.status_code == 403:
63+
msg = "You don't have the right permissions for this request"
64+
raise UnauthorizedError(response=response.json(), message=msg)
65+
elif response.status_code == 500:
66+
msg = "Internal Server Error"
67+
raise ServerError(response=response.json(), message=msg)
68+
else:
69+
msg = f"Unhandled error: {response.status_code} - {response.text}"
70+
raise UnhandledResponseError(response=response.json(), message=msg)
71+
72+
def get_subscription(self) -> dict | None:
73+
response = requests.get(MNS_URL, headers=self.request_headers)
74+
logging.info(f"GET {MNS_URL}")
75+
logging.debug(f"Headers: {self.request_headers}")
76+
77+
if response.status_code == 200:
78+
bundle = response.json()
79+
# Assume a FHIR Bundle with 'entry' list
80+
for entry in bundle.get("entry", []):
81+
resource = entry.get("channel", {})
82+
channel = resource.get("channel", {})
83+
if channel.get("endpoint") == SQS_ARN:
84+
return resource # Found a matching subscription
85+
return None # No subscription for this SQS ARN
4786
elif response.status_code == 404:
4887
return None
88+
elif response.status_code == 401:
89+
msg = "Token validation failed for the request"
90+
raise TokenValidationError(response=response.json(), message=msg)
91+
elif response.status_code == 400:
92+
msg = "Bad request: Resource type or parameters incorrect"
93+
raise ResourceFoundError(response=response.json(), message=msg)
94+
elif response.status_code == 403:
95+
msg = "You don't have the right permissions for this request"
96+
raise UnauthorizedError(response=response.json(), message=msg)
97+
elif response.status_code == 500:
98+
msg = "Internal Server Error"
99+
raise ServerError(response=response.json(), message=msg)
49100
else:
50-
msg = "MNS subscription failed"
101+
msg = f"Unhandled error: {response.status_code} - {response.text}"
51102
raise UnhandledResponseError(response=response.json(), message=msg)
103+
104+
def check_subscription(self) -> dict:
105+
"""
106+
Ensures that a subscription exists for this SQS_ARN.
107+
If not found, creates one.
108+
Returns the subscription.
109+
"""
110+
try:
111+
existing = self.get_subscription()
112+
if existing:
113+
logging.info("Subscription for this SQS ARN already exists.")
114+
return existing
115+
else:
116+
logging.info("No subscription found for this SQS ARN. Creating new subscription...")
117+
return self.subscribe_notification()
118+
except Exception as e:
119+
logging.error(f"Error ensuring subscription: {e}")
120+
raise

mns_subscription/src/subscribe_mns.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ def run_subscription():
2626
mns = MnsService(authenticator)
2727

2828
logging.info("Subscribing to MNS...")
29-
result = mns.subscribe_notification()
29+
result = mns.check_subscription()
3030
logging.info(f"Subscription Result: {result}")
3131
return result
3232
except Exception:

0 commit comments

Comments
 (0)