diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index d5be894112..f9c2b24a9e 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1,14 +1,13 @@ import datetime import heapq as hp import itertools -import math import re import warnings from collections import Counter, defaultdict from collections.abc import Iterable from itertools import repeat from pathlib import Path -from typing import Dict, List, NamedTuple, Optional, Tuple, Union +from typing import Dict, List, NamedTuple, Optional, Union import numpy as np import pandas as pd @@ -360,9 +359,9 @@ class HealthSystem(Module): "mode_appt_constraints": Parameter( Types.INT, "Integer code in `{1, 2}` determining mode of constraints with regards to officer numbers " - "and time - 1: elastic constraints, all HSI events run with squeeze factor, provided " + "and time - 1: elastic constraints, all HSI events run, provided " "officers required to deliver the HSI have capabilities > 0" - "2: hard constraints, only HSI events with no squeeze factor run. N.B. This parameter" + "2: hard constraints, only HSI events for which capabilities are available run. N.B. This parameter" "is over-ridden if an argument is provided to the module initialiser.", ), "mode_appt_constraints_postSwitch": Parameter( @@ -402,7 +401,6 @@ def __init__( use_funded_or_actual_staffing: Optional[str] = None, disable: bool = False, disable_and_reject_all: bool = False, - compute_squeeze_factor_to_district_level: bool = True, hsi_event_count_log_period: Optional[str] = "month", ): """ @@ -410,9 +408,8 @@ def __init__( :param service_availability: A list of treatment IDs to allow. :param mode_appt_constraints: Integer code in ``{1, 2}`` determining mode of constraints with regards to officer numbers and time - 1: elastic constraints, all HSI - events run with squeeze factor provided officers required have nonzero capabilities, - 2: hard constraints, only HSI events with - no squeeze factor run. + events run provided officers required have nonzero capabilities, + 2: hard constraints, only HSI events for which capabilities are available run. :param cons_availability: If 'default' then use the availability specified in the ResourceFile; if 'none', then let no consumable be ever be available; if 'all', then all consumables are always available. When using 'all' or 'none', requests for consumables are not logged. @@ -436,8 +433,6 @@ def __init__( logging) and every HSI event runs. :param disable_and_reject_all: If ``True``, disable health system and no HSI events run - :param compute_squeeze_factor_to_district_level: Whether to compute squeeze_factors to the district level, or - the national level (which effectively pools the resources across all districts). :param hsi_event_count_log_period: Period over which to accumulate counts of HSI events that have run before logging and reseting counters. Should be on of strings ``'day'``, ``'month'``, ``'year'``. ``'simulation'`` to log at the @@ -528,12 +523,6 @@ def __init__( assert equip_availability in (None, "default", "all", "none") self.arg_equip_availability = equip_availability - # `compute_squeeze_factor_to_district_level` is a Boolean indicating whether the computation of squeeze_factors - # should be specific to each district (when `True`), or if the computation of squeeze_factors should be on the - # basis that resources from all districts can be effectively "pooled" (when `False). - assert isinstance(compute_squeeze_factor_to_district_level, bool) - self.compute_squeeze_factor_to_district_level = compute_squeeze_factor_to_district_level - # Create the Diagnostic Test Manager to store and manage all Diagnostic Test self.dx_manager = DxManager(self) @@ -550,13 +539,7 @@ def __init__( self._summary_counter = HealthSystemSummaryCounter() # Create counter for the running total of footprint of all the HSIs being run today - self.running_total_footprint: Counter = Counter() - - # A reusable store for holding squeeze factors in get_squeeze_factors() - self._get_squeeze_factors_store_grow = 500 - ## We need this to be a dictionary indexed by clinic names, but we don't have the clinic names yet. - ## create an empty dictionary here, and then populate it properly later. - self._get_squeeze_factors_store = {} + self.running_total_footprint = defaultdict(Counter) self._hsi_event_count_log_period = hsi_event_count_log_period if hsi_event_count_log_period in {"day", "month", "year", "simulation"}: @@ -849,12 +832,6 @@ def pre_initialise_population(self): # Set up framework for considering a priority policy self.setup_priority_policy() - ## Initialise the stores for squeeze factors - self._get_squeeze_factors_store = { - clinic: np.zeros(self._get_squeeze_factors_store_grow) for clinic in self._clinic_names - } - - def initialise_population(self, population): self.bed_days.initialise_population(population.props) @@ -1284,7 +1261,7 @@ def _rescale_capabilities_to_capture_effective_capability(self): for clinic, clinic_cl in self._daily_capabilities.items(): for facID_and_officer in clinic_cl.keys(): rescaling_factor = self._summary_counter.frac_time_used_by_facID_and_officer( - facID_and_officer=facID_and_officer + facID_and_officer=facID_and_officer, clinic=clinic ) if rescaling_factor > 1 and rescaling_factor != float("inf"): self._daily_capabilities[clinic][facID_and_officer] *= rescaling_factor @@ -1658,9 +1635,6 @@ def check_hsi_event_is_valid(self, hsi_event): self.bed_days.check_beddays_footprint_format(hsi_event.BEDDAYS_FOOTPRINT) - # Check that this can accept the squeeze argument - assert _accepts_argument(hsi_event.run, "squeeze_factor") - # Check that the event does not request an appointment at a facility # level which is not possible appt_type_to_check_list = hsi_event.EXPECTED_APPT_FOOTPRINT.keys() @@ -1834,162 +1808,21 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f return appt_footprint_times - def get_squeeze_factors( - self, - footprints_per_event, - total_footprint, - current_capabilities, - compute_squeeze_factor_to_district_level: bool, - ): - for clinic, clinic_cl in current_capabilities.items(): - self.get_clinic_squeeze_factors( - clinic, footprints_per_event, total_footprint, clinic_cl, compute_squeeze_factor_to_district_level - ) - - return self._get_squeeze_factors_store - - def get_clinic_squeeze_factors( - self, - clinic, - footprints_per_event, - total_footprint, - current_capabilities, - compute_squeeze_factor_to_district_level: bool, - ): - """ - This will compute the squeeze factors for each HSI event from the list of all - the calls on health system resources for the day. - The squeeze factor is defined as (call/available - 1). ie. the highest - fractional over-demand among any type of officer that is called-for in the - appt_footprint of an HSI event. - A value of 0.0 signifies that there is no squeezing (sufficient resources for - the EXPECTED_APPT_FOOTPRINT). - - :param footprints_per_event: List, one entry per HSI event, containing the - minutes required from each health officer in each health facility as a - Counter (using the standard index) - :param total_footprint: Counter, containing the total minutes required from - each health officer in each health facility when non-zero, (using the - standard index) - :param current_capabilities: Series giving the amount of time available for - each health officer in each health facility (using the standard index) - :param compute_squeeze_factor_to_district_level: Boolean indicating whether - the computation of squeeze_factors should be specific to each district - (when `True`), or if the computation of squeeze_factors should be on - the basis that resources from all districts can be effectively "pooled" - (when `False). - - :return: squeeze_factors: an array of the squeeze factors for each HSI event - (position in array matches that in the all_call_today list). - """ - - def get_total_minutes_of_this_officer_in_this_district(_officer): - """Returns the minutes of current capabilities for the officer identified (this officer type in this - facility_id).""" - return current_capabilities.get(_officer) - - def get_total_minutes_of_this_officer_in_all_district(_officer): - """Returns the minutes of current capabilities for the officer identified in all districts (this officer - type in this all facilities of the same level in all districts).""" - - def split_officer_compound_string(cs) -> Tuple[int, str]: - """Returns (facility_id, officer_type) for the officer identified in the string of the form: - 'FacilityID_{facility_id}_Officer_{officer_type}'.""" - _, _facility_id, _, _officer_type = cs.split("_", 3) # (NB. Some 'officer_type' include "_") - return int(_facility_id), _officer_type - - def _match(_this_officer, facility_ids: List[int], officer_type: str): - """Returns True if the officer identified is of the identified officer_type and is in one of the - facility_ids.""" - this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) - return (this_officer_type == officer_type) and (this_facility_id in facility_ids) - - facility_id, officer_type = split_officer_compound_string(_officer) - facility_level = self._facility_by_facility_id[int(facility_id)].level - facilities_of_same_level_in_all_district = [ - _fac.id for _fac in self._facilities_for_each_district[facility_level].values() - ] - - officers_in_the_same_level_in_all_districts = [ - _officer - for _officer in current_capabilities.keys() - if _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) - ] - - return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) - - # 1) Compute the load factors for each officer type at each facility that is - # called-upon in this list of HSIs - load_factor = {} - for officer, call in total_footprint.items(): - if compute_squeeze_factor_to_district_level: - availability = get_total_minutes_of_this_officer_in_this_district(officer) - else: - availability = get_total_minutes_of_this_officer_in_all_district(officer) - - # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 - if availability is None: - logger.warning( - key="message", data=(f"Requested officer {officer} is not contemplated by health system. ") - ) - availability = 0 - - if availability == 0: - load_factor[officer] = float("inf") - else: - load_factor[officer] = max(call / availability - 1, 0.0) - - # 2) Convert these load-factors into an overall 'squeeze' signal for each HSI, - # based on the load-factor of the officer with the largest time requirement for that - # event (or zero if event has an empty footprint) - - # Instead of repeatedly creating lists for squeeze factors, we reuse a numpy array - # If the current store is too small, replace it - if len(footprints_per_event) > len(self._get_squeeze_factors_store[clinic]): - # The new array size is a multiple of `grow` - new_size = ( - math.ceil(len(footprints_per_event) / self._get_squeeze_factors_store_grow) - * self._get_squeeze_factors_store_grow - ) - self._get_squeeze_factors_store[clinic] = np.zeros(new_size) - - for i, footprint in enumerate(footprints_per_event): - if footprint: - # If any of the required officers are not available at the facility, set overall squeeze to inf - require_missing_officer = False - for officer in footprint: - if load_factor[officer] == float("inf"): - require_missing_officer = True - # No need to check the rest - break - - if require_missing_officer: - self._get_squeeze_factors_store[clinic][i] = np.inf - else: - self._get_squeeze_factors_store[clinic][i] = max(load_factor[footprint.most_common()[0][0]], 0.0) - else: - self._get_squeeze_factors_store[clinic][i] = 0.0 - - return self._get_squeeze_factors_store - def record_hsi_event( - self, hsi_event, actual_appt_footprint=None, squeeze_factor=None, did_run=True, priority=None, clinic=None + self, hsi_event, actual_appt_footprint=None, did_run=True, priority=None, clinic=None ): """ Record the processing of an HSI event. It will also record the actual appointment footprint. :param hsi_event: The HSI_Event (containing the initial expectations of footprints) :param actual_appt_footprint: The actual Appointment Footprint (if individual event) - :param squeeze_factor: The squeeze factor (if individual event) """ # HSI-Event - _squeeze_factor = squeeze_factor if squeeze_factor != np.inf else 100.0 self.write_to_hsi_log( event_details=hsi_event.as_namedtuple(actual_appt_footprint), person_id=hsi_event.target, facility_id=hsi_event.facility_info.id, - squeeze_factor=_squeeze_factor, did_run=did_run, priority=priority, clinic=clinic, @@ -2000,7 +1833,6 @@ def write_to_hsi_log( event_details: HSIEventDetails, person_id: int, facility_id: Optional[int], - squeeze_factor: float, did_run: bool, priority: int, clinic: str, @@ -2013,7 +1845,7 @@ def write_to_hsi_log( "TREATMENT_ID": event_details.treatment_id, "Number_By_Appt_Type_Code": dict(event_details.appt_footprint), "Person_ID": person_id, - "Squeeze_Factor": squeeze_factor, + "Squeeze_Factor": 0.0, "priority": priority, "did_run": did_run, "Facility_Level": event_details.facility_level if event_details.facility_level is not None else -99, @@ -2033,7 +1865,6 @@ def write_to_hsi_log( self._summary_counter.record_hsi_event( treatment_id=event_details.treatment_id, hsi_event_name=event_details.event_name, - squeeze_factor=squeeze_factor, appt_footprint=event_details.appt_footprint, level=event_details.facility_level, ) @@ -2091,17 +1922,14 @@ def write_to_never_ran_hsi_log( level=event_details.facility_level, ) - def log_current_capabilities_and_usage(self): - for clinic_name in self._clinic_names: - self.log_clinic_current_capabilities_and_usage(clinic_name) - def log_clinic_current_capabilities_and_usage(self, clinic_name): + def log_current_capabilities_and_usage(self, clinic_name): """ This will log the percentage of the current capabilities that is used at each Facility Type, according the `runnning_total_footprint`. This runs every day. """ current_capabilities = self.capabilities_today[clinic_name] - total_footprint = self.running_total_footprint + total_footprint = self.running_total_footprint[clinic_name] # Combine the current_capabilities and total_footprint per-officer totals comparison = pd.DataFrame(index=current_capabilities.keys()) @@ -2150,8 +1978,9 @@ def log_clinic_current_capabilities_and_usage(self, clinic_name): ) self._summary_counter.record_hs_status( - fraction_time_used_across_all_facilities=fraction_time_used_overall, - fraction_time_used_by_facID_and_officer=fraction_time_used_by_facID_and_officer.to_dict(), + fraction_time_used_across_all_facilities_in_this_clinic=fraction_time_used_overall, + fraction_time_used_by_facID_and_officer_in_this_clinic=fraction_time_used_by_facID_and_officer.to_dict(), + clinic=clinic_name ) def remove_beddays_footprint(self, person_id): @@ -2277,6 +2106,27 @@ def on_end_of_year(self) -> None: # Record equipment usage for the year, for each facility self._record_general_equipment_usage_for_year() + def check_if_all_required_officers_have_nonzero_capabilities(self, expected_time_requests, clinic)-> bool: + """Check if all officers required by the appt footprint are available to perform the HSI""" + + ok_to_run = True + + for officer in expected_time_requests.keys(): + availability = self.capabilities_today[clinic][officer] + + # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 + if availability is None: + logger.warning( + key="message", + data=(f"Requested officer {officer} is not contemplated by health system. ") + ) + availability = 0.0 + + if availability == 0.0: + ok_to_run = False + + return ok_to_run + def run_individual_level_events_in_mode_1( self, _list_of_individual_hsi_event_tuples: List[HSIEventQueueItem] ) -> List: @@ -2288,40 +2138,43 @@ def run_individual_level_events_in_mode_1( # Examine total call on health officers time from the HSI events in the list: # For all events in the list, expand the appt-footprint of the event to give the demands on each - # officer-type in each facility_id. - footprints_of_all_individual_level_hsi_event = [ - event_tuple.hsi_event.expected_time_requests for event_tuple in _list_of_individual_hsi_event_tuples - ] + # officer-type in each facility_id and clinic. + footprints_of_all_individual_level_hsi_event = defaultdict(list) + ## _list_of_individual_hsi_event_tuples is a flat list, whereas we will now + ## store the footprint by clinic; as we loop over the list of events to be run, we + ## will retrieve the updated footprint using get_appt_footprint_as_time_request. + ## We want to ensure that we update the footprint of the ``correct'' event. We will + ## therefore also store the number of the event in the original flat list in a + ## dictionary keyed by clinics. + event_num_of_all_individual_level_hsi_event = defaultdict(list) + for eve_num, event_tuple in enumerate(_list_of_individual_hsi_event_tuples): + event_clinic = event_tuple.clinic_eligibility + footprints_of_all_individual_level_hsi_event[event_clinic].append( + event_tuple.hsi_event.expected_time_requests + ) + event_num_of_all_individual_level_hsi_event[event_clinic].append(eve_num) - # Compute total appointment footprint across all events - for footprint in footprints_of_all_individual_level_hsi_event: - # Counter.update method when called with dict-like argument adds counts - # from argument to Counter object called from - self.running_total_footprint.update(footprint) - - # Estimate Squeeze-Factors for today - squeeze_factor_per_hsi_event = self.get_squeeze_factors( - footprints_per_event=footprints_of_all_individual_level_hsi_event, - total_footprint=self.running_total_footprint, - current_capabilities=self.capabilities_today, - compute_squeeze_factor_to_district_level=self.compute_squeeze_factor_to_district_level, - ) + # For each clinic, compute total appointment footprint across all events + + for clinic, footprint in footprints_of_all_individual_level_hsi_event.items(): + for hsi_footprint in footprint: + # Counter.update method when called with dict-like argument adds counts + # from argument to Counter object called from + self.running_total_footprint[clinic].update(hsi_footprint) for ev_num, event in enumerate(_list_of_individual_hsi_event_tuples): _priority = event.priority clinic = event.clinic_eligibility event = event.hsi_event - squeeze_factor = squeeze_factor_per_hsi_event[clinic][ev_num] # todo use zip here! # store appt_footprint before running _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT - # Mode 1: All HSI Events run with squeeze provided latter is not inf + # In this mode, all HSI Events run provided all required officers have non-zero capabilities ok_to_run = True - - if self.mode_appt_constraints == 1 and squeeze_factor == float("inf"): - ok_to_run = False - + if event.expected_time_requests: + ok_to_run = self.check_if_all_required_officers_have_nonzero_capabilities( + event.expected_time_requests, clinic=clinic) if ok_to_run: # Compute the bed days that are allocated to this HSI and provide this information to the HSI if sum(event.BEDDAYS_FOOTPRINT.values()): @@ -2336,7 +2189,7 @@ def run_individual_level_events_in_mode_1( ) # Run the HSI event (allowing it to return an updated appt_footprint) - actual_appt_footprint = event.run(squeeze_factor=squeeze_factor) + actual_appt_footprint = event.run(squeeze_factor=0.0) # Check if the HSI event returned updated appt_footprint if actual_appt_footprint is not None: @@ -2349,17 +2202,11 @@ def run_individual_level_events_in_mode_1( updated_call = self.get_appt_footprint_as_time_request( facility_info=event.facility_info, appt_footprint=actual_appt_footprint ) - original_call = footprints_of_all_individual_level_hsi_event[ev_num] - footprints_of_all_individual_level_hsi_event[ev_num] = updated_call - self.running_total_footprint -= original_call - self.running_total_footprint += updated_call - - squeeze_factor_per_hsi_event = self.get_squeeze_factors( - footprints_per_event=footprints_of_all_individual_level_hsi_event, - total_footprint=self.running_total_footprint, - current_capabilities=self.capabilities_today, - compute_squeeze_factor_to_district_level=self.compute_squeeze_factor_to_district_level, - ) + ev_num_in_clinics_fp = event_num_of_all_individual_level_hsi_event[clinic].index(ev_num) + original_call = footprints_of_all_individual_level_hsi_event[clinic][ev_num_in_clinics_fp] + footprints_of_all_individual_level_hsi_event[clinic][ev_num_in_clinics_fp] = updated_call + self.running_total_footprint[clinic] -= original_call + self.running_total_footprint[clinic] += updated_call else: # no actual footprint is returned so take the expected initial declaration as the actual, @@ -2370,7 +2217,6 @@ def run_individual_level_events_in_mode_1( self.record_hsi_event( hsi_event=event, actual_appt_footprint=actual_appt_footprint, - squeeze_factor=squeeze_factor, did_run=True, priority=_priority, ) @@ -2393,7 +2239,6 @@ def run_individual_level_events_in_mode_1( self.record_hsi_event( hsi_event=event, actual_appt_footprint=event.EXPECTED_APPT_FOOTPRINT, - squeeze_factor=squeeze_factor, did_run=False, priority=_priority, ) @@ -2621,9 +2466,6 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: # Retrieve officers&facility required for HSI original_call = next_event_tuple.hsi_event.expected_time_requests _priority = next_event_tuple.priority - # In this version of mode_appt_constraints = 2, do not have access to squeeze - # based on queue information, and we assume no squeeze ever takes place. - squeeze_factor = 0.0 # Check if any of the officers required have run out. out_of_resources = False @@ -2655,7 +2497,6 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: self.module.record_hsi_event( hsi_event=event, actual_appt_footprint=event.EXPECTED_APPT_FOOTPRINT, - squeeze_factor=squeeze_factor, did_run=False, priority=_priority, clinic=event_clinic, @@ -2692,7 +2533,7 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: # Expected appt footprint before running event _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT # Run event & get actual footprint - actual_appt_footprint = event.run(squeeze_factor=squeeze_factor) + actual_appt_footprint = event.run(squeeze_factor=0.0) # Check if the HSI event returned updated_appt_footprint, and if so adjust original_call if actual_appt_footprint is not None: @@ -2707,10 +2548,6 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: actual_appt_footprint = _appt_footprint_before_running updated_call = original_call - # Recalculate call on officers based on squeeze factor. - for k in updated_call.keys(): - updated_call[k] = updated_call[k] / (squeeze_factor + 1.0) - # Subtract this from capabilities used so-far today capabilities_monitor[event_clinic].subtract(updated_call) @@ -2729,14 +2566,13 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: ), ) - # Update today's footprint based on actual call and squeeze factor - self.module.running_total_footprint.update(updated_call) + # Update today's footprint based on actual call + self.module.running_total_footprint[event_clinic].update(updated_call) # Write to the log self.module.record_hsi_event( hsi_event=event, actual_appt_footprint=actual_appt_footprint, - squeeze_factor=squeeze_factor, did_run=True, priority=_priority, clinic=event_clinic, @@ -2807,7 +2643,6 @@ def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: self.module.record_hsi_event( hsi_event=event, actual_appt_footprint=event.EXPECTED_APPT_FOOTPRINT, - squeeze_factor=0, did_run=False, priority=next_event_tuple.priority, clinic=next_event_tuple.clinic_eligibility, @@ -2847,14 +2682,14 @@ def apply(self, population): ), person_id=-1, facility_id=_fac_id, - squeeze_factor=0.0, priority=-1, clinic=str(None), did_run=True, ) # Restart the total footprint of all calls today, beginning with those due to existing in-patients. - self.module.running_total_footprint = inpatient_footprints + # Important: Here we assign all inpatient bed-days to the GenericClinic + self.module.running_total_footprint["GenericClinic"] = inpatient_footprints # Create hold-over list. This will hold events that cannot occur today before they are added back to the queue. hold_over = list() @@ -2874,7 +2709,15 @@ def apply(self, population): hp.heappush(self.module.HSI_EVENT_QUEUE, hp.heappop(hold_over)) # Log total usage of the facilities - self.module.log_current_capabilities_and_usage() + for clinic in self.module._clinic_names: + ## In general there should not be any discrepancy between self.module._clinic_names + ## and keys of self.module.capabilities_today + ## But this can happen e.g. during testing because self.module.capabilities_today + ## only get structured during setup of daily capabilities. + if clinic not in self.module.capabilities_today: + continue + + self.module.log_current_capabilities_and_usage(clinic) # Trigger jobs to be done at the end of the day (after all HSI run) self.module.on_end_of_day() @@ -2917,23 +2760,17 @@ def _reset_internal_stores(self) -> None: self._never_ran_appts = defaultdict(int) # As above, but for `HSI_Event`s that have never ran self._never_ran_appts_by_level = {_level: defaultdict(int) for _level in ("0", "1a", "1b", "2", "3", "4")} - self._frac_time_used_overall = [] # Running record of the usage of the healthcare system - self._sum_of_daily_frac_time_used_by_facID_and_officer = Counter() - self._squeeze_factor_by_hsi_event_name = defaultdict(list) # Running record the squeeze-factor applying to each - # treatment_id. Key is of the form: - # ":" + self._frac_time_used_overall = defaultdict(list) # Running record of the usage of the healthcare system + self._sum_of_daily_frac_time_used_by_facID_and_officer = defaultdict(Counter) def record_hsi_event( - self, treatment_id: str, hsi_event_name: str, squeeze_factor: float, appt_footprint: Counter, level: str + self, treatment_id: str, hsi_event_name: str, appt_footprint: Counter, level: str ) -> None: """Add information about an `HSI_Event` to the running summaries.""" # Count the treatment_id: self._treatment_ids[treatment_id] += 1 - # Add the squeeze-factor to the list - self._squeeze_factor_by_hsi_event_name[f"{treatment_id}:{hsi_event_name}"].append(squeeze_factor) - # Count each type of appointment: for appt_type, number in appt_footprint: self._appts[appt_type] += number @@ -2961,29 +2798,33 @@ def record_never_ran_hsi_event( def record_hs_status( self, - fraction_time_used_across_all_facilities: float, - fraction_time_used_by_facID_and_officer: Dict[str, float], + fraction_time_used_across_all_facilities_in_this_clinic: float, + fraction_time_used_by_facID_and_officer_in_this_clinic: Dict[str, float], + clinic: str ) -> None: """Record a current status metric of the HealthSystem.""" # The fraction of all healthcare worker time that is used: - self._frac_time_used_overall.append(fraction_time_used_across_all_facilities) + self._frac_time_used_overall[clinic].append(fraction_time_used_across_all_facilities_in_this_clinic) - for facID_and_officer, fraction_time in fraction_time_used_by_facID_and_officer.items(): - self._sum_of_daily_frac_time_used_by_facID_and_officer[facID_and_officer] += fraction_time + for facID_and_officer, fraction_time in fraction_time_used_by_facID_and_officer_in_this_clinic.items(): + self._sum_of_daily_frac_time_used_by_facID_and_officer[clinic][facID_and_officer] += fraction_time def write_to_log_and_reset_counters(self): """Log summary statistics reset the data structures. This usually occurs at the end of the year.""" + print("what is looks like", {t_id: 0.0 for t_id, v in self._treatment_ids.items()}) + print(self._treatment_ids) + logger_summary.info( key="HSI_Event", description="Counts of the HSI_Events that have occurred in this calendar year by TREATMENT_ID, " - "and counts of the 'Appt_Type's that have occurred in this calendar year," - "and the average squeeze_factor for HSIs that have occurred in this calendar year.", + "and counts of the 'Appt_Type's that have occurred in this calendar year." + "Squeeze factors are always assumed to be 0.", data={ "TREATMENT_ID": self._treatment_ids, "Number_By_Appt_Type_Code": self._appts, "Number_By_Appt_Type_Code_And_Level": self._appts_by_level, - "squeeze_factor": {k: sum(v) / len(v) for k, v in self._squeeze_factor_by_hsi_event_name.items()}, + "squeeze_factor": {t_id: 0.0 for t_id, v in self._treatment_ids.items()}, }, ) logger_summary.info( @@ -3013,24 +2854,30 @@ def write_to_log_and_reset_counters(self): description="The fraction of all the healthcare worker time that is used each day, averaged over this " "calendar year.", data={ - "average_Frac_Time_Used_Overall": np.mean(self._frac_time_used_overall), + "average_Frac_Time_Used_Overall": { + clinic: np.mean(values) for clinic, values in self._frac_time_used_overall.items() + }, # <-- leaving space here for additional summary measures that may be needed in the future. }, ) # Log mean of 'fraction time used by facID and officer' from daily entries from the previous - # year. - logger_summary.info( - key="Capacity_By_FacID_and_Officer", - description="The fraction of healthcare worker time that is used each day, averaged over this " - "calendar year, for each officer type at each facility.", - data=flatten_multi_index_series_into_dict_for_logging(self.frac_time_used_by_facID_and_officer()), - ) + # year for each clinic + + for clinic in self._frac_time_used_overall.keys(): + logger_summary.info( + key="Capacity_By_FacID_and_Officer", + description="The fraction of healthcare worker time that is used each day, averaged over this " + "calendar year, for each officer type at each facility.", + data=flatten_multi_index_series_into_dict_for_logging(self.frac_time_used_by_facID_and_officer(clinic))#, + ) + self._reset_internal_stores() def frac_time_used_by_facID_and_officer( self, + clinic:str, facID_and_officer: Optional[str] = None, ) -> Union[float, pd.Series]: """Average fraction of time used by officer type and level since last reset. @@ -3039,17 +2886,18 @@ def frac_time_used_by_facID_and_officer( if facID_and_officer is not None: return ( - self._sum_of_daily_frac_time_used_by_facID_and_officer[facID_and_officer] - / len(self._frac_time_used_overall) + self._sum_of_daily_frac_time_used_by_facID_and_officer[clinic][facID_and_officer] + / len(self._frac_time_used_overall[clinic]) # Use len(self._frac_time_used_overall) as proxy for number of days in past year. ) else: # Return multiple in the form of a pd.Series with multiindex mean_frac_time_used = { - (_facID_and_officer): v / len(self._frac_time_used_overall) - for (_facID_and_officer), v in self._sum_of_daily_frac_time_used_by_facID_and_officer.items() - if (_facID_and_officer == facID_and_officer or _facID_and_officer is None) + (_facID_and_officer): v / len(self._frac_time_used_overall[clinic]) + for (_facID_and_officer), v in self._sum_of_daily_frac_time_used_by_facID_and_officer[clinic].items() + ##if (_facID_and_officer == facID_and_officer or _facID_and_officer is None) } + breakpoint() return pd.Series( index=pd.MultiIndex.from_tuples(mean_frac_time_used.keys(), names=["facID_and_officer"]), data=mean_frac_time_used.values(), diff --git a/tests/test_healthsystem.py b/tests/test_healthsystem.py index 98e9319641..dfa1c1bd1d 100644 --- a/tests/test_healthsystem.py +++ b/tests/test_healthsystem.py @@ -292,7 +292,7 @@ def test_run_in_mode_1_with_capacity(tmpdir, seed): @pytest.mark.slow -def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): +def test_rescaling_capabilities_based_on_load_factors(tmpdir, seed): # Capabilities should increase when a HealthSystem that has low capabilities changes mode with # the option `scale_to_effective_capabilities` set to `True`. @@ -305,144 +305,73 @@ def test_rescaling_capabilities_based_on_squeeze_factors(tmpdir, seed): "directory": tmpdir, "custom_levels": { "tlo.methods.healthsystem": logging.DEBUG, - }, - }, - resourcefilepath=resourcefilepath, + "tlo.methods.healthsystem.summary": logging.INFO + } + }, resourcefilepath=resourcefilepath ) # Register the core modules # Set the year in which mode is changed to start_date + 1 year, and mode after that still 1. # Check that in second year, squeeze factor is smaller on average. - sim.register( - demography.Demography(), - simplified_births.SimplifiedBirths(), - enhanced_lifestyle.Lifestyle(), - healthsystem.HealthSystem( - capabilities_coefficient=0.0000001, # This will mean that capabilities are - # very close to 0 everywhere. - # (If the value was 0, then it would - # be interpreted as the officers NEVER - # being available at a facility, - # which would mean the HSIs should not - # run (as opposed to running with - # a very high squeeze factor)). - ), - symptommanager.SymptomManager(), - healthseekingbehaviour.HealthSeekingBehaviour(), - mockitis.Mockitis(), - chronicsyndrome.ChronicSyndrome(), - ) + sim.register(demography.Demography(), + simplified_births.SimplifiedBirths(), + enhanced_lifestyle.Lifestyle(), + healthsystem.HealthSystem( + capabilities_coefficient=0.0000001, # This will mean that capabilities are + # very close to 0 everywhere. + # (If the value was 0, then it would + # be interpreted as the officers NEVER + # being available at a facility, + # which would mean the HSIs should not + # run (as opposed to running with + # a very high squeeze factor)). + ), + symptommanager.SymptomManager(), + healthseekingbehaviour.HealthSeekingBehaviour(), + mockitis.Mockitis(), + chronicsyndrome.ChronicSyndrome() + ) # Define the "switch" from Mode 1 to Mode 1, with the rescaling - hs_params = sim.modules["HealthSystem"].parameters - hs_params["mode_appt_constraints"] = 1 - hs_params["mode_appt_constraints_postSwitch"] = 1 - hs_params["year_mode_switch"] = start_date.year + 1 - hs_params["scale_to_effective_capabilities"] = True + hs_params = sim.modules['HealthSystem'].parameters + hs_params['mode_appt_constraints'] = 1 + hs_params['mode_appt_constraints_postSwitch'] = 1 + hs_params['year_mode_switch'] = start_date.year + 1 + hs_params['scale_to_effective_capabilities'] = True # Run the simulation - sim.make_initial_population(n=popsize) + sim.make_initial_population(n=1000) sim.simulate(end_date=end_date) check_dtypes(sim) # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) - - # Do the checks - assert len(output["tlo.methods.healthsystem"]["HSI_Event"]) > 0 - hsi_events = output["tlo.methods.healthsystem"]["HSI_Event"] - hsi_events["date"] = pd.to_datetime(hsi_events["date"]).dt.year - - # Check that all squeeze factors were high in 2010, but not all were high in 2011 - # thanks to rescaling of capabilities - assert ( - hsi_events.loc[ - (hsi_events["Person_ID"] >= 0) - & (hsi_events["Number_By_Appt_Type_Code"] != {}) - & (hsi_events["date"] == 2010), - "Squeeze_Factor", - ] - >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - - assert not ( - hsi_events.loc[ - (hsi_events["Person_ID"] >= 0) - & (hsi_events["Number_By_Appt_Type_Code"] != {}) - & (hsi_events["date"] == 2011), - "Squeeze_Factor", - ] - >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. + output = parse_log_file(sim.log_filepath, level=logging.INFO) + pd.set_option('display.max_columns', None) + summary = output['tlo.methods.healthsystem.summary'] + breakpoint() + capacity_by_officer_and_level = summary['Capacity_By_FacID_and_Officer'] + # Filter rows for the two years + row_2010 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2010-12-31"].squeeze() + row_2011 = capacity_by_officer_and_level.loc[capacity_by_officer_and_level["date"] == "2011-12-31"].squeeze() -@pytest.mark.slow -def test_run_in_mode_1_with_almost_no_capacity(tmpdir, seed): - # Events should run but (for those with non-blank footprints) with high squeeze factors - # (Mode 1 -> elastic constraints) - - # Establish the simulation object - sim = Simulation( - start_date=start_date, - seed=seed, - log_config={ - "filename": "log", - "directory": tmpdir, - "custom_levels": { - "tlo.methods.healthsystem": logging.DEBUG, - }, - }, - resourcefilepath=resourcefilepath, - ) - - # Define the service availability - service_availability = ["*"] - - # Register the core modules - sim.register( - demography.Demography(), - simplified_births.SimplifiedBirths(), - enhanced_lifestyle.Lifestyle(), - healthsystem.HealthSystem( - service_availability=service_availability, - capabilities_coefficient=0.0000001, # This will mean that capabilities are - # very close to 0 everywhere. - # (If the value was 0, then it would - # be interpreted as the officers NEVER - # being available at a facility, - # which would mean the HSIs should not - # run (as opposed to running with - # a very high squeeze factor)). - mode_appt_constraints=1, - ), - symptommanager.SymptomManager(), - healthseekingbehaviour.HealthSeekingBehaviour(), - mockitis.Mockitis(), - chronicsyndrome.ChronicSyndrome(), - ) - - # Run the simulation - sim.make_initial_population(n=popsize) - sim.simulate(end_date=end_date) - check_dtypes(sim) + # Dictionary to store results + results = {} - # read the results - output = parse_log_file(sim.log_filepath, level=logging.DEBUG) + # Check that load has significantly reduced in second year, thanks to the significant + # rescaling of capabilities. + # (There is some degeneracy here, in that load could also be reduced due to declining demand. + # However it is extremely unlikely that demand for care would have dropped by a factor of 100 + # in second year, hence this is a fair test). + for col in capacity_by_officer_and_level.columns: + if col == "date": + continue # skip the date column + if not (capacity_by_officer_and_level[col] == 0).any(): # check column is not all zeros + ratio = row_2010[col] / row_2011[col] - # Do the checks - assert len(output["tlo.methods.healthsystem"]["HSI_Event"]) > 0 - hsi_events = output["tlo.methods.healthsystem"]["HSI_Event"] - # assert hsi_events['did_run'].all() - assert ( - hsi_events.loc[ - (hsi_events["Person_ID"] >= 0) & (hsi_events["Number_By_Appt_Type_Code"] != {}), "Squeeze_Factor" - ] - >= 100.0 - ).all() # All the events that had a non-blank footprint experienced high squeezing. - assert (hsi_events.loc[hsi_events["Person_ID"] < 0, "Squeeze_Factor"] == 0.0).all() + results[col] = ratio > 100 - # Check that some Mockitis cures occurred (though health system) - assert any(sim.population.props["mi_status"] == "P") + assert all(results.values()) @pytest.mark.slow @@ -909,7 +838,7 @@ def dict_all_close(dict_1, dict_2): ) detailed_treatment_id_mean_squeeze_factors = ( detailed_hsi_event.assign( - treatment_id_hsi_name=lambda df: df["TREATMENT_ID"] + ":" + df["Event_Name"], + treatment_id_hsi_name=lambda df: df["TREATMENT_ID"], year=lambda df: df.date.dt.year, ) .groupby(by=["treatment_id_hsi_name", "year"])["Squeeze_Factor"] @@ -925,16 +854,22 @@ def dict_all_close(dict_1, dict_2): ) # - Average fraction of HCW time used (year by year) - assert ( - summary_capacity.set_index(pd.to_datetime(summary_capacity.date).dt.year)["average_Frac_Time_Used_Overall"] - .round(4) - .to_dict() - == detailed_capacity.set_index(pd.to_datetime(detailed_capacity.date).dt.year)["Frac_Time_Used_Overall"] - .groupby(level=0) - .mean() - .round(4) - .to_dict() - ) + summary_capacity_indexed = summary_capacity.set_index(pd.to_datetime(summary_capacity.date).dt.year) + for clinic in sim.modules["HealthSystem"]._clinic_names: + summary_clinic_capacity = summary_capacity_indexed["average_Frac_Time_Used_Overall"].apply( + lambda x: x.get(clinic, None) + ) + detailed_clinic_capacity = detailed_capacity[detailed_capacity["Clinic"] == clinic] + assert ( + summary_clinic_capacity.round(4).to_dict() + == detailed_clinic_capacity.set_index(pd.to_datetime(detailed_clinic_capacity.date).dt.year)[ + "Frac_Time_Used_Overall" + ] + .groupby(level=0) + .mean() + .round(4) + .to_dict() + ) # - Consumables (total over entire period of log that are available / not available) # add _Item_ assert ( @@ -1218,7 +1153,7 @@ def apply(self, person_id, squeeze_factor): .dropna() .to_dict() == detailed_hsi_event.assign( - treatment_id_hsi_name=lambda df: df["TREATMENT_ID"] + ":" + df["Event_Name"], + treatment_id_hsi_name=lambda df: df["TREATMENT_ID"], year=lambda df: df.date.dt.year, ) .groupby(by=["treatment_id_hsi_name", "year"])["Squeeze_Factor"] @@ -1312,11 +1247,11 @@ def test_HealthSystemChangeParameters(seed, tmpdir): "use_funded_or_actual_staffing_postSwitch": "actual", } parameters_to_change = ["cons_availability", "equip_availability", "use_funded_or_actual_staffing"] - + initial_parameters = {"cons_availability" : test_parameters["cons_availability"], "equip_availability": test_parameters["equip_availability"], "use_funded_or_actual_staffing": test_parameters["use_funded_or_actual_staffing"]} - + new_parameters = {"cons_availability" : test_parameters["cons_availability_postSwitch"], "equip_availability": test_parameters["equip_availability_postSwitch"], "use_funded_or_actual_staffing": test_parameters["use_funded_or_actual_staffing_postSwitch"]} @@ -2143,8 +2078,8 @@ def initialise_simulation(self, sim): return sim - def schedule_hsi_events(notherclinic, nclinic1, sim): - for i in range(0, notherclinic): + def schedule_hsi_events(ngenericclinic, nclinic1, sim): + for i in range(0, ngenericclinic): hsi = DummyHSIEvent( module=sim.modules["DummyModuleGenericClinic"], person_id=i, @@ -2156,7 +2091,7 @@ def schedule_hsi_events(notherclinic, nclinic1, sim): hsi, topen=sim.date, tclose=sim.date + pd.DateOffset(days=1), priority=1 ) - for i in range(notherclinic, notherclinic + nclinic1): + for i in range(ngenericclinic, ngenericclinic + nclinic1): hsi = DummyHSIEvent( module=sim.modules["DummyModuleClinic1"], person_id=i, @@ -2897,3 +2832,186 @@ def initialise_simulation(self, sim): == {"Dummy": 1} # recorded in both the usual and the 'non-blank' logger ) + +def test_clinics_rescaling_factor(seed, tmpdir): + """Test that rescaling factor for clinics is computed correctly.""" + + # Create a dummy HSI event class + class DummyHSIEvent(HSI_Event, IndividualScopeEventMixin): + def __init__(self, module, person_id, appt_type, level, treatment_id): + super().__init__(module, person_id=person_id) + self.TREATMENT_ID = treatment_id + self.EXPECTED_APPT_FOOTPRINT = self.make_appt_footprint({appt_type: 1}) + self.ACCEPTED_FACILITY_LEVEL = level + + def apply(self, person_id, squeeze_factor): + self.this_hsi_event_ran = True + + def create_simulation(tmpdir: Path, tot_population) -> Simulation: + class DummyModuleGenericClinic(Module): + METADATA = {Metadata.DISEASE_MODULE, Metadata.USES_HEALTHSYSTEM} + + def read_parameters(self, data_folder): + pass + + def initialise_population(self, population): + pass + + def initialise_simulation(self, sim): + pass + + class DummyModuleClinic1(Module): + METADATA = {Metadata.DISEASE_MODULE, Metadata.USES_HEALTHSYSTEM} + + def read_parameters(self, data_folder): + pass + + def initialise_population(self, population): + pass + + def initialise_simulation(self, sim): + pass + + log_config = { + "filename": "log", + "directory": tmpdir, + "custom_levels": {"tlo.methods.healthsystem": logging.DEBUG}, + } + start_date = Date(2010, 1, 1) + sim = Simulation(start_date=start_date, seed=0, log_config=log_config, resourcefilepath=resourcefilepath) + + sim.register( + demography.Demography(), + healthsystem.HealthSystem( + capabilities_coefficient=1.0, + mode_appt_constraints=1, + ignore_priority=False, + randomise_queue=True, + policy_name="", + use_funded_or_actual_staffing="funded_plus", + ), + DummyModuleGenericClinic(), + DummyModuleClinic1(), + ) + sim.make_initial_population(n=tot_population) + + sim.modules["HealthSystem"]._clinic_names = ["Clinic1", "GenericClinic"] + + + # Get any level 0 facility + district, fac_info = next(iter(sim.modules["HealthSystem"]._facilities_for_each_district['0'].items())) + fac_id = fac_info.id + col = "district_of_residence" + s = sim.population.props[col] + + ## Not specifying the dtype explicitly here made the col a string rather than a category + ## and that caused problems later on. + sim.population.props[col] = pd.Series(district, index=s.index, dtype=s.dtype) + + + ## Even though we don't use the split specified here in this test, we do need + ## to include Clinic1 as a column in the _clinic_configuration object, and + ## explicitly call setup_daily_capabilities as this function populates several + ## key objects. + sim.modules["HealthSystem"]._clinic_configuration = pd.DataFrame( + [{"Facility_ID": fac_id, "Officer_Type_Code": "DCSA", "Clinic1": 0, "GenericClinic": 1.0}] + ) + sim.modules["HealthSystem"]._clinic_mapping = pd.DataFrame( + [{"Treatment": "DummyHSIEvent", "Clinic": "Clinic1"}] + ) + sim.modules["HealthSystem"].setup_daily_capabilities("funded_plus") + + + sim.simulate(end_date=sim.start_date + pd.DateOffset(years=1)) + + return sim + + def schedule_hsi_events(ngenericclinic, nclinic1, sim): + for i in range(0, ngenericclinic): + hsi = DummyHSIEvent( + module=sim.modules["DummyModuleGenericClinic"], + person_id=i, + appt_type="ConWithDCSA", + level="0", + treatment_id="DummyHSIEventGenericClinic", + ) + sim.modules["HealthSystem"].schedule_hsi_event( + hsi, topen=sim.date, tclose=sim.date + pd.DateOffset(days=1), priority=1 + ) + + for i in range(ngenericclinic, ngenericclinic + nclinic1): + hsi = DummyHSIEvent( + module=sim.modules["DummyModuleClinic1"], + person_id=i, + appt_type="ConWithDCSA", + level="0", + treatment_id="DummyHSIEvent", + ) + sim.modules["HealthSystem"].schedule_hsi_event( + hsi, topen=sim.date, tclose=sim.date + pd.DateOffset(days=1), priority=1 + ) + + return sim + + tot_population = 100 + sim = create_simulation(tmpdir, tot_population) + + # Schedule an identical appointment for all individuals, assigning clinic as follows: + # 10 HSIs have clinic_eligibility=GenericClinic and 90 clinic_eligibility=Clinic1 + nevents_generic_clinic = 10 + nevents_clinic1 = 90 + sim = schedule_hsi_events(nevents_generic_clinic, nevents_clinic1, sim) + + ## This hsi is only created to get the expected officer type and minutes needed + ## for an appointment of type ConWithDCSA + ## therefore the treatment_id is not important + ## ConWithDCSA needs 10 mins of officer type DCSA at level 0 + hsi1 = DummyHSIEvent( + module=sim.modules["DummyModuleGenericClinic"], + person_id=0, # Ensures call is on officers in first district + appt_type="ConWithDCSA", + level="0", + treatment_id="DummyHSIEventGenericClinic", + ) + hsi1.initialise() + + # Now adjust capabilities available. + # GenericClinic has exactly the capability than needed to run all the appointments; + # Clinic1 has less capability than needed to run all the appointments; + # This will ensure rescaling factor for GenericClinic is 1 (it cannot be less than 1) + # and that for Clinic1 > 1 + + ## We will use this key to query capabilities later + fac_id_off_type = next(iter(hsi1.expected_time_requests)) + + sim.modules["HealthSystem"]._daily_capabilities["Clinic1"] = {} + for k, v in hsi1.expected_time_requests.items(): + sim.modules["HealthSystem"]._daily_capabilities["GenericClinic"][k] = v * nevents_generic_clinic + sim.modules["HealthSystem"]._daily_capabilities["Clinic1"][k] = v * (nevents_clinic1 / 2) + + # Run healthsystemscheduler + sim.modules["HealthSystem"].healthsystemscheduler.apply(sim.population) + + # Record capabilities before rescaling + genericclinic_capabilities_before = sim.modules["HealthSystem"]._daily_capabilities["GenericClinic"][fac_id_off_type] + clinic1_capabilities_before = sim.modules["HealthSystem"]._daily_capabilities["Clinic1"][fac_id_off_type] + + # Now trigger rescaling of capabilities + sim.modules["HealthSystem"]._rescale_capabilities_to_capture_effective_capability() + + # Record capabilities after rescaling + genericclinic_capabilities_after = sim.modules["HealthSystem"]._daily_capabilities["GenericClinic"][fac_id_off_type] + clinic1_capabilities_after = sim.modules["HealthSystem"]._daily_capabilities["Clinic1"][fac_id_off_type] + + # Expect no change in GenericClinic capabilities and Clinic1 capabilities to be rescaled by 2 + assert np.isclose( + genericclinic_capabilities_before, + genericclinic_capabilities_after, + ), "Expected no change in GenericClinic capabilities after rescaling" + + assert np.isclose( + clinic1_capabilities_before * 2, + clinic1_capabilities_after, + ), "Expected Clinic1 capabilities to be rescaled by factor of 2" + + breakpoint()