11"""Generate inpatients data"""
22
3+ from operator import is_
4+
35from databricks .connect import DatabricksSession
46from delta .tables import DeltaTable
57from pyspark .sql import SparkSession
1113from raw_data .helpers import add_age_group_column , add_tretspef_grouped_column
1214
1315
def create_capacity_conversion_group():
    """Build the column expression that assigns a capacity conversion group.

    The returned value is one chained ``when``/``otherwise`` expression over
    the ``age``, ``tretspef``, ``speldur``, ``group`` and ``classpat`` columns.
    Branches are tested in order and the first match wins, so later branches
    rely on the earlier ones having already been ruled out (for example, the
    adult branches do not re-check that the row is not a child).
    """
    age = F.col("age")
    tretspef = F.col("tretspef")
    speldur = F.col("speldur")
    admission_group = F.col("group")
    classpat = F.col("classpat")

    # reusable predicates
    child = age <= 17
    surgical = tretspef.rlike("^1(?!80|9[012])")
    zero_los = speldur == 0
    short_stay = speldur <= 3
    elective = admission_group == "elective"
    non_elective = admission_group == "non-elective"

    # daycases: children first, then specific specialties, then surgical vs not
    # TODO: add endoscopy
    daycase_label = (
        F.when(child, "ip-daycase-childrens")
        .when(tretspef.isin(["320", "321"]), "ip-daycase-cardiology")
        .when(tretspef.isin(["280", "811"]), "ip-daycase-interventional_radiology")
        .when(
            tretspef.isin(["253", "260", "303", "370", "800"]),
            "ip-daycase-oncology_haematology",
        )
        .when(surgical, "ip-daycase-surgical")
        .otherwise("ip-daycase-non_surgical")
    )

    # paediatric admissions (non-daycase)
    childrens_label = F.when(
        zero_los & non_elective, "ip-childrens-assessment_unit"
    ).otherwise("ip-childrens-inpatients")

    # adult elective admissions, split by surgical specialty and length of stay
    # TODO: add ip-stroke
    elective_surgical_label = F.when(
        short_stay, "ip-elective-surgical-short_stay"
    ).otherwise("ip-elective-surgical-long_stay")
    elective_non_surgical_label = F.when(
        short_stay, "ip-elective-non_surgical-short_stay"
    ).otherwise("ip-elective-non_surgical-long_stay")
    elective_label = F.when(surgical, elective_surgical_label).otherwise(
        elective_non_surgical_label
    )

    # adult non-elective surgical admissions, split by length of stay
    acute_surgical_label = F.when(
        short_stay, "ip-acute-surgical-short_stay"
    ).otherwise("ip-acute-surgical-longer_stay")

    # assemble the fall-through chain; order matters
    labelled = F.when(classpat.isin(["2", "3"]), daycase_label)
    # everything else is non-daycase
    # maternity admissions
    labelled = labelled.when(tretspef == "501", "ip-maternity-obstetric")
    labelled = labelled.when(tretspef == "560", "ip-maternity-midwife_led")
    labelled = labelled.when(admission_group == "maternity", "ip-maternity-unknown")
    # paediatric admissions
    labelled = labelled.when(child, childrens_label)
    # adult elective admissions
    labelled = labelled.when(elective, elective_label)
    # adult non-elective admissions
    labelled = labelled.when(zero_los, "ip-adult_acute_assessment")
    labelled = labelled.when(surgical, acute_surgical_label)
    labelled = labelled.when(short_stay, "ip-acute-non_surgical-short_stay")
    return labelled.otherwise("ip-acute-non_surgical-longer_stay")
82+
83+
1484def get_inpatients_data (spark : SparkSession ) -> None :
1585 """Get Inpatients Data"""
1686 # Spell has maternity delivery episode
@@ -92,6 +162,8 @@ def get_inpatients_data(spark: SparkSession) -> None:
92162 # add in primary diagnosis and procedure columns
93163 .join (df_primary_diagnosis , ["epikey" , "fyear" , "procode3" ], "left" )
94164 .join (df_primary_procedure , ["epikey" , "fyear" , "procode3" ], "left" )
165+ # capacity conversion
166+ .withColumn ("capacity_conversion_group" , create_capacity_conversion_group ())
95167 .select (
96168 F .col ("epikey" ),
97169 F .col ("fyear" ),
@@ -110,6 +182,7 @@ def get_inpatients_data(spark: SparkSession) -> None:
110182 F .col ("tretspef_grouped" ),
111183 F .col ("hsagrp" ),
112184 F .col ("group" ),
185+ F .col ("capacity_conversion_group" ),
113186 F .col ("admidate" ),
114187 F .col ("disdate" ),
115188 F .col ("speldur" ),
@@ -186,3 +259,5 @@ def main() -> None:
186259
# Script entry point: run the pipeline exactly once when this module is
# executed directly (repeated main() calls would redo the whole job).
if __name__ == "__main__":
    main()
0 commit comments