1+ from datetime import datetime , timezone
2+
3+
4+ @staticmethod
5+ def get_patient (json_data ):
6+ contained = json_data .get ("contained" , [])
7+ return next ((c for c in contained if isinstance (c , dict ) and c .get ("resourceType" ) == "Patient" ), None )
8+
9+
10+ @staticmethod
11+ def get_valid_names (names , occurrence_time ):
12+ official_names = [n for n in names if n .get ("use" ) == "official" and is_current_period (n , occurrence_time )]
13+ if official_names :
14+ return official_names [0 ]
15+
16+ valid_names = [n for n in names if is_current_period (n , occurrence_time ) and n .get ("use" ) != "old" ]
17+ return valid_names [0 ] if valid_names else names [0 ]
18+
19+
20+ @staticmethod
21+ def extract_person_names (patient , occurrence_time ):
22+ names = patient .get ("name" , [])
23+ if not isinstance (names , list ) or not names :
24+ return "" , ""
25+
26+ selected_name = get_valid_names (names , occurrence_time )
27+ person_forename = " " .join (selected_name .get ("given" , []))
28+ person_surname = selected_name .get ("family" , "" )
29+
30+ return person_forename , person_surname
31+
32+
33+ @staticmethod
34+ def get_valid_address (patient , occurrence_time ):
35+ addresses = patient .get ("address" , [])
36+ if not isinstance (addresses , list ) or not addresses :
37+ return "ZZ99 3CZ"
38+
39+ valid_addresses = [a for a in addresses if "postalCode" in a and is_current_period (a , occurrence_time )]
40+ if not valid_addresses :
41+ return "ZZ99 3CZ"
42+
43+ selected_address = next (
44+ (a for a in valid_addresses if a .get ("use" ) == "home" and a .get ("type" ) != "postal" ),
45+ next (
46+ (a for a in valid_addresses if a .get ("use" ) != "old" and a .get ("type" ) != "postal" ),
47+ next ((a for a in valid_addresses if a .get ("use" ) != "old" ), valid_addresses [0 ]),
48+ ),
49+ )
50+ return selected_address .get ("postalCode" , "ZZ99 3CZ" )
51+
52+
53+ @staticmethod
54+ def extract_site_code (json_data ):
55+ performers = json_data .get ("performer" , [])
56+ if not isinstance (performers , list ) or not performers :
57+ return None , None
58+
59+ valid_performers = [p for p in performers if "actor" in p and "identifier" in p ["actor" ]]
60+ if not valid_performers :
61+ return None , None
62+
63+ selected_performer = next (
64+ (
65+ p
66+ for p in valid_performers
67+ if p .get ("actor" , {}).get ("type" ) == "Organization"
68+ and p .get ("actor" , {}).get ("identifier" , {}).get ("system" ) == "https://fhir.nhs.uk/Id/ods-organization-code"
69+ ),
70+ next (
71+ (
72+ p
73+ for p in valid_performers
74+ if p .get ("actor" , {}).get ("identifier" , {}).get ("system" )
75+ == "https://fhir.nhs.uk/Id/ods-organization-code"
76+ ),
77+ next (
78+ (p for p in valid_performers if p .get ("actor" , {}).get ("type" ) == "Organization" ),
79+ valid_performers [0 ] if valid_performers else None ,
80+ ),
81+ ),
82+ )
83+ site_code = selected_performer ["actor" ].get ("identifier" , {}).get ("value" )
84+ site_code_type_uri = selected_performer ["actor" ].get ("identifier" , {}).get ("system" )
85+
86+ return site_code , site_code_type_uri
87+
88+
89+ @staticmethod
90+ def extract_practitioner_names (json_data , occurrence_time ):
91+ contained = json_data .get ("contained" , [])
92+ practitioner = next ((c for c in contained if isinstance (c , dict ) and c .get ("resourceType" ) == "Practitioner" ), None )
93+ if not practitioner or "name" not in practitioner :
94+ return "" , ""
95+
96+ practitioner_names = practitioner .get ("name" , [])
97+ valid_practitioner_names = [n for n in practitioner_names if "given" in n or "family" in n ]
98+ if not valid_practitioner_names :
99+ return "" , ""
100+
101+ selected_practitioner_name = get_valid_names (valid_practitioner_names , occurrence_time )
102+ performing_professional_forename = " " .join (selected_practitioner_name .get ("given" , []))
103+ performing_professional_surname = selected_practitioner_name .get ("family" , "" )
104+
105+ return performing_professional_forename , performing_professional_surname
106+
107+
108+ def is_current_period (name , occurrence_time ):
109+ period = name .get ("period" )
110+ if not isinstance (period , dict ):
111+ return True # If no period is specified, assume it's valid
112+
113+ start = datetime .fromisoformat (period .get ("start" )) if period .get ("start" ) else None
114+ end = datetime .fromisoformat (period .get ("end" )) if period .get ("end" ) else None
115+
116+ # Ensure all datetime objects are timezone-aware
117+ if start and start .tzinfo is None :
118+ start = start .replace (tzinfo = timezone .utc )
119+ if end and end .tzinfo is None :
120+ end = end .replace (tzinfo = timezone .utc )
121+
122+ return (not start or start <= occurrence_time ) and (not end or occurrence_time <= end )
0 commit comments