Skip to content

Commit 3376f04

Browse files
committed
[NRL-1559] Add pointer stats report
1 parent 734e158 commit 3376f04

File tree

2 files changed

+427
-0
lines changed

2 files changed

+427
-0
lines changed

reports/calculate_pointer_stats.py

Lines changed: 224 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,224 @@
1+
import json
2+
from datetime import datetime, timedelta, timezone
3+
from typing import Any
4+
5+
import boto3
6+
import fire
7+
8+
from nrlf.consumer.fhir.r4.model import DocumentReference
9+
from nrlf.core.constants import PointerTypes
10+
from nrlf.core.logger import logger
11+
from nrlf.core.validators import DocumentReferenceValidator
12+
13+
type PatientCounter = dict[int, int]
14+
type TypePatientCounter = dict[str, PatientCounter]
15+
type OrgTypePatientCounter = dict[str, TypePatientCounter]
16+
17+
dynamodb = boto3.client("dynamodb")
18+
paginator = dynamodb.get_paginator("scan")
19+
20+
logger.setLevel("ERROR")
21+
22+
type_to_name = {pointer_type.value: pointer_type.name for pointer_type in PointerTypes}
23+
24+
25+
def _calc_type_stats(producer: str, type_str: str, stats: dict[str, Any]) -> None:
26+
stats["type_counts"] = stats.get("type_counts", {})
27+
stats["type_counts"][type_str] = stats["type_counts"].get(type_str, 0) + 1
28+
29+
stats["producer_by_type_counts"][producer] = stats["producer_by_type_counts"].get(
30+
producer, {}
31+
)
32+
stats["producer_by_type_counts"][producer][type_str] = (
33+
stats["producer_by_type_counts"][producer].get(type_str, 0) + 1
34+
)
35+
36+
37+
def _calc_date_stats(created_on: str, stats: dict[str, Any]) -> None:
38+
month_created = created_on[:7] if created_on else "not-set"
39+
if month_created not in stats["created_by_month"]:
40+
stats["created_by_month"][month_created] = 1
41+
else:
42+
stats["created_by_month"][month_created] += 1
43+
44+
45+
def _calc_patient_counters(
46+
patient_number: str, producer: str, type_str: str, patient_counters: dict[str, Any]
47+
) -> None:
48+
if patient_number not in patient_counters:
49+
patient_counters[patient_number] = {
50+
"count": 1,
51+
"types": {type_str: 1},
52+
"orgs": {producer: {type_str: 1}},
53+
}
54+
else:
55+
patient_counters[patient_number]["count"] += 1
56+
patient_counters[patient_number]["types"][type_str] = (
57+
patient_counters[patient_number]["types"].get(type_str, 0) + 1
58+
)
59+
patient_counters[patient_number]["orgs"][producer] = patient_counters[
60+
patient_number
61+
]["orgs"].get(producer, {})
62+
patient_counters[patient_number]["orgs"][producer][type_str] = (
63+
patient_counters[patient_number]["orgs"][producer].get(type_str, 0) + 1
64+
)
65+
66+
67+
def _get_patient_stats(patient_counters: dict[str, Any]) -> dict[str, Any]:
68+
total_pointers = 0
69+
max_pointers = 0
70+
min_pointers = 0
71+
counts_with_pointers: PatientCounter = {}
72+
counts_with_types: TypePatientCounter = {}
73+
counts_with_orgs_types: OrgTypePatientCounter = {}
74+
75+
for counters in patient_counters.values():
76+
count = counters["count"]
77+
78+
total_pointers += count
79+
max_pointers = max(max_pointers, count)
80+
min_pointers = min(min_pointers, count) if min_pointers else count
81+
82+
counts_with_pointers[count] = counts_with_pointers.get(count, 0) + 1
83+
84+
for type, type_count in counters["types"].items():
85+
counts_with_types[type] = counts_with_types.get(type, {})
86+
counts_with_types[type][type_count] = (
87+
counts_with_types[type].get(type_count, 0) + 1
88+
)
89+
90+
for org, types in counters["orgs"].items():
91+
counts_with_orgs_types[org] = counts_with_orgs_types.get(org, {})
92+
for type, type_count in types.items():
93+
counts_with_orgs_types[org][type] = counts_with_orgs_types[org].get(
94+
type, {}
95+
)
96+
counts_with_orgs_types[org][type][type_count] = (
97+
counts_with_orgs_types[org][type].get(type_count, 0) + 1
98+
)
99+
100+
return {
101+
"avg_pointers_per_patient": (
102+
total_pointers / len(patient_counters) if patient_counters else 0
103+
),
104+
"max_pointers_per_patient": max_pointers,
105+
"min_pointers_per_patient": min_pointers,
106+
"patient_counts_with_pointers": counts_with_pointers,
107+
"patient_counts_with_types": counts_with_types,
108+
"patient_counts_with_org_types": counts_with_orgs_types,
109+
}
110+
111+
112+
def _scan_and_get_stats(
    table_name: str, report_output_file: str = ""
) -> dict[str, Any]:
    """
    Calculate stats from the pointers table.

    Scans the table page by page, parses each item's "document" attribute as
    a DocumentReference and accumulates counts by type, producer, creation
    month and patient.

    Parameters:
    - table_name: The name of the pointers table to use.
    - report_output_file: Optional path of a file to write the stats to as
      JSON. Nothing is written when this is empty.

    Returns a dict with the number of items scanned ("scanned_count"), the
    elapsed time in seconds ("took-secs") and the stats as a JSON string
    ("stats").
    """
    params: dict[str, Any] = {
        "TableName": table_name,
        "PaginationConfig": {"PageSize": 50},
    }

    total_scanned_count = 0

    start_time = datetime.now(tz=timezone.utc)

    stats: dict[str, Any] = {
        "fails_model": 0,
        "fails_validation": 0,
        "total_pointers": 0,
        "type_counts": {},
        "producer_by_type_counts": {},
        "created_by_month": {},
        "patients_with_pointers": 0,
        "avg_pointers_per_patient": 0,
        "max_pointers_per_patient": 0,
        "min_pointers_per_patient": 0,
        "patient_counts_with_pointers": {},
        "patient_counts_with_types": {},
        "patient_counts_with_org_types": {},
    }

    patient_counters: dict[str, Any] = {}

    for page in paginator.paginate(**params):
        for item in page["Items"]:
            document = item.get("document", {}).get("S", "")
            created_on = item.get("created_on", {}).get("S", "")

            # Do validations
            try:
                docref = DocumentReference.model_validate_json(document)
            except Exception:
                # Items that cannot be parsed are counted here and left out of
                # every other statistic.
                stats["fails_model"] += 1
                continue

            result = DocumentReferenceValidator().validate(data=docref)
            if not result.is_valid:
                # Counted as a failure, but the pointer still contributes to
                # the statistics below.
                stats["fails_validation"] += 1

            patient_number = (
                docref.subject.identifier.value
                if docref.subject
                and docref.subject.identifier
                and docref.subject.identifier.value
                else "unknown"
            )
            producer = (
                docref.custodian.identifier.value
                if docref.custodian
                and docref.custodian.identifier
                and docref.custodian.identifier.value
                else "unknown"
            )
            type_coding = (
                docref.type.coding[0] if docref.type and docref.type.coding else None
            )
            type_str = (
                f"{type_coding.system}|{type_coding.code}" if type_coding else "unknown"
            )

            _calc_type_stats(producer, type_str, stats)
            _calc_date_stats(created_on, stats)
            _calc_patient_counters(patient_number, producer, type_str, patient_counters)

        total_scanned_count += page["ScannedCount"]

        # NOTE(review): progress is only printed when the running total lands
        # exactly on a multiple; a short page part-way through the scan would
        # make later checks miss - confirm this is acceptable.
        if total_scanned_count % 1000 == 0:
            print(".", end="", flush=True)  # noqa

        if total_scanned_count % 100000 == 0:
            print(f"scanned={total_scanned_count}")  # noqa

    end_time = datetime.now(tz=timezone.utc)

    stats["total_pointers"] = total_scanned_count
    stats["patients_with_pointers"] = len(patient_counters)

    # The average, max, min and histogram entries all come from the
    # per-patient tallies.
    stats.update(_get_patient_stats(patient_counters))

    print("Done")  # noqa

    if report_output_file:
        with open(report_output_file, "w", encoding="utf-8") as f:
            json.dump(stats, f, indent=2)
            print(f"Stats saved to {report_output_file}")  # noqa

    return {
        "scanned_count": total_scanned_count,
        "took-secs": (end_time - start_time).total_seconds(),
        "stats": json.dumps(stats, indent=2),
    }
221+
222+
223+
if __name__ == "__main__":
    # When run as a script, hand the report function to python-fire.
    fire.Fire(_scan_and_get_stats)

0 commit comments

Comments
 (0)