Skip to content

Commit b0bfcac

Browse files
Merge pull request #999 from NHSDigital/feature/made14-NRL-1559-add-stats-report
[NRL-1559] Add pointer stats report
2 parents d45455d + d7f7926 commit b0bfcac

File tree

3 files changed

+516
-0
lines changed

3 files changed

+516
-0
lines changed

reports/calculate_pointer_stats.py

Lines changed: 221 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,221 @@
1+
import json
2+
from datetime import datetime, timedelta, timezone
3+
from typing import Any
4+
5+
import boto3
6+
import fire
7+
8+
from nrlf.consumer.fhir.r4.model import DocumentReference
9+
from nrlf.core.logger import logger
10+
from nrlf.core.validators import DocumentReferenceValidator
11+
12+
# Histogram: a per-patient pointer count -> number of patients with that count.
type PatientCounter = dict[int, int]
# Pointer type string -> PatientCounter histogram for that type.
type TypePatientCounter = dict[str, PatientCounter]
# Producer organisation -> TypePatientCounter for that organisation.
type OrgTypePatientCounter = dict[str, TypePatientCounter]

# Module-level DynamoDB client and "scan" paginator, created at import time
# and shared by the scan function below.
dynamodb = boto3.client("dynamodb")
paginator = dynamodb.get_paginator("scan")

# Raise the imported nrlf logger to ERROR level for this script
# (presumably to keep per-document validation logging out of the report
# output - TODO confirm).
logger.setLevel("ERROR")
20+
21+
22+
def _calc_type_stats(producer: str, type_str: str, stats: dict[str, Any]) -> None:
23+
stats["type_counts"] = stats.get("type_counts", {})
24+
stats["type_counts"][type_str] = stats["type_counts"].get(type_str, 0) + 1
25+
26+
stats["producer_by_type_counts"][producer] = stats["producer_by_type_counts"].get(
27+
producer, {}
28+
)
29+
stats["producer_by_type_counts"][producer][type_str] = (
30+
stats["producer_by_type_counts"][producer].get(type_str, 0) + 1
31+
)
32+
33+
34+
def _calc_date_stats(created_on: str, stats: dict[str, Any]) -> None:
35+
month_created = created_on[:7] if created_on else "not-set"
36+
if month_created not in stats["created_by_month"]:
37+
stats["created_by_month"][month_created] = 1
38+
else:
39+
stats["created_by_month"][month_created] += 1
40+
41+
42+
def _calc_patient_counters(
43+
patient_number: str, producer: str, type_str: str, patient_counters: dict[str, Any]
44+
) -> None:
45+
if patient_number not in patient_counters:
46+
patient_counters[patient_number] = {
47+
"count": 1,
48+
"types": {type_str: 1},
49+
"orgs": {producer: {type_str: 1}},
50+
}
51+
else:
52+
patient_counters[patient_number]["count"] += 1
53+
patient_counters[patient_number]["types"][type_str] = (
54+
patient_counters[patient_number]["types"].get(type_str, 0) + 1
55+
)
56+
patient_counters[patient_number]["orgs"][producer] = patient_counters[
57+
patient_number
58+
]["orgs"].get(producer, {})
59+
patient_counters[patient_number]["orgs"][producer][type_str] = (
60+
patient_counters[patient_number]["orgs"][producer].get(type_str, 0) + 1
61+
)
62+
63+
64+
def _get_patient_stats(patient_counters: dict[str, Any]) -> dict[str, Any]:
65+
total_pointers = 0
66+
max_pointers = 0
67+
min_pointers = 0
68+
counts_with_pointers: PatientCounter = {}
69+
counts_with_types: TypePatientCounter = {}
70+
counts_with_orgs_types: OrgTypePatientCounter = {}
71+
72+
for counters in patient_counters.values():
73+
count = counters["count"]
74+
75+
total_pointers += count
76+
max_pointers = max(max_pointers, count)
77+
min_pointers = min(min_pointers, count) if min_pointers else count
78+
79+
counts_with_pointers[count] = counts_with_pointers.get(count, 0) + 1
80+
81+
for type, type_count in counters["types"].items():
82+
counts_with_types[type] = counts_with_types.get(type, {})
83+
counts_with_types[type][type_count] = (
84+
counts_with_types[type].get(type_count, 0) + 1
85+
)
86+
87+
for org, types in counters["orgs"].items():
88+
counts_with_orgs_types[org] = counts_with_orgs_types.get(org, {})
89+
for type, type_count in types.items():
90+
counts_with_orgs_types[org][type] = counts_with_orgs_types[org].get(
91+
type, {}
92+
)
93+
counts_with_orgs_types[org][type][type_count] = (
94+
counts_with_orgs_types[org][type].get(type_count, 0) + 1
95+
)
96+
97+
return {
98+
"avg_pointers_per_patient": (
99+
total_pointers / len(patient_counters) if patient_counters else 0
100+
),
101+
"max_pointers_per_patient": max_pointers,
102+
"min_pointers_per_patient": min_pointers,
103+
"patient_counts_with_pointers": counts_with_pointers,
104+
"patient_counts_with_types": counts_with_types,
105+
"patient_counts_with_org_types": counts_with_orgs_types,
106+
}
107+
108+
109+
def _scan_and_get_stats(
    table_name: str, report_output_file: str = ""
) -> dict[str, float | int | str]:
    """
    Calculate stats from the pointers table.

    Scans the whole table page by page, parses each item's "document"
    attribute as a DocumentReference, and tallies pointers by type, producer,
    creation month and patient. Progress is printed to stdout while scanning.

    Parameters:
    - table_name: The name of the pointers table to use.
    - report_output_file: Optional path. When non-empty, the stats are also
      written to this file as indented JSON.

    Returns:
    A dict with "scanned_count" (total ScannedCount over all pages),
    "took-secs" (scan duration in seconds) and "stats" (the stats serialised
    as a JSON string).
    """
    params: dict[str, Any] = {
        "TableName": table_name,
        "PaginationConfig": {"PageSize": 50},
    }

    total_scanned_count = 0

    start_time = datetime.now(tz=timezone.utc)

    stats: dict[str, Any] = {
        "fails_model": 0,
        "fails_validation": 0,
        "total_pointers": 0,
        "type_counts": {},
        "producer_by_type_counts": {},
        "created_by_month": {},
        "patients_with_pointers": 0,
        "avg_pointers_per_patient": 0,
        "max_pointers_per_patient": 0,
        "min_pointers_per_patient": 0,
        "patient_counts_with_pointers": {},
        "patient_counts_with_types": {},
        "patient_counts_with_org_types": {},
    }

    patient_counters: dict[str, Any] = {}

    for page in paginator.paginate(**params):
        for item in page["Items"]:
            document = item.get("document", {}).get("S", "")
            created_on = item.get("created_on", {}).get("S", "")

            # Items that cannot be parsed into the model are counted and
            # excluded from every other tally.
            try:
                docref = DocumentReference.model_validate_json(document)
            except Exception:
                stats["fails_model"] += 1
                continue

            # Items that parse but fail validation are counted here and are
            # still included in the tallies below.
            result = DocumentReferenceValidator().validate(data=docref)
            if not result.is_valid:
                stats["fails_validation"] += 1

            patient_number = (
                docref.subject.identifier.value
                if docref.subject
                and docref.subject.identifier
                and docref.subject.identifier.value
                else "unknown"
            )
            producer = (
                docref.custodian.identifier.value
                if docref.custodian
                and docref.custodian.identifier
                and docref.custodian.identifier.value
                else "unknown"
            )
            # Only the first coding of the type is used to label the pointer.
            type_coding = (
                docref.type.coding[0] if docref.type and docref.type.coding else None
            )
            type_str = (
                f"{type_coding.system}|{type_coding.code}" if type_coding else "unknown"
            )

            _calc_type_stats(producer, type_str, stats)
            _calc_date_stats(created_on, stats)
            _calc_patient_counters(patient_number, producer, type_str, patient_counters)

        previous_scanned_count = total_scanned_count
        total_scanned_count += page["ScannedCount"]

        # Report progress whenever a 1,000 / 100,000 boundary is crossed.
        # Testing for an exact multiple would stop reporting as soon as one
        # page returned fewer items than the page size.
        if total_scanned_count // 1000 > previous_scanned_count // 1000:
            print(".", end="", flush=True)  # noqa

        if total_scanned_count // 100000 > previous_scanned_count // 100000:
            print(f"scanned={total_scanned_count}")  # noqa

    end_time = datetime.now(tz=timezone.utc)

    stats["total_pointers"] = total_scanned_count
    stats["patients_with_pointers"] = len(patient_counters)

    # The patient stats supply avg/max/min pointers per patient and the
    # patient histograms, replacing the placeholder values set above.
    stats.update(_get_patient_stats(patient_counters))

    print("Done")  # noqa

    if report_output_file:
        with open(report_output_file, "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2)
        print(f"Stats saved to {report_output_file}")  # noqa

    return {
        "scanned_count": total_scanned_count,
        "took-secs": (end_time - start_time).total_seconds(),
        "stats": json.dumps(stats, indent=2),
    }
218+
219+
220+
# When run as a script, hand the scan function to fire.Fire so it can be
# driven from the command line.
if __name__ == "__main__":
    fire.Fire(_scan_and_get_stats)

0 commit comments

Comments
 (0)