diff --git a/appdaemon/app_management.py b/appdaemon/app_management.py index f3ee7ef0b..11d7a745f 100644 --- a/appdaemon/app_management.py +++ b/appdaemon/app_management.py @@ -22,7 +22,7 @@ from pydantic import ValidationError -from appdaemon.dependency import DependencyResolutionFail, get_full_module_name +from appdaemon.dependency import DependencyResolutionFail, find_all_dependents, get_full_module_name from appdaemon.dependency_manager import DependencyManager from appdaemon.models.config import AllAppConfig, AppConfig, GlobalModule from appdaemon.models.config.app import SequenceConfig @@ -338,7 +338,7 @@ async def initialize_app(self, app_name: str): else: await utils.run_in_executor(self, init_func) - async def terminate_app(self, app_name: str, delete: bool = True) -> bool: + async def terminate_app(self, app_name: str, *, delete: bool = True) -> bool: try: if (obj := self.objects.get(app_name)) and (terminate := getattr(obj.object, "terminate", None)): self.logger.info("Calling terminate() for '%s'", app_name) @@ -367,11 +367,9 @@ async def terminate_app(self, app_name: str, delete: bool = True) -> bool: finally: self.logger.debug("Cleaning up app '%s'", app_name) - if obj := self.objects.get(app_name): - if delete: - del self.objects[app_name] - else: - obj.running = False + obj = self.objects.pop(app_name, None) if delete else self.objects.get(app_name) + if obj is not None: + obj.running = False await self.increase_inactive_apps(app_name) @@ -434,13 +432,8 @@ async def start_app(self, app_name: str): case AppConfig(): # There is a valid app configuration for this dependency match self.objects.get(dep_name): - case ManagedObject(type="app") as obj: - # There is an app being managed that matches the dependency - if not obj.running: - # But it's not running, so raise an exception - raise ade.DependencyNotRunning(*exc_args) - case _: - # TODO: make this a different exception + case ManagedObject(type="app", running=False): + # There is an app being managed that matches the dependency and isn't running raise ade.DependencyNotRunning(*exc_args) case GlobalModule() as dep_cfg: # The dependency is a legacy global module @@ -472,7 +465,7 @@ async def start_app(self, app_name: str): } await self.AD.events.process_event("admin", event_data) - async def stop_app(self, app_name: str, delete: bool = False) -> bool: + async def stop_app(self, app_name: str, *, delete: bool = False) -> bool: """Stops the app Returns: @@ -481,7 +474,7 @@ async def stop_app(self, app_name: str, delete: bool = False) -> bool: try: if isinstance(self.app_config[app_name], AppConfig): self.logger.debug("Stopping app '%s'", app_name) - await self.terminate_app(app_name, delete) + await self.terminate_app(app_name, delete=delete) except Exception: error_logger = logging.getLogger(f"Error.{app_name}") error_logger.warning("-" * 60) @@ -1131,70 +1124,102 @@ async def _start_plugin_apps(self, plugin_ns: str | None, update_actions: Update update_actions.apps.init |= deps async def _stop_apps(self, update_actions: UpdateActions): - """Terminate apps. Returns the set of app names that failed to properly terminate. + """Terminate the apps from the update actions, including any dependent ones. Part of self.check_app_updates sequence """ stop_order = update_actions.apps.term_sort(self.dependency_manager) - # stop_order = update_actions.apps.term_sort(self.app_config.depedency_graph()) + indirect_stops = set(stop_order) - update_actions.apps.term_set if stop_order: - self.logger.info("Stopping apps: %s", update_actions.apps.term_set) - self.logger.debug("App stop order: %s", stop_order) + self.logger.info("Stopping apps: %s", stop_order) + if indirect_stops: + self.logger.debug("Dependent apps: %s", indirect_stops) failed_to_stop = set() # stores apps that had a problem terminating for app_name in stop_order: - if not await self.stop_app(app_name): - failed_to_stop.add(app_name) - else: + successfully_stopped = await self.stop_app(app_name) + if successfully_stopped: self.logger.info("Stopped app '%s'", app_name) + if app_name in indirect_stops: + update_actions.apps.init.add(app_name) + else: + failed_to_stop.add(app_name) if failed_to_stop: self.logger.debug("Removing %s apps because they failed to stop cleanly", len(failed_to_stop)) update_actions.apps.init -= failed_to_stop update_actions.apps.reload -= failed_to_stop + def _filter_running_apps(self, *trackers: Iterable[str]) -> Iterable[Iterable[str]]: + """App names that get added to the start order indirectly may already be running.""" + for app_name in copy(trackers[0]): + match self.objects.get(app_name): + case ManagedObject(running=True): + self.logger.debug("Dependent app '%s' is already running", app_name) + for tracker in trackers: + match tracker: + case set(): + tracker.discard(app_name) + case list(): + tracker.remove(app_name) + case dict(): + tracker.pop(app_name, None) + return trackers + async def _create_and_start_apps(self, update_actions: UpdateActions) -> None: """Creates and starts apps that are in the init set of the update actions.""" if failed := update_actions.apps.failed: self.logger.warning("Failed to start apps: %s", failed) start_order = update_actions.apps.start_sort(self.dependency_manager, self.logger) - if start_order: - self.logger.info("Starting apps: %s", update_actions.apps.init_set) - self.logger.debug("App start order: %s", start_order) - - for app_name in start_order: - match self.app_config.root.get(app_name): - case AppConfig() as cfg if not cfg.disable: - if await self.create_app_object(app_name) is None: - update_actions.apps.failed.add(app_name) - case GlobalModule(): - # Global modules are not started, they are just imported - self.logger.debug(f"Skipping global module '{app_name}'") - case None: - self.logger.warning(f"App '{app_name}' not found in app config") + indirect_starts = set(start_order) - update_actions.apps.init_set + self._filter_running_apps(indirect_starts, start_order) - # Need to have already created the ManagedObjects for the threads to get assigned - await self.AD.threading.calculate_pin_threads() + if not start_order: + return - # Need to recalculate start order to account for any failed object creations - start_order = update_actions.apps.start_sort(self.dependency_manager, self.logger) - if start_order: - for app_name in start_order: - match self.app_config.root.get(app_name): - case GlobalModule() as global_module: - assert global_module.module_name in sys.modules, f"{global_module.module_name} not in sys.modules" - case AppConfig() as cfg: - @ade.wrap_async(self.error, self.AD.app_dir, f"Failed to start '{app_name}'") - async def safe_start(self: "AppManagement"): - try: - await self.start_app(app_name) - except Exception as exc: - update_actions.apps.failed.add(app_name) - raise ade.AppStartFailure(app_name) from exc - - if await self.get_state(app_name) != "compile_error": - await safe_start(self) + self.logger.info("Starting apps: %s", start_order) + if indirect_starts: + self.logger.debug("Dependents: %s", indirect_starts) + + for app_name in start_order.copy(): + match self.app_config.root.get(app_name): + case AppConfig(disable=False): + if await self.create_app_object(app_name) is None: + update_actions.apps.failed.add(app_name) + start_order.remove(app_name) + case GlobalModule(): + # Global modules are not started, they are just imported + self.logger.debug(f"Skipping global module '{app_name}'") + case None: + self.logger.warning(f"App '{app_name}' not found in app config") + + # Need to have already created the ManagedObjects for the threads to get assigned + await self.AD.threading.calculate_pin_threads() + + # Account for failures and apps that depend on them + failed = update_actions.apps.failed + failed_deps = find_all_dependents(update_actions.apps.failed, self.dependency_manager.app_deps.rev_graph) + prevented_apps = [a for a in start_order if a in failed_deps and a not in failed] + if prevented_apps: + self.logger.warning("Failures of other apps prevented these apps from starting: %s", prevented_apps) + start_order = [a for a in start_order if a not in (failed | failed_deps)] + + for app_name in start_order: + match self.app_config.root.get(app_name): + case GlobalModule(module_name=str(mod_name)): + assert mod_name in sys.modules, f"{mod_name} not in sys.modules" + case AppConfig(): + @ade.wrap_async(self.error, self.AD.app_dir, f"Failed to start '{app_name}'") + async def safe_start(self: "AppManagement"): + try: + await self.start_app(app_name) + except Exception as exc: + update_actions.apps.failed.add(app_name) + raise ade.AppStartFailure(app_name) from exc + + if await self.get_state(app_name) != "compile_error": + await safe_start(self) async def _import_modules(self, update_actions: UpdateActions) -> set[str]: """Calls ``self.import_module`` for each module in the list diff --git a/appdaemon/models/internal/app_management.py b/appdaemon/models/internal/app_management.py index 03087b16e..c2918b880 100644 --- a/appdaemon/models/internal/app_management.py +++ b/appdaemon/models/internal/app_management.py @@ -107,7 +107,7 @@ def start_sort(self, dm: DependencyManager, logger: Logger | None = None) -> lis def term_set(self) -> set[str]: return self.reload | self.term - def term_sort(self, dm: DependencyManager): + def term_sort(self, dm: DependencyManager) -> list[str]: """Finds all the apps that need to be terminated. Uses a dependency graph to sort the internal ``reload`` and ``term`` sets together