diff --git a/src/virtualship/cli/_run.py b/src/virtualship/cli/_run.py index f07fbab27..269d77e15 100644 --- a/src/virtualship/cli/_run.py +++ b/src/virtualship/cli/_run.py @@ -1,5 +1,6 @@ """do_expedition function.""" +import glob import logging import os import shutil @@ -14,11 +15,12 @@ ScheduleProblem, simulate_schedule, ) -from virtualship.models import Schedule -from virtualship.models.checkpoint import Checkpoint +from virtualship.make_realistic.problems.simulator import ProblemSimulator +from virtualship.models import Checkpoint, Schedule from virtualship.utils import ( CHECKPOINT, _get_expedition, + _save_checkpoint, expedition_cost, get_instrument_class, ) @@ -35,7 +37,10 @@ logging.getLogger("copernicusmarine").setLevel("ERROR") -def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: +# TODO: prob-level needs to be parsed from CLI args; currently set to 1 override for testing purposes +def _run( + expedition_dir: str | Path, from_data: Path | None = None, prob_level: int = 1 +) -> None: """ Perform an expedition, providing terminal feedback and file output. @@ -73,7 +78,7 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: expedition = _get_expedition(expedition_dir) - # Verify instruments_config file is consistent with schedule + # verify instruments_config file is consistent with schedule expedition.instruments_config.verify(expedition) # load last checkpoint @@ -82,7 +87,7 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: checkpoint = Checkpoint(past_schedule=Schedule(waypoints=[])) # verify that schedule and checkpoint match - checkpoint.verify(expedition.schedule) + checkpoint.verify(expedition.schedule, expedition_dir) print("\n---- WAYPOINT VERIFICATION ----") @@ -96,6 +101,8 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: projection=projection, expedition=expedition, ) + + # handle cases where user defined schedule is incompatible (i.e. not enough time between waypoints, not problems) if isinstance(schedule_results, ScheduleProblem): print( f"SIMULATION PAUSED: update your schedule (`virtualship plan`) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {expedition_dir.joinpath(CHECKPOINT)}." @@ -124,12 +131,29 @@ def _run(expedition_dir: str | Path, from_data: Path | None = None) -> None: print("\n--- MEASUREMENT SIMULATIONS ---") + # identify problems + # TODO: prob_level needs to be parsed from CLI args + problem_simulator = ProblemSimulator( + expedition.schedule, prob_level, expedition_dir + ) + problems = problem_simulator.select_problems() + # simulate measurements print("\nSimulating measurements. This may take a while...\n") + # TODO: logic for getting simulations to carry on from last checkpoint! Building on .zarr files already created... + instruments_in_expedition = expedition.get_instruments() - for itype in instruments_in_expedition: + for i, itype in enumerate(instruments_in_expedition): + # propagate problems (pre-departure problems will only occur in first iteration) + if problems: + problem_simulator.execute( + problems=problems, + pre_departure=True if i == 0 else False, + instrument_type=itype, + ) + # get instrument class instrument_class = get_instrument_class(itype) if instrument_class is None: @@ -174,9 +198,13 @@ def _load_checkpoint(expedition_dir: Path) -> Checkpoint | None: return None -def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: - file_path = expedition_dir.joinpath(CHECKPOINT) - checkpoint.to_yaml(file_path) +def _load_hashes(expedition_dir: Path) -> set[str]: + hashes_path = expedition_dir.joinpath("problems_encountered") + if not hashes_path.exists(): + return set() + hash_files = glob.glob(str(hashes_path / "problem_*.txt")) + hashes = {Path(f).stem.split("_")[1] for f in hash_files} + return hashes def _write_expedition_cost(expedition, schedule_results, expedition_dir): diff --git a/src/virtualship/expedition/simulate_schedule.py b/src/virtualship/expedition/simulate_schedule.py index e450fcc7c..c09ae7b56 100644 --- a/src/virtualship/expedition/simulate_schedule.py +++ b/src/virtualship/expedition/simulate_schedule.py @@ -4,7 +4,7 @@ from dataclasses import dataclass, field from datetime import datetime, timedelta -from typing import ClassVar +from typing import TYPE_CHECKING, ClassVar import pyproj @@ -21,6 +21,9 @@ Waypoint, ) +if TYPE_CHECKING: + pass + @dataclass class ScheduleOk: @@ -124,7 +127,8 @@ def simulate(self) -> ScheduleOk | ScheduleProblem: print( f"Waypoint {wp_i + 1} could not be reached in time. Current time: {self._time}. Waypoint time: {waypoint.time}." "\n\nHave you ensured that your schedule includes sufficient time for taking measurements, e.g. CTD casts (in addition to the time it takes to sail between waypoints)?\n" - "**Note**, the `virtualship plan` tool will not account for measurement times when verifying the schedule, only the time it takes to sail between waypoints.\n" + "**Hint #1**, the `virtualship plan` tool will not account for measurement times when verifying the schedule, only the time it takes to sail between waypoints.\n" + "**Hint #2**: if you previously encountered any unforeseen delays (e.g. equipment failure, pre-departure delays) during your expedition, you will need to adjust the timings of **all** waypoints after the affected waypoint, not just the next one." ) return ScheduleProblem(self._time, wp_i) else: diff --git a/src/virtualship/make_realistic/problems/scenarios.py b/src/virtualship/make_realistic/problems/scenarios.py new file mode 100644 index 000000000..97696a21e --- /dev/null +++ b/src/virtualship/make_realistic/problems/scenarios.py @@ -0,0 +1,340 @@ +from __future__ import annotations + +import abc +from dataclasses import dataclass +from datetime import timedelta +from typing import TYPE_CHECKING + +from virtualship.instruments.types import InstrumentType + +if TYPE_CHECKING: + from virtualship.models import Waypoint + + +# ===================================================== +# SECTION: Base Classes +# ===================================================== + + +@dataclass +class GeneralProblem(abc.ABC): + """ + Base class for general problems. + + Problems occur at each waypoint. + """ + + message: str + can_reoccur: bool + base_probability: float # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints) + delay_duration: timedelta + pre_departure: bool # True if problem occurs before expedition departure, False if during expedition + + @abc.abstractmethod + def is_valid() -> bool: + """Check if the problem can occur based on e.g. waypoint location and/or datetime etc.""" + ... + + +@dataclass +class InstrumentProblem(abc.ABC): + """Base class for instrument-specific problems.""" + + instrument_dataclass: type + message: str + can_reoccur: bool + base_probability: float # Probability is a function of time - the longer the expedition the more likely something is to go wrong (not a function of waypoints) + delay_duration: timedelta + pre_departure: bool # True if problem can occur before expedition departure, False if during expedition + + @abc.abstractmethod + def is_valid() -> bool: + """Check if the problem can occur based on e.g. waypoint location and/or datetime etc.""" + ... + + +# ===================================================== +# SECTION: General Problems +# ===================================================== + + +@dataclass +# @register_general_problem +class FoodDeliveryDelayed: + """Problem: Scheduled food delivery is delayed, causing a postponement of departure.""" + + message = ( + "The scheduled food delivery prior to departure has not arrived. Until the supply truck reaches the pier, " + "we cannot leave. Once it arrives, unloading and stowing the provisions in the ship’s cold storage " + "will also take additional time. These combined delays postpone departure by approximately 5 hours." + ) + can_reoccur = False + delay_duration = timedelta(hours=5.0) + base_probability = 0.1 + pre_departure = True + + +@dataclass +# @register_general_problem +class VenomousCentipedeOnboard(GeneralProblem): + """Problem: Venomous centipede discovered onboard in tropical waters.""" + + # TODO: this needs logic added to the is_valid() method to check if waypoint is in tropical waters + + message = ( + "A venomous centipede is discovered onboard while operating in tropical waters. " + "One crew member becomes ill after contact with the creature and receives medical attention, " + "prompting a full search of the vessel to ensure no further danger. " + "The medical response and search efforts cause an operational delay of about 2 hours." + ) + can_reoccur = False + delay_duration = timedelta(hours=2.0) + base_probability = 0.05 + pre_departure = False + + def is_valid(self, waypoint: Waypoint) -> bool: + """Check if the waypoint is in tropical waters.""" + lat_limit = 23.5 # [degrees] + return abs(waypoint.latitude) <= lat_limit + + +# @register_general_problem +class CaptainSafetyDrill(GeneralProblem): + """Problem: Sudden initiation of a mandatory safety drill.""" + + message = ( + "A miscommunication with the ship’s captain results in the sudden initiation of a mandatory safety drill. " + "The emergency vessel must be lowered and tested while the ship remains stationary, pausing all scientific " + "operations for the duration of the exercise. The drill introduces a delay of approximately 2 hours." + ) + can_reoccur: False + delay_duration: 2.0 + base_probability = 0.1 + pre_departure = False + + +@dataclass +class FuelDeliveryIssue: + message = ( + "The fuel tanker expected to deliver fuel has not arrived. Port authorities are unable to provide " + "a clear estimate for when the delivery might occur. You may choose to [w]ait for the tanker or [g]et a " + "harbor pilot to guide the vessel to an available bunker dock instead. This decision may need to be " + "revisited periodically depending on circumstances." + ) + can_reoccur: bool = False + delay_duration: float = 0.0 # dynamic delays based on repeated choices + + +@dataclass +class EngineOverheating: + message = ( + "One of the main engines has overheated. To prevent further damage, the engineering team orders a reduction " + "in vessel speed until the engine can be inspected and repaired in port. The ship will now operate at a " + "reduced cruising speed of 8.5 knots for the remainder of the transit." + ) + can_reoccur: bool = False + delay_duration: None = None # speed reduction affects ETA instead of fixed delay + ship_speed_knots: float = 8.5 + + +# @register_general_problem +class MarineMammalInDeploymentArea(GeneralProblem): + """Problem: Marine mammals observed in deployment area, causing delay.""" + + message = ( + "A pod of dolphins is observed swimming directly beneath the planned deployment area. " + "To avoid risk to wildlife and comply with environmental protocols, all in-water operations " + "must pause until the animals move away from the vicinity. This results in a delay of about 30 minutes." + ) + can_reoccur: bool = True + delay_duration: float = 0.5 + base_probability: float = 0.1 + + +# @register_general_problem +class BallastPumpFailure(GeneralProblem): + """Problem: Ballast pump failure during ballasting operations.""" + + message = ( + "One of the ship’s ballast pumps suddenly stops responding during routine ballasting operations. " + "Without the pump, the vessel cannot safely adjust trim or compensate for equipment movements on deck. " + "Engineering isolates the faulty pump and performs a rapid inspection. Temporary repairs allow limited " + "functionality, but the interruption causes a delay of approximately 1 hour." + ) + can_reoccur: bool = True + delay_duration: float = 1.0 + base_probability: float = 0.1 + + +# @register_general_problem +class ThrusterConverterFault(GeneralProblem): + """Problem: Bow thruster's power converter fault during station-keeping.""" + + message = ( + "The bow thruster's power converter reports a fault during station-keeping operations. " + "Dynamic positioning becomes less stable, forcing a temporary suspension of high-precision sampling. " + "Engineers troubleshoot the converter and perform a reset, resulting in a delay of around 1 hour." + ) + can_reoccur: bool = False + delay_duration: float = 1.0 + base_probability: float = 0.1 + + +# @register_general_problem +class AFrameHydraulicLeak(GeneralProblem): + """Problem: Hydraulic fluid leak from A-frame actuator.""" + + message = ( + "A crew member notices hydraulic fluid leaking from the A-frame actuator during equipment checks. " + "The leak must be isolated immediately to prevent environmental contamination or mechanical failure. " + "Engineering replaces a faulty hose and repressurizes the system. This repair causes a delay of about 2 hours." + ) + can_reoccur: bool = True + delay_duration: float = 2.0 + base_probability: float = 0.1 + + +# @register_general_problem +class CoolingWaterIntakeBlocked(GeneralProblem): + """Problem: Main engine's cooling water intake blocked.""" + + message = ( + "The main engine's cooling water intake alarms indicate reduced flow, likely caused by marine debris " + "or biological fouling. The vessel must temporarily slow down while engineering clears the obstruction " + "and flushes the intake. This results in a delay of approximately 1 hour." + ) + can_reoccur: bool = True + delay_duration: float = 1.0 + base_probability: float = 0.1 + + +# ===================================================== +# SECTION: Instrument-specific Problems +# ===================================================== + + +# @register_instrument_problem(InstrumentType.CTD) +class CTDCableJammed(InstrumentProblem): + """Problem: CTD cable jammed in winch drum, requiring replacement.""" + + message = ( + "During preparation for the next CTD cast, the CTD cable becomes jammed in the winch drum. " + "Attempts to free it are unsuccessful, and the crew determines that the entire cable must be " + "replaced before deployment can continue. This repair is time-consuming and results in a delay " + "of approximately 3 hours." + ) + can_reoccur = True + delay_duration = timedelta(hours=3.0) + base_probability = 0.1 + instrument_type = InstrumentType.CTD + + +# @register_instrument_problem(InstrumentType.ADCP) +class ADCPMalfunction(InstrumentProblem): + """Problem: ADCP returns invalid data, requiring inspection.""" + + message = ( + "The hull-mounted ADCP begins returning invalid velocity data. Engineering suspects damage to the cable " + "from recent maintenance activities. The ship must hold position while a technician enters the cable " + "compartment to perform an inspection and continuity test. This diagnostic procedure results in a delay " + "of around 1 hour." + ) + can_reoccur = True + delay_duration = timedelta(hours=1.0) + base_probability = 0.1 + instrument_type = InstrumentType.ADCP + + +# @register_instrument_problem(InstrumentType.CTD) +class CTDTemperatureSensorFailure(InstrumentProblem): + """Problem: CTD temperature sensor failure, requiring replacement.""" + + message = ( + "The primary temperature sensor on the CTD begins returning inconsistent readings. " + "Troubleshooting confirms that the sensor has malfunctioned. A spare unit can be installed, " + "but integrating and verifying the replacement will pause operations. " + "This procedure leads to an estimated delay of around 2 hours." + ) + can_reoccur: bool = True + delay_duration: float = 2.0 + base_probability: float = 0.1 + instrument_type = InstrumentType.CTD + + +# @register_instrument_problem(InstrumentType.CTD) +class CTDSalinitySensorFailureWithCalibration(InstrumentProblem): + """Problem: CTD salinity sensor failure, requiring replacement and calibration.""" + + message = ( + "The CTD’s primary salinity sensor fails and must be replaced with a backup. After installation, " + "a mandatory calibration cast to a minimum depth of 1000 meters is required to verify sensor accuracy. " + "Both the replacement and calibration activities result in a total delay of roughly 4 hours." + ) + can_reoccur: bool = True + delay_duration: float = 4.0 + base_probability: float = 0.1 + instrument_type = InstrumentType.CTD + + +# @register_instrument_problem(InstrumentType.CTD) +class WinchHydraulicPressureDrop(InstrumentProblem): + """Problem: CTD winch hydraulic pressure drop, requiring repair.""" + + message = ( + "The CTD winch begins to lose hydraulic pressure during routine checks prior to deployment. " + "The engineering crew must stop operations to diagnose the hydraulic pump and replenish or repair " + "the system. Until pressure is restored to operational levels, the winch cannot safely be used. " + "This results in an estimated delay of 1.5 hours." + ) + can_reoccur: bool = True + delay_duration: float = 1.5 + base_probability: float = 0.1 + instrument_type = InstrumentType.CTD + + +# @register_instrument_problem(InstrumentType.CTD) +class RosetteTriggerFailure(InstrumentProblem): + """Problem: CTD rosette trigger failure, requiring inspection.""" + + message = ( + "During a CTD cast, the rosette's bottle-triggering mechanism fails to actuate. " + "No discrete water samples can be collected during this cast. The rosette must be brought back " + "on deck for inspection and manual testing of the trigger system. This results in an operational " + "delay of approximately 2.5 hours." + ) + can_reoccur: bool = True + delay_duration: float = 2.5 + base_probability: float = 0.1 + instrument_type = InstrumentType.CTD + + +# @register_instrument_problem(InstrumentType.DRIFTER) +class DrifterSatelliteConnectionDelay(InstrumentProblem): + """Problem: Drifter fails to establish satellite connection before deployment.""" + + message = ( + "The drifter scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." + ) + can_reoccur: bool = True + delay_duration: float = 2.0 + base_probability: float = 0.1 + instrument_type = InstrumentType.DRIFTER + + +# @register_instrument_problem(InstrumentType.ARGO_FLOAT) +class ArgoSatelliteConnectionDelay(InstrumentProblem): + """Problem: Argo float fails to establish satellite connection before deployment.""" + + message = ( + "The Argo float scheduled for deployment fails to establish a satellite connection during " + "pre-launch checks. To improve signal acquisition, the float must be moved to a higher location on deck " + "with fewer obstructions. The team waits for the satellite fix to come through, resulting in a delay " + "of approximately 2 hours." + ) + can_reoccur: bool = True + delay_duration: float = 2.0 + base_probability: float = 0.1 + instrument_type = InstrumentType.ARGO_FLOAT diff --git a/src/virtualship/make_realistic/problems/simulator.py b/src/virtualship/make_realistic/problems/simulator.py new file mode 100644 index 000000000..c10e9d909 --- /dev/null +++ b/src/virtualship/make_realistic/problems/simulator.py @@ -0,0 +1,255 @@ +from __future__ import annotations + +import hashlib +import os +import time +from pathlib import Path +from typing import TYPE_CHECKING + +import numpy as np +from yaspin import yaspin + +from virtualship.instruments.types import InstrumentType +from virtualship.make_realistic.problems.scenarios import ( + CTDCableJammed, + FoodDeliveryDelayed, +) +from virtualship.models.checkpoint import Checkpoint +from virtualship.utils import CHECKPOINT, _save_checkpoint + +if TYPE_CHECKING: + from virtualship.make_realistic.problems.scenarios import ( + GeneralProblem, + InstrumentProblem, + ) + from virtualship.models import Schedule +import json + +LOG_MESSAGING = { + "first_pre_departure": "Hang on! There could be a pre-departure problem in-port...", + "subsequent_pre_departure": "Oh no, another pre-departure problem has occurred...!\n", + "first_during_expedition": "Oh no, a problem has occurred during at waypoint {waypoint_i}...!\n", + "subsequent_during_expedition": "Another problem has occurred during the expedition... at waypoint {waypoint_i}!\n", + "simulation_paused": "SIMULATION PAUSED: update your schedule (`virtualship plan`) and continue the expedition by executing the `virtualship run` command again.\nCheckpoint has been saved to {checkpoint_path}.\n", + "problem_avoided": "Phew! You had enough contingency time scheduled to avoid delays from this problem. The expedition can carry on.\n", + "pre_departure_delay": "This problem will cause a delay of **{delay_duration} hours** to the expedition schedule. \n\nPlease account for this for **ALL** waypoints in your schedule (`virtualship plan`), then continue the expedition by executing the `virtualship run` command again.\n", +} + + +class ProblemSimulator: + """Handle problem simulation during expedition.""" + + def __init__(self, schedule: Schedule, prob_level: int, expedition_dir: str | Path): + """Initialise ProblemSimulator with a schedule and probability level.""" + self.schedule = schedule + self.prob_level = prob_level + self.expedition_dir = Path(expedition_dir) + + def select_problems( + self, + ) -> dict[str, list[GeneralProblem | InstrumentProblem]] | None: + """Propagate both general and instrument problems.""" + # TODO: whether a problem can reoccur or not needs to be handled here too! + probability = self._calc_prob() + probability = 1.0 # TODO: temporary override for testing!! + if probability > 0.0: + problems = {} + problems["general"] = self._general_problem_select(probability) + problems["instrument"] = self._instrument_problem_select(probability) + return problems + else: + return None + + def execute( + self, + problems: dict[str, list[GeneralProblem | InstrumentProblem]], + pre_departure: bool, + instrument_type: InstrumentType | None = None, + log_delay: float = 7.0, + ): + """Execute the selected problems, returning messaging and delay times.""" + # TODO: integration with which zarr files have been written so far? + # TODO: logic to determine whether user has made the necessary changes to the schedule to account for the problem's delay_duration when next running the simulation... (does this come in here or _run?) + # TODO: logic for whether the user has already scheduled in enough contingency time to account for the problem's delay_duration, and they get a well done message if so + # TODO: need logic for if the problem can reoccur or not / and or that it has already occurred and has been addressed + + #! TODO: logic as well for case where problem can reoccur but it can only reoccur at a waypoint different to the one it has already occurred at + + # TODO: make the log output stand out more visually + + # general problems + for i, gproblem in enumerate(problems["general"]): + # determine failed waypoint index (random if during expedition) + failed_waypoint_i = ( + np.nan + if pre_departure + else np.random.randint(0, len(self.schedule.waypoints) - 1) + ) + + # TODO: some kind of handling for deleting directory if ... simulation encounters error? or just leave it to user to delete manually if they want to restart from scratch? + # mark problem by unique hash and log to json, use to assess whether problem has already occurred + gproblem_hash = self._make_hash( + gproblem.message + str(failed_waypoint_i), 8 + ) + hash_path = Path( + self.expedition_dir + / f"problems_encountered/problem_{gproblem_hash}.json" + ) + if hash_path.exists(): + continue # problem * waypoint combination has already occurred + else: + self._hash_to_json( + gproblem, gproblem_hash, failed_waypoint_i, hash_path + ) + + if pre_departure and gproblem.pre_departure: + alert_msg = ( + LOG_MESSAGING["first_pre_departure"] + if i == 0 + else LOG_MESSAGING["subsequent_pre_departure"] + ) + + elif not pre_departure and not gproblem.pre_departure: + alert_msg = ( + LOG_MESSAGING["first_during_expedition"].format( + waypoint_i=gproblem.waypoint_i + ) + if i == 0 + else LOG_MESSAGING["subsequent_during_expedition"].format( + waypoint_i=gproblem.waypoint_i + ) + ) + + else: + continue # problem does not occur (e.g. wrong combination of pre-departure vs. problem can only occur during expedition) + + # log problem occurrence, save to checkpoint, and pause simulation + self._log_problem(gproblem, failed_waypoint_i, alert_msg, log_delay) + + # instrument problems + for i, problem in enumerate(problems["instrument"]): + pass # TODO: implement!! + # TODO: similar logic to above for instrument-specific problems... or combine? + + def _propagate_general_problems(self): + """Propagate general problems based on probability.""" + probability = self._calc_general_prob(self.schedule, prob_level=self.prob_level) + return self._general_problem_select(probability) + + def _propagate_instrument_problems(self): + """Propagate instrument problems based on probability.""" + probability = self._calc_instrument_prob( + self.schedule, prob_level=self.prob_level + ) + return self._instrument_problem_select(probability) + + def _calc_prob(self) -> float: + """ + Calcuates probability of a general problem as function of expedition duration and prob-level. + + TODO: for now, general and instrument-specific problems have the same probability of occurence. Separating this out and allowing their probabilities to be set independently may be useful in future. + """ + if self.prob_level == 0: + return 0.0 + + def _general_problem_select(self, probability) -> list[GeneralProblem]: + """Select which problems. Higher probability (tied to expedition duration) means more problems are likely to occur.""" + return [FoodDeliveryDelayed] # TODO: temporary placeholder!! + + def _instrument_problem_select(self, probability) -> list[InstrumentProblem]: + """Select which problems. Higher probability (tied to expedition duration) means more problems are likely to occur.""" + # set: waypoint instruments vs. list of instrument-specific problems (automated registry) + # will deterimne which instrument-specific problems are possible at this waypoint + + # wp_instruments = self.schedule.waypoints.instruments + + return [CTDCableJammed] + + def _log_problem( + self, + problem: GeneralProblem | InstrumentProblem, + failed_waypoint_i: int | float, + alert_msg: str, + log_delay: float, + ): + """Log problem occurrence with spinner and delay, save to checkpoint, write hash.""" + time.sleep(3.0) # brief pause before spinner + with yaspin(text=alert_msg) as spinner: + time.sleep(log_delay) + spinner.ok("💥 ") + + print("\nPROBLEM ENCOUNTERED: " + problem.message) + + if np.isnan(failed_waypoint_i): # pre-departure problem + print( + "\nRESULT: " + + LOG_MESSAGING["pre_departure_delay"].format( + delay_duration=problem.delay_duration.total_seconds() / 3600.0 + ) + ) + else: # problem occurring during expedition + print( + "\nRESULT: " + + LOG_MESSAGING["simulation_paused"].format( + checkpoint_path=self.expedition_dir.joinpath(CHECKPOINT) + ) + ) + # check if enough contingency time has been scheduled to avoid delay + print("\nAssessing impact on expedition schedule...\n") + failed_waypoint_time = self.schedule.waypoints[failed_waypoint_i].time + previous_waypoint_time = self.schedule.waypoints[failed_waypoint_i - 1].time + time_diff = ( + failed_waypoint_time - previous_waypoint_time + ).total_seconds() / 3600.0 # in hours + if time_diff >= problem.delay_duration.total_seconds() / 3600.0: + print(LOG_MESSAGING["problem_avoided"]) + return + else: + print( + f"\nNot enough contingency time scheduled to avoid delay of {problem.delay_duration.total_seconds() / 3600.0} hours.\n" + ) + + checkpoint = self._make_checkpoint(failed_waypoint_i) + _save_checkpoint(checkpoint, self.expedition_dir) + + return + + def _make_checkpoint(self, failed_waypoint_i: int | float = np.nan): + """Make checkpoint, also handling pre-departure.""" + if np.isnan(failed_waypoint_i): # handles pre-departure problems + checkpoint = Checkpoint( + past_schedule=self.schedule + ) # TODO: and then when it comes to verify checkpoint later, can determine whether the changes have been made to the schedule accordingly? + else: + checkpoint = Checkpoint( + past_schedule=Schedule( + waypoints=self.schedule.waypoints[:failed_waypoint_i] + ) + ) + return checkpoint + + def _make_hash(self, s: str, length: int) -> str: + """Make unique hash for problem occurrence.""" + assert length % 2 == 0, "Length must be even." + half_length = length // 2 + return hashlib.shake_128(s.encode("utf-8")).hexdigest(half_length) + + def _hash_to_json( + self, + problem: InstrumentProblem | GeneralProblem, + problem_hash: str, + failed_waypoint_i: int | float, + hash_path: Path, + ) -> dict: + """Convert problem details + hash to json.""" + os.makedirs(self.expedition_dir / "problems_encountered", exist_ok=True) + hash_data = { + "problem_hash": problem_hash, + "message": problem.message, + "failed_waypoint_i": failed_waypoint_i, + "delay_duration_hours": problem.delay_duration.total_seconds() / 3600.0, + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()), + "resolved": False, + } + with open(hash_path, "w") as f: + json.dump(hash_data, f, indent=4) diff --git a/src/virtualship/models/__init__.py b/src/virtualship/models/__init__.py index d61c17194..7a106ba60 100644 --- a/src/virtualship/models/__init__.py +++ b/src/virtualship/models/__init__.py @@ -1,5 +1,6 @@ """Pydantic models and data classes used to configure virtualship (i.e., in the configuration files or settings).""" +from .checkpoint import Checkpoint from .expedition import ( ADCPConfig, ArgoFloatConfig, @@ -34,4 +35,5 @@ "Spacetime", "Expedition", "InstrumentsConfig", + "Checkpoint", ] diff --git a/src/virtualship/models/checkpoint.py b/src/virtualship/models/checkpoint.py index 98fe1ae0a..1a734ba74 100644 --- a/src/virtualship/models/checkpoint.py +++ b/src/virtualship/models/checkpoint.py @@ -2,14 +2,17 @@ from __future__ import annotations +import json +from datetime import timedelta from pathlib import Path +import numpy as np import pydantic import yaml from virtualship.errors import CheckpointError from virtualship.instruments.types import InstrumentType -from virtualship.models import Schedule +from virtualship.models.expedition import Schedule class _YamlDumper(yaml.SafeDumper): @@ -51,20 +54,8 @@ def from_yaml(cls, file_path: str | Path) -> Checkpoint: data = yaml.safe_load(file) return Checkpoint(**data) - def verify(self, schedule: Schedule) -> None: - """ - Verify that the given schedule matches the checkpoint's past schedule. - - This method checks if the waypoints in the given schedule match the waypoints - in the checkpoint's past schedule up to the length of the past schedule. - If there's a mismatch, it raises a CheckpointError. - - :param schedule: The schedule to verify against the checkpoint. - :type schedule: Schedule - :raises CheckpointError: If the past waypoints in the given schedule - have been changed compared to the checkpoint. - :return: None - """ + def verify(self, schedule: Schedule, expedition_dir: Path) -> None: + """Verify that the given schedule matches the checkpoint's past schedule, and that problems have been resolved.""" if ( not schedule.waypoints[: len(self.past_schedule.waypoints)] == self.past_schedule.waypoints @@ -72,3 +63,56 @@ def verify(self, schedule: Schedule) -> None: raise CheckpointError( "Past waypoints in schedule have been changed! Restore past schedule and only change future waypoints." ) + + # TODO: how does this handle pre-departure problems that caused delays? Old schedule will be a complete mismatch then. + + # check that problems have been resolved in the new schedule + hash_fpaths = [ + str(path.resolve()) + for path in Path(expedition_dir, "problems_encountered").glob( + "problem_*.json" + ) + ] + + for file in hash_fpaths: + with open(file) as f: + problem = json.load(f) + if problem["resolved"]: + continue + elif not problem["resolved"]: + # check if delay has been accounted for in the schedule + delay_duration = timedelta( + hours=float(problem["delay_duration_hours"]) + ) # delay associated with the problem + + failed_waypoint_i = ( + int(problem["failed_waypoint_i"]) + if type(problem["failed_waypoint_i"]) is int + else np.nan + ) + + time_deltas = [ + schedule.waypoints[i].time + - self.past_schedule.waypoints[i].time + for i in range( + failed_waypoint_i, len(self.past_schedule.waypoints) + ) + ] # difference in time between the two schedules from the failed waypoint onwards + + if all(td >= delay_duration for td in time_deltas): + print( + "\n\nPrevious problem has been resolved in the schedule.\n" + ) + + # save back to json file changing the resolved status to True + problem["resolved"] = True + with open(file, "w") as f_out: + json.dump(problem, f_out, indent=4) + + else: + raise CheckpointError( + "The problem encountered in previous simulation has not been resolved in the schedule! Please adjust the schedule to account for delays caused by problem.", + f"The problem was associated with a delay duration of {problem['delay_duration_hours']} hours starting from waypoint {failed_waypoint_i + 1}.", + ) + + break # only handle the first unresolved problem found; others will be handled in subsequent runs but are not yet known to the user diff --git a/src/virtualship/utils.py b/src/virtualship/utils.py index b1926dc65..f0514e938 100644 --- a/src/virtualship/utils.py +++ b/src/virtualship/utils.py @@ -18,9 +18,11 @@ from virtualship.errors import CopernicusCatalogueError if TYPE_CHECKING: - from virtualship.expedition.simulate_schedule import ScheduleOk + from virtualship.expedition.simulate_schedule import ( + ScheduleOk, + ) from virtualship.models import Expedition - + from virtualship.models.checkpoint import Checkpoint import pandas as pd import yaml @@ -272,6 +274,27 @@ def add_dummy_UV(fieldset: FieldSet): ) from None +# problems inventory registry and registration utilities +INSTRUMENT_PROBLEM_MAP = [] +GENERAL_PROBLEM_REG = [] + + +def register_instrument_problem(instrument_type): + def decorator(cls): + INSTRUMENT_PROBLEM_MAP[instrument_type] = cls + return cls + + return decorator + + +def register_general_problem(): + def decorator(cls): + GENERAL_PROBLEM_REG.append(cls) + return cls + + return decorator + + # Copernicus Marine product IDs PRODUCT_IDS = { @@ -553,3 +576,8 @@ def _get_waypoint_latlons(waypoints): strict=True, ) return wp_lats, wp_lons + + +def _save_checkpoint(checkpoint: Checkpoint, expedition_dir: Path) -> None: + file_path = expedition_dir.joinpath(CHECKPOINT) + checkpoint.to_yaml(file_path)