diff --git a/src/tlo/methods/healthsystem.py b/src/tlo/methods/healthsystem.py index 0c81fe4026..11cea67b93 100644 --- a/src/tlo/methods/healthsystem.py +++ b/src/tlo/methods/healthsystem.py @@ -1,7 +1,6 @@ import datetime import heapq as hp import itertools -import math import re import warnings from collections import Counter, defaultdict @@ -362,7 +361,6 @@ def __init__( use_funded_or_actual_staffing: Optional[str] = None, disable: bool = False, disable_and_reject_all: bool = False, - compute_squeeze_factor_to_district_level: bool = True, hsi_event_count_log_period: Optional[str] = "month", ): """ @@ -396,8 +394,6 @@ def __init__( logging) and every HSI event runs. :param disable_and_reject_all: If ``True``, disable health system and no HSI events run - :param compute_squeeze_factor_to_district_level: Whether to compute squeeze_factors to the district level, or - the national level (which effectively pools the resources across all districts). :param hsi_event_count_log_period: Period over which to accumulate counts of HSI events that have run before logging and reseting counters. Should be on of strings ``'day'``, ``'month'``, ``'year'``. ``'simulation'`` to log at the @@ -480,12 +476,6 @@ def __init__( assert equip_availability in (None, 'default', 'all', 'none') self.arg_equip_availability = equip_availability - # `compute_squeeze_factor_to_district_level` is a Boolean indicating whether the computation of squeeze_factors - # should be specific to each district (when `True`), or if the computation of squeeze_factors should be on the - # basis that resources from all districts can be effectively "pooled" (when `False). - assert isinstance(compute_squeeze_factor_to_district_level, bool) - self.compute_squeeze_factor_to_district_level = compute_squeeze_factor_to_district_level - # Create the Diagnostic Test Manager to store and manage all Diagnostic Test self.dx_manager = DxManager(self) @@ -504,10 +494,6 @@ def __init__( # Create counter for the running total of footprint of all the HSIs being run today self.running_total_footprint: Counter = Counter() - # A reusable store for holding squeeze factors in get_squeeze_factors() - self._get_squeeze_factors_store_grow = 500 - self._get_squeeze_factors_store = np.zeros(self._get_squeeze_factors_store_grow) - self._hsi_event_count_log_period = hsi_event_count_log_period if hsi_event_count_log_period in {"day", "month", "year", "simulation"}: # Counters for binning HSI events run (by unique integer keys) over @@ -1606,125 +1592,7 @@ def get_appt_footprint_as_time_request(self, facility_info: FacilityInfo, appt_f return appt_footprint_times - def get_squeeze_factors(self, footprints_per_event, total_footprint, current_capabilities, - compute_squeeze_factor_to_district_level: bool - ): - """ - This will compute the squeeze factors for each HSI event from the list of all - the calls on health system resources for the day. - The squeeze factor is defined as (call/available - 1). ie. the highest - fractional over-demand among any type of officer that is called-for in the - appt_footprint of an HSI event. - A value of 0.0 signifies that there is no squeezing (sufficient resources for - the EXPECTED_APPT_FOOTPRINT). - - :param footprints_per_event: List, one entry per HSI event, containing the - minutes required from each health officer in each health facility as a - Counter (using the standard index) - :param total_footprint: Counter, containing the total minutes required from - each health officer in each health facility when non-zero, (using the - standard index) - :param current_capabilities: Series giving the amount of time available for - each health officer in each health facility (using the standard index) - :param compute_squeeze_factor_to_district_level: Boolean indicating whether - the computation of squeeze_factors should be specific to each district - (when `True`), or if the computation of squeeze_factors should be on - the basis that resources from all districts can be effectively "pooled" - (when `False). - - :return: squeeze_factors: an array of the squeeze factors for each HSI event - (position in array matches that in the all_call_today list). - """ - - def get_total_minutes_of_this_officer_in_this_district(_officer): - """Returns the minutes of current capabilities for the officer identified (this officer type in this - facility_id).""" - return current_capabilities.get(_officer) - - def get_total_minutes_of_this_officer_in_all_district(_officer): - """Returns the minutes of current capabilities for the officer identified in all districts (this officer - type in this all facilities of the same level in all districts).""" - - def split_officer_compound_string(cs) -> Tuple[int, str]: - """Returns (facility_id, officer_type) for the officer identified in the string of the form: - 'FacilityID_{facility_id}_Officer_{officer_type}'.""" - _, _facility_id, _, _officer_type = cs.split('_', 3) # (NB. Some 'officer_type' include "_") - return int(_facility_id), _officer_type - - def _match(_this_officer, facility_ids: List[int], officer_type: str): - """Returns True if the officer identified is of the identified officer_type and is in one of the - facility_ids.""" - this_facility_id, this_officer_type = split_officer_compound_string(_this_officer) - return (this_officer_type == officer_type) and (this_facility_id in facility_ids) - - facility_id, officer_type = split_officer_compound_string(_officer) - facility_level = self._facility_by_facility_id[int(facility_id)].level - facilities_of_same_level_in_all_district = [ - _fac.id for _fac in self._facilities_for_each_district[facility_level].values() - ] - - officers_in_the_same_level_in_all_districts = [ - _officer for _officer in current_capabilities.keys() if - _match(_officer, facility_ids=facilities_of_same_level_in_all_district, officer_type=officer_type) - ] - - return sum(current_capabilities.get(_o) for _o in officers_in_the_same_level_in_all_districts) - - # 1) Compute the load factors for each officer type at each facility that is - # called-upon in this list of HSIs - load_factor = {} - for officer, call in total_footprint.items(): - if compute_squeeze_factor_to_district_level: - availability = get_total_minutes_of_this_officer_in_this_district(officer) - else: - availability = get_total_minutes_of_this_officer_in_all_district(officer) - - # If officer does not exist in the relevant facility, log warning and proceed as if availability = 0 - if availability is None: - logger.warning( - key="message", - data=(f"Requested officer {officer} is not contemplated by health system. ") - ) - availability = 0 - - if availability == 0: - load_factor[officer] = float('inf') - else: - load_factor[officer] = max(call / availability - 1, 0.0) - - # 2) Convert these load-factors into an overall 'squeeze' signal for each HSI, - # based on the load-factor of the officer with the largest time requirement for that - # event (or zero if event has an empty footprint) - - # Instead of repeatedly creating lists for squeeze factors, we reuse a numpy array - # If the current store is too small, replace it - if len(footprints_per_event) > len(self._get_squeeze_factors_store): - # The new array size is a multiple of `grow` - new_size = math.ceil( - len(footprints_per_event) / self._get_squeeze_factors_store_grow - ) * self._get_squeeze_factors_store_grow - self._get_squeeze_factors_store = np.zeros(new_size) - - for i, footprint in enumerate(footprints_per_event): - if footprint: - # If any of the required officers are not available at the facility, set overall squeeze to inf - require_missing_officer = False - for officer in footprint: - if load_factor[officer] == float('inf'): - require_missing_officer = True - # No need to check the rest - break - - if require_missing_officer: - self._get_squeeze_factors_store[i] = np.inf - else: - self._get_squeeze_factors_store[i] = max(load_factor[footprint.most_common()[0][0]], 0.) - else: - self._get_squeeze_factors_store[i] = 0.0 - - return self._get_squeeze_factors_store - - def record_hsi_event(self, hsi_event, actual_appt_footprint=None, squeeze_factor=None, did_run=True, priority=None): + def record_hsi_event(self, hsi_event, actual_appt_footprint=None, squeeze_factor=0.0, did_run=True, priority=None): """ Record the processing of an HSI event. It will also record the actual appointment footprint. @@ -2043,115 +1911,57 @@ def run_individual_level_events_in_mode_0_or_1(self, # from argument to Counter object called from self.running_total_footprint.update(footprint) - # Estimate Squeeze-Factors for today - if self.mode_appt_constraints == 0: - # For Mode 0 (no Constraints), the squeeze factors are all zero. - squeeze_factor_per_hsi_event = np.zeros( - len(footprints_of_all_individual_level_hsi_event)) - else: - # For Other Modes, the squeeze factors must be computed - squeeze_factor_per_hsi_event = self.get_squeeze_factors( - footprints_per_event=footprints_of_all_individual_level_hsi_event, - total_footprint=self.running_total_footprint, - current_capabilities=self.capabilities_today, - compute_squeeze_factor_to_district_level=self.compute_squeeze_factor_to_district_level, - ) - for ev_num, event in enumerate(_list_of_individual_hsi_event_tuples): _priority = event.priority event = event.hsi_event - squeeze_factor = squeeze_factor_per_hsi_event[ev_num] # todo use zip here! # store appt_footprint before running _appt_footprint_before_running = event.EXPECTED_APPT_FOOTPRINT - # Mode 0: All HSI Event run, with no squeeze - # Mode 1: All HSI Events run with squeeze provided latter is not inf - ok_to_run = True - - if self.mode_appt_constraints == 1 and squeeze_factor == float('inf'): - ok_to_run = False - - if ok_to_run: - - # Compute the bed days that are allocated to this HSI and provide this information to the HSI - if sum(event.BEDDAYS_FOOTPRINT.values()): - event._received_info_about_bed_days = \ - self.bed_days.issue_bed_days_according_to_availability( - facility_id=self.bed_days.get_facility_id_for_beds(persons_id=event.target), - footprint=event.BEDDAYS_FOOTPRINT - ) - - # Check that a facility has been assigned to this HSI - assert event.facility_info is not None, \ - f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined." + # Compute the bed days that are allocated to this HSI and provide this information to the HSI + if sum(event.BEDDAYS_FOOTPRINT.values()): + event._received_info_about_bed_days = \ + self.bed_days.issue_bed_days_according_to_availability( + facility_id=self.bed_days.get_facility_id_for_beds(persons_id=event.target), + footprint=event.BEDDAYS_FOOTPRINT + ) - # Run the HSI event (allowing it to return an updated appt_footprint) - actual_appt_footprint = event.run(squeeze_factor=squeeze_factor) + # Check that a facility has been assigned to this HSI + assert event.facility_info is not None, \ + f"Cannot run HSI {event.TREATMENT_ID} without facility_info being defined." - # Check if the HSI event returned updated appt_footprint - if actual_appt_footprint is not None: - # The returned footprint is different to the expected footprint: so must update load factors + # Run the HSI event (allowing it to return an updated appt_footprint) + actual_appt_footprint = event.run(squeeze_factor=0.0) - # check its formatting: - assert self.appt_footprint_is_valid(actual_appt_footprint) + # Check if the HSI event returned updated appt_footprint + if actual_appt_footprint is not None: + # The returned footprint is different to the expected footprint: so must update load factors - # Update load factors: - updated_call = self.get_appt_footprint_as_time_request( - facility_info=event.facility_info, - appt_footprint=actual_appt_footprint - ) - original_call = footprints_of_all_individual_level_hsi_event[ev_num] - footprints_of_all_individual_level_hsi_event[ev_num] = updated_call - self.running_total_footprint -= original_call - self.running_total_footprint += updated_call - - # Don't recompute for mode=0 - if self.mode_appt_constraints != 0: - squeeze_factor_per_hsi_event = self.get_squeeze_factors( - footprints_per_event=footprints_of_all_individual_level_hsi_event, - total_footprint=self.running_total_footprint, - current_capabilities=self.capabilities_today, - compute_squeeze_factor_to_district_level=self. - compute_squeeze_factor_to_district_level, - ) + # check its formatting: + assert self.appt_footprint_is_valid(actual_appt_footprint) - else: - # no actual footprint is returned so take the expected initial declaration as the actual, - # as recorded before the HSI event run - actual_appt_footprint = _appt_footprint_before_running - - # Write to the log - self.record_hsi_event( - hsi_event=event, - actual_appt_footprint=actual_appt_footprint, - squeeze_factor=squeeze_factor, - did_run=True, - priority=_priority + # Update load factors: + updated_call = self.get_appt_footprint_as_time_request( + facility_info=event.facility_info, + appt_footprint=actual_appt_footprint ) + original_call = footprints_of_all_individual_level_hsi_event[ev_num] + footprints_of_all_individual_level_hsi_event[ev_num] = updated_call + self.running_total_footprint -= original_call + self.running_total_footprint += updated_call - # if not ok_to_run else: - # Do not run, - # Call did_not_run for the hsi_event - rtn_from_did_not_run = event.did_not_run() - - # If received no response from the call to did_not_run, or a True signal, then - # add to the hold-over queue. - # Otherwise (disease module returns "FALSE") the event is not rescheduled and will not run. - - if rtn_from_did_not_run is not False: - # reschedule event - hp.heappush(_to_be_held_over, _list_of_individual_hsi_event_tuples[ev_num]) - - # Log that the event did not run - self.record_hsi_event( - hsi_event=event, - actual_appt_footprint=event.EXPECTED_APPT_FOOTPRINT, - squeeze_factor=squeeze_factor, - did_run=False, - priority=_priority - ) + # no actual footprint is returned so take the expected initial declaration as the actual, + # as recorded before the HSI event run + actual_appt_footprint = _appt_footprint_before_running + + # Write to the log + self.record_hsi_event( + hsi_event=event, + actual_appt_footprint=actual_appt_footprint, + did_run=True, + priority=_priority + ) return _to_be_held_over @@ -2277,7 +2087,11 @@ def _get_events_due_today(self) -> List: return due_today - def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> None: + def process_events_mode_0_and_1(self) -> None: + # Run all events due today, repeating the check for due events until none are due + # (this allows for HSI that are added to the queue in the course of other HSI + # for this today to be run this day). + while True: # Get the events that are due today: list_of_individual_hsi_event_tuples_due_today = self._get_events_due_today() @@ -2296,10 +2110,9 @@ def process_events_mode_0_and_1(self, hold_over: List[HSIEventQueueItem]) -> Non list_of_individual_hsi_event_tuples_due_today_that_have_essential_equipment.append(item) # Try to run the list of individual-level events that have their essential equipment - _to_be_held_over = self.module.run_individual_level_events_in_mode_0_or_1( + self.module.run_individual_level_events_in_mode_0_or_1( list_of_individual_hsi_event_tuples_due_today_that_have_essential_equipment, ) - hold_over.extend(_to_be_held_over) def process_events_mode_2(self, hold_over: List[HSIEventQueueItem]) -> None: @@ -2604,10 +2417,7 @@ def apply(self, population): hold_over = list() if self.module.mode_appt_constraints in (0, 1): - # Run all events due today, repeating the check for due events until none are due - # (this allows for HSI that are added to the queue in the course of other HSI - # for this today to be run this day). - self.process_events_mode_0_and_1(hold_over) + self.process_events_mode_0_and_1() elif self.module.mode_appt_constraints == 2: self.process_events_mode_2(hold_over)