1 change: 1 addition & 0 deletions src/nhp/data/aggregated_data/ecds.py
@@ -27,6 +27,7 @@ def get_ecds_data(spark: SparkSession) -> DataFrame:
F.col("type"),
F.col("hsagrp"),
F.col("ndggrp"),
F.col("capacity_conversion_group"),
F.col("icb"),
F.col("is_main_icb"),
F.col("is_adult"),
1 change: 1 addition & 0 deletions src/nhp/data/aggregated_data/outpatients.py
@@ -25,6 +25,7 @@ def get_outpatients_data(spark: SparkSession) -> DataFrame:
F.col("pod"),
F.col("hsagrp"),
F.col("ndggrp"),
F.col("capacity_conversion_group"),
F.col("has_procedures"),
F.col("sushrg").substr(1, 4).alias("sushrg_trimmed"),
F.col("icb"),
12 changes: 9 additions & 3 deletions src/nhp/data/raw_data/aae.py
@@ -1,14 +1,19 @@
"""Generate the AAE data"""

import pyspark.sql.functions as F
from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import * # noqa: F403

import pyspark.sql.functions as F
from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
from nhp.data.nhp_datasets.local_authorities import local_authority_successors
from nhp.data.nhp_datasets.providers import read_data_with_provider
from nhp.data.raw_data.helpers import add_age_group_column
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.types import * # noqa: F403


def create_capacity_conversion_group():
# can't create capacity groups on AAE data
return F.lit("aae-unknown")


def get_aae_data(spark: SparkSession) -> DataFrame:
@@ -192,6 +197,7 @@ def get_aae_data(spark: SparkSession) -> DataFrame:
.withColumn("tretspef_grouped", F.lit("Other"))
.withColumn("pod", F.concat(F.lit("aae_type-"), F.col("aedepttype")))
.withColumn("ndggrp", F.col("group"))
.withColumn("capacity_conversion_group", create_capacity_conversion_group())
.repartition("fyear", "provider")
)

18 changes: 15 additions & 3 deletions src/nhp/data/raw_data/ecds.py
@@ -3,14 +3,25 @@
from itertools import chain

from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403

from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
from nhp.data.nhp_datasets.local_authorities import local_authority_successors
from nhp.data.nhp_datasets.providers import add_provider
from nhp.data.raw_data.helpers import add_age_group_column
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403


def create_capacity_conversion_group():
is_child = F.col("age") <= 17

return (
F.when(F.col("acuity") == "immediate-resuscitation", "aae-resus")
.when(is_child, "aae-childrens")
.when(F.col("acuity").isin(["urgent", "very-urgent"]), "aae-majors")
.otherwise("aae-minors")
)


def get_ecds_data(spark: SparkSession) -> DataFrame:
@@ -239,6 +250,7 @@ def get_ecds_data(spark: SparkSession) -> DataFrame:
.withColumn("tretspef_grouped", F.lit("Other"))
.withColumn("pod", F.concat(F.lit("aae_type-"), F.col("aedepttype")))
.withColumn("ndggrp", F.col("group"))
.withColumn("capacity_conversion_group", create_capacity_conversion_group())
.repartition("fyear", "provider")
)

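A minimal local sketch of how the new ECDS grouping resolves a few hand-made attendances. It re-states the when/otherwise chain from create_capacity_conversion_group above rather than importing the module (which would also pull in the Databricks session setup), and assumes nothing beyond a local SparkSession:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# age, acuity -> expected capacity_conversion_group
rows = [
    (30, "immediate-resuscitation"),  # aae-resus
    (10, "urgent"),                   # aae-childrens (child check runs before the majors check)
    (45, "very-urgent"),              # aae-majors
    (60, "standard"),                 # aae-minors (fallback)
]
df = spark.createDataFrame(rows, ["age", "acuity"])

capacity_conversion_group = (
    F.when(F.col("acuity") == "immediate-resuscitation", "aae-resus")
    .when(F.col("age") <= 17, "aae-childrens")
    .when(F.col("acuity").isin(["urgent", "very-urgent"]), "aae-majors")
    .otherwise("aae-minors")
)

df.withColumn("capacity_conversion_group", capacity_conversion_group).show()

Because the resus check comes first, a child arriving as immediate-resuscitation is grouped as aae-resus rather than aae-childrens.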
77 changes: 74 additions & 3 deletions src/nhp/data/raw_data/inpatients.py
@@ -2,13 +2,81 @@

from databricks.connect import DatabricksSession
from delta.tables import DeltaTable
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403

from nhp.data.nhp_datasets.apc import apc_primary_procedures, hes_apc
from nhp.data.nhp_datasets.icbs import add_main_icb
from nhp.data.raw_data.helpers import add_age_group_column, add_tretspef_grouped_column
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403


def create_capacity_conversion_group():
is_child = F.col("age") <= 17
is_surgical_specialty = F.col("tretspef").rlike("^1(?!80|9[012])")
is_zero_los = F.col("speldur") == 0
is_elective = F.col("group") == "elective"
is_nonelective = F.col("group") == "non-elective"

    # the logic falls through in order, so we do not need to do things like apply an
    # "is_adult" filter after the is_child branches.
return (
# daycases
F.when(
F.col("classpat").isin(["2", "3"]),
F.when(is_child, "ip-daycase-childrens")
.when(F.col("tretspef").isin(["320", "321"]), "ip-daycase-cardiology")
.when(
F.col("tretspef").isin(["280", "811"]),
"ip-daycase-interventional_radiology",
)
# TODO: add endoscopy
.when(
F.col("tretspef").isin(["253", "260", "303", "370", "800"]),
"ip-daycase-oncology_haematology",
)
.when(is_surgical_specialty, "ip-daycase-surgical")
.otherwise("ip-daycase-non_surgical"),
)
# everything else will be non-daycase
# maternity admissions
.when(F.col("tretspef") == "501", "ip-maternity-obstetric")
.when(F.col("tretspef") == "560", "ip-maternity-midwife_led")
.when(F.col("group") == "maternity", "ip-maternity-unknown")
# paediatric admissions
.when(
is_child,
F.when(
is_zero_los & is_nonelective, "ip-childrens-assessment_unit"
).otherwise("ip-childrens-inpatients"),
)
# adult admissions
# elective admissions
# TODO: add ip-stroke
.when(
is_elective,
F.when(
is_surgical_specialty,
F.when(
F.col("speldur") <= 3, "ip-elective-surgical-short_stay"
).otherwise("ip-elective-surgical-long_stay"),
).otherwise(
F.when(
F.col("speldur") <= 3, "ip-elective-non_surgical-short_stay"
).otherwise("ip-elective-non_surgical-long_stay")
),
)
# non-elective admissions
.when(is_zero_los, "ip-adult_acute_assessment")
.when(
is_surgical_specialty,
F.when(F.col("speldur") <= 3, "ip-acute-surgical-short_stay").otherwise(
"ip-acute-surgical-longer_stay"
),
)
.when(F.col("speldur") <= 3, "ip-acute-non_surgical-short_stay")
.otherwise("ip-acute-non_surgical-longer_stay")
)


def get_inpatients_data(spark: SparkSession) -> DataFrame:
@@ -90,6 +158,8 @@ def get_inpatients_data(spark: SparkSession) -> DataFrame:
# add in primary diagnosis and procedure columns
.join(df_primary_diagnosis, ["epikey", "fyear", "procode3"], "left")
.join(df_primary_procedure, ["epikey", "fyear", "procode3"], "left")
# capacity conversion
.withColumn("capacity_conversion_group", create_capacity_conversion_group())
.select(
F.col("epikey"),
F.col("fyear"),
@@ -108,6 +178,7 @@ def get_inpatients_data(spark: SparkSession) -> DataFrame:
F.col("tretspef_grouped"),
F.col("hsagrp"),
F.col("group"),
F.col("capacity_conversion_group"),
F.col("admidate"),
F.col("disdate"),
F.col("speldur"),
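A sketch of how the fall-through ordering above resolves a few example episodes. It assumes the module and its top-level imports (databricks-connect, delta, the nhp.data.nhp_datasets helpers) resolve locally, which may not hold outside a Databricks environment; the expected groups in the comments follow directly from the when/otherwise chain:

from pyspark.sql import SparkSession

from nhp.data.raw_data.inpatients import create_capacity_conversion_group

spark = SparkSession.builder.master("local[1]").getOrCreate()

cols = ["age", "tretspef", "classpat", "speldur", "group"]
rows = [
    (70, "320", "2", 0, "elective"),       # ip-daycase-cardiology
    (16, "501", "1", 1, "maternity"),      # ip-maternity-obstetric (maternity wins over the child check)
    (9, "420", "1", 0, "non-elective"),    # ip-childrens-assessment_unit (zero LOS, non-elective)
    (55, "100", "1", 2, "elective"),       # ip-elective-surgical-short_stay (surgical tretspef, <= 3 days)
    (80, "300", "1", 10, "non-elective"),  # ip-acute-non_surgical-longer_stay
]
df = spark.createDataFrame(rows, cols)

df.withColumn(
    "capacity_conversion_group", create_capacity_conversion_group()
).show(truncate=False)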
23 changes: 20 additions & 3 deletions src/nhp/data/raw_data/outpatients.py
@@ -1,14 +1,30 @@
"""Generate outpatients data"""

from databricks.connect import DatabricksSession
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403

from nhp.data.nhp_datasets.icbs import add_main_icb, icb_mapping
from nhp.data.nhp_datasets.local_authorities import local_authority_successors
from nhp.data.nhp_datasets.providers import read_data_with_provider
from nhp.data.raw_data.helpers import add_age_group_column, add_tretspef_grouped_column
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import * # noqa: F403


def create_capacity_conversion_group():
    is_maternity = F.col("tretspef").isin(["424", "501", "505", "560"])
is_child = F.col("age") <= 17

return F.when(
F.col("has_procedures"),
F.when(is_maternity, "op-procedure-maternity")
.when(is_child, "op-procedure-childrens")
.otherwise("op-procedure-adult"),
).otherwise(
F.when(is_maternity, "op-maternity")
.when(is_child, "op-childrens")
.otherwise("op-adult")
)


def get_outpatients_data(spark: SparkSession) -> DataFrame:
@@ -144,6 +160,7 @@ def get_outpatients_data(spark: SparkSession) -> DataFrame:
.when(F.col("is_first"), "op_first")
.otherwise("op_follow-up"),
)
.withColumn("capacity_conversion_group", create_capacity_conversion_group())
.withColumn("ndggrp", F.col("group"))
)

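A minimal local sketch of the outpatient split, re-stating the expression from create_capacity_conversion_group above; only a local SparkSession is assumed:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.master("local[1]").getOrCreate()

# has_procedures, tretspef, age -> expected capacity_conversion_group
rows = [
    (True, "501", 30),   # op-procedure-maternity
    (True, "110", 12),   # op-procedure-childrens
    (False, "560", 16),  # op-maternity (maternity check runs before the child check)
    (False, "110", 12),  # op-childrens
    (False, "300", 40),  # op-adult (fallback)
]
df = spark.createDataFrame(rows, ["has_procedures", "tretspef", "age"])

is_maternity = F.col("tretspef").isin(["424", "501", "505", "560"])
is_child = F.col("age") <= 17

capacity_conversion_group = F.when(
    F.col("has_procedures"),
    F.when(is_maternity, "op-procedure-maternity")
    .when(is_child, "op-procedure-childrens")
    .otherwise("op-procedure-adult"),
).otherwise(
    F.when(is_maternity, "op-maternity")
    .when(is_child, "op-childrens")
    .otherwise("op-adult")
)

df.withColumn("capacity_conversion_group", capacity_conversion_group).show()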