@@ -11,23 +11,79 @@ class Extractor:
1111
1212 CODING_SYSTEM_URL_SNOMED = "http://snomed.info/sct"
1313 ODS_ORG_CODE_SYSTEM_URL = "https://fhir.nhs.uk/Id/ods-organization-code"
14-
14+ DEFAULT_LOCATION = "X99999"
15+
1516 def __init__ (self , fhir_json_data ):
1617 self .fhir_json_data = json .loads (fhir_json_data ) if isinstance (fhir_json_data , str ) else fhir_json_data
1718
1819 def _get_patient (self ):
1920 contained = self .fhir_json_data .get ("contained" , [])
2021 return next ((c for c in contained if isinstance (c , dict ) and c .get ("resourceType" ) == "Patient" ), None )
2122
22- def _get_valid_names (self , names , occurrence_time ):
23-
23+ def _get_valid_names (self , names , occurrence_time ):
2424 official_names = [n for n in names if n .get ("use" ) == "official" and self ._is_current_period (n , occurrence_time )]
2525 if official_names :
2626 return official_names [0 ]
2727
2828 valid_names = [n for n in names if self ._is_current_period (n , occurrence_time ) and n .get ("use" ) != "old" ]
2929 return valid_names [0 ] if valid_names else names [0 ]
30+
31+ def _is_current_period (self , name , occurrence_time ):
32+ period = name .get ("period" )
33+ if not isinstance (period , dict ):
34+ return True # If no period is specified, assume it's valid
35+
36+ start = datetime .fromisoformat (period .get ("start" )) if period .get ("start" ) else None
37+ end = datetime .fromisoformat (period .get ("end" )) if period .get ("end" ) else None
38+
39+ # Ensure all datetime objects are timezone-aware
40+ if start and start .tzinfo is None :
41+ start = start .replace (tzinfo = timezone .utc )
42+ if end and end .tzinfo is None :
43+ end = end .replace (tzinfo = timezone .utc )
44+
45+ return (not start or start <= occurrence_time ) and (not end or occurrence_time <= end )
46+
47+ def _get_occurance_date_time (self ) -> str :
48+ try :
49+ #TODO: Double check if this logic is correct
50+ occurrence_time = datetime .fromisoformat (self .fhir_json_data .get ("occurrenceDateTime" , "" ))
51+ if occurrence_time and occurrence_time .tzinfo is None :
52+ occurrence_time = occurrence_time .replace (tzinfo = timezone .utc )
53+ return occurrence_time
54+ return occurrence_time
55+
56+ except Exception as e :
57+ message = "DateTime conversion error [%s]: %s" % (e .__class__ .__name__ , e )
58+ error = self ._log_error (message , code = exception_messages .UNEXPECTED_EXCEPTION )
59+ return error
60+
61+ def _get_first_snomed_code (self , coding_container : dict ) -> str :
62+ codings = coding_container .get ("coding" , [])
63+ for coding in codings :
64+ if coding .get ("system" ) == self .CODING_SYSTEM_URL_SNOMED :
65+ return coding .get ("code" , "" )
66+ return ""
3067
68+ def _get_term_from_codeable_concept (self , concept : dict ) -> str :
69+ if concept .get ("text" ):
70+ return concept ["text" ]
71+
72+ codings = concept .get ("coding" , [])
73+ for coding in codings :
74+ if coding .get ("system" ) == self .CODING_SYSTEM_URL_SNOMED :
75+ # Try SCTDescDisplay extension first
76+ for ext in coding .get ("extension" , []):
77+ if ext .get ("url" ) == self .EXTENSION_URL_SCT_DESC_DISPLAY :
78+ value_string = ext .get ("valueString" )
79+ if value_string :
80+ return value_string
81+
82+ # Fallback to display
83+ return coding .get ("display" , "" )
84+
85+ return ""
86+
3187 def extract_person_forename (self ):
3288 return self .extract_person_names ()[0 ]
3389
@@ -133,21 +189,6 @@ def extract_practitioner_names(self):
133189
134190 return performing_professional_forename , performing_professional_surname
135191
136- def _is_current_period (self , name , occurrence_time ):
137- period = name .get ("period" )
138- if not isinstance (period , dict ):
139- return True # If no period is specified, assume it's valid
140-
141- start = datetime .fromisoformat (period .get ("start" )) if period .get ("start" ) else None
142- end = datetime .fromisoformat (period .get ("end" )) if period .get ("end" ) else None
143-
144- # Ensure all datetime objects are timezone-aware
145- if start and start .tzinfo is None :
146- start = start .replace (tzinfo = timezone .utc )
147- if end and end .tzinfo is None :
148- end = end .replace (tzinfo = timezone .utc )
149-
150- return (not start or start <= occurrence_time ) and (not end or occurrence_time <= end )
151192
152193 def extract_vaccination_procedure_code (self ) -> str :
153194 extensions = self .fhir_json_data .get ("extension" , [])
@@ -176,6 +217,10 @@ def extract_indication_code(self) -> str:
176217 if coding .get ("system" ) == self .CODING_SYSTEM_URL_SNOMED :
177218 return coding .get ("code" , "" )
178219 return ""
220+
221+ def extract_dose_amount (self ) -> str :
222+ dose_quantity = self .fhir_json_data .get ("doseQuantity" , {})
223+ return dose_quantity .get ("value" , "" )
179224
180225 def extract_dose_unit_code (self ) -> str :
181226 dose_quantity = self .fhir_json_data .get ("doseQuantity" , {})
@@ -187,32 +232,6 @@ def extract_dose_unit_term(self) -> str:
187232 dose_quantity = self .fhir_json_data .get ("doseQuantity" , {})
188233 return dose_quantity .get ("unit" , "" )
189234
190- def _get_first_snomed_code (self , coding_container : dict ) -> str :
191- codings = coding_container .get ("coding" , [])
192- for coding in codings :
193- if coding .get ("system" ) == self .CODING_SYSTEM_URL_SNOMED :
194- return coding .get ("code" , "" )
195- return ""
196-
197- def _get_term_from_codeable_concept (self , concept : dict ) -> str :
198- if concept .get ("text" ):
199- return concept ["text" ]
200-
201- codings = concept .get ("coding" , [])
202- for coding in codings :
203- if coding .get ("system" ) == self .CODING_SYSTEM_URL_SNOMED :
204- # Try SCTDescDisplay extension first
205- for ext in coding .get ("extension" , []):
206- if ext .get ("url" ) == self .EXTENSION_URL_SCT_DESC_DISPLAY :
207- value_string = ext .get ("valueString" )
208- if value_string :
209- return value_string
210-
211- # Fallback to display
212- return coding .get ("display" , "" )
213-
214- return ""
215-
216235 def extract_vaccination_procedure_term (self ) -> str :
217236 extensions = self .fhir_json_data .get ("extension" , [])
218237 for ext in extensions :
@@ -237,16 +256,21 @@ def extract_dose_sequence(self) -> str:
237256 return str (dose ) if dose else ""
238257 return ""
239258
240- def _get_occurance_date_time (self ) -> str :
241- try :
242- #TODO: Double check if this logic is correct
243- occurrence_time = datetime .fromisoformat (self .fhir_json_data .get ("occurrenceDateTime" , "" ))
244- if occurrence_time and occurrence_time .tzinfo is None :
245- occurrence_time = occurrence_time .replace (tzinfo = timezone .utc )
246- return occurrence_time
247- return occurrence_time
259+ def extract_location_code (self ) -> str :
260+ location = self .fhir_json_data .get ("location" , {})
261+
262+ if location :
263+ identifier = location .get ("identifier" , {})
264+ return identifier .get ("value" , self .DEFAULT_LOCATION )
248265
249- except Exception as e :
250- message = "DateTime conversion error [%s]: %s" % (e .__class__ .__name__ , e )
251- error = self ._log_error (message , code = exception_messages .UNEXPECTED_EXCEPTION )
252- return error
266+ return self .DEFAULT_LOCATION
267+
268+ def extract_location_code_type_uri (self ) -> str :
269+ location = self .fhir_json_data .get ("location" , {})
270+
271+ if location :
272+ identifier = location .get ("identifier" , {})
273+ return identifier .get ("system" , self .ODS_ORG_CODE_SYSTEM_URL )
274+
275+ return self .ODS_ORG_CODE_SYSTEM_URL
276+
0 commit comments