Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 82 additions & 57 deletions appdaemon/app_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from pydantic import ValidationError


from appdaemon.dependency import DependencyResolutionFail, get_full_module_name
from appdaemon.dependency import DependencyResolutionFail, find_all_dependents, get_full_module_name
from appdaemon.dependency_manager import DependencyManager
from appdaemon.models.config import AllAppConfig, AppConfig, GlobalModule
from appdaemon.models.config.app import SequenceConfig
Expand Down Expand Up @@ -338,7 +338,7 @@ async def initialize_app(self, app_name: str):
else:
await utils.run_in_executor(self, init_func)

async def terminate_app(self, app_name: str, delete: bool = True) -> bool:
async def terminate_app(self, app_name: str, *, delete: bool = True) -> bool:
try:
if (obj := self.objects.get(app_name)) and (terminate := getattr(obj.object, "terminate", None)):
self.logger.info("Calling terminate() for '%s'", app_name)
Expand Down Expand Up @@ -367,11 +367,9 @@ async def terminate_app(self, app_name: str, delete: bool = True) -> bool:

finally:
self.logger.debug("Cleaning up app '%s'", app_name)
if obj := self.objects.get(app_name):
if delete:
del self.objects[app_name]
else:
obj.running = False
obj = self.objects.pop(app_name, None) if delete else self.objects.get(app_name)
if obj is not None:
obj.running = False

await self.increase_inactive_apps(app_name)

Expand Down Expand Up @@ -434,13 +432,8 @@ async def start_app(self, app_name: str):
case AppConfig():
# There is a valid app configuration for this dependency
match self.objects.get(dep_name):
case ManagedObject(type="app") as obj:
# There is an app being managed that matches the dependency
if not obj.running:
# But it's not running, so raise an exception
raise ade.DependencyNotRunning(*exc_args)
case _:
# TODO: make this a different exception
case ManagedObject(type="app", running=False):
# There is an app being managed that matches the dependency and isn't running
raise ade.DependencyNotRunning(*exc_args)
case GlobalModule() as dep_cfg:
# The dependency is a legacy global module
Expand Down Expand Up @@ -472,7 +465,7 @@ async def start_app(self, app_name: str):
}
await self.AD.events.process_event("admin", event_data)

async def stop_app(self, app_name: str, delete: bool = False) -> bool:
async def stop_app(self, app_name: str, *, delete: bool = False) -> bool:
"""Stops the app

Returns:
Expand All @@ -481,7 +474,7 @@ async def stop_app(self, app_name: str, delete: bool = False) -> bool:
try:
if isinstance(self.app_config[app_name], AppConfig):
self.logger.debug("Stopping app '%s'", app_name)
await self.terminate_app(app_name, delete)
await self.terminate_app(app_name, delete=delete)
except Exception:
error_logger = logging.getLogger(f"Error.{app_name}")
error_logger.warning("-" * 60)
Expand Down Expand Up @@ -1131,70 +1124,102 @@ async def _start_plugin_apps(self, plugin_ns: str | None, update_actions: Update
update_actions.apps.init |= deps

async def _stop_apps(self, update_actions: UpdateActions):
"""Terminate apps. Returns the set of app names that failed to properly terminate.
"""Terminate the apps from the update actions, including any dependent ones.

Part of self.check_app_updates sequence
"""
stop_order = update_actions.apps.term_sort(self.dependency_manager)
# stop_order = update_actions.apps.term_sort(self.app_config.depedency_graph())
indirect_stops = set(stop_order) - update_actions.apps.term_set
if stop_order:
self.logger.info("Stopping apps: %s", update_actions.apps.term_set)
self.logger.debug("App stop order: %s", stop_order)
self.logger.info("Stopping apps: %s", stop_order)
if indirect_stops:
self.logger.debug("Dependent apps: %s", indirect_stops)

failed_to_stop = set() # stores apps that had a problem terminating
for app_name in stop_order:
if not await self.stop_app(app_name):
failed_to_stop.add(app_name)
else:
successfully_stopped = await self.stop_app(app_name)
if successfully_stopped:
self.logger.info("Stopped app '%s'", app_name)
if app_name in indirect_stops:
update_actions.apps.init.add(app_name)
else:
failed_to_stop.add(app_name)

if failed_to_stop:
self.logger.debug("Removing %s apps because they failed to stop cleanly", len(failed_to_stop))
update_actions.apps.init -= failed_to_stop
update_actions.apps.reload -= failed_to_stop

def _filter_running_apps(self, *trackers: Iterable[str]) -> Iterable[Iterable[str]]:
"""App names that get added to the start order indirectly may already be running."""
for app_name in copy(trackers[0]):
match self.objects.get(app_name):
case ManagedObject(running=True):
self.logger.debug("Dependent app '%s' is already running", app_name)
for tracker in trackers:
match tracker:
case set():
tracker.discard(app_name)
case list():
tracker.remove(app_name)
case dict():
tracker.pop(app_name, None)
return trackers

async def _create_and_start_apps(self, update_actions: UpdateActions) -> None:
"""Creates and starts apps that are in the init set of the update actions."""
if failed := update_actions.apps.failed:
self.logger.warning("Failed to start apps: %s", failed)

start_order = update_actions.apps.start_sort(self.dependency_manager, self.logger)
if start_order:
self.logger.info("Starting apps: %s", update_actions.apps.init_set)
self.logger.debug("App start order: %s", start_order)

for app_name in start_order:
match self.app_config.root.get(app_name):
case AppConfig() as cfg if not cfg.disable:
if await self.create_app_object(app_name) is None:
update_actions.apps.failed.add(app_name)
case GlobalModule():
# Global modules are not started, they are just imported
self.logger.debug(f"Skipping global module '{app_name}'")
case None:
self.logger.warning(f"App '{app_name}' not found in app config")
indirect_starts = set(start_order) - update_actions.apps.init_set
self._filter_running_apps(indirect_starts, start_order)

# Need to have already created the ManagedObjects for the threads to get assigned
await self.AD.threading.calculate_pin_threads()
if not start_order:
return

# Need to recalculate start order to account for any failed object creations
start_order = update_actions.apps.start_sort(self.dependency_manager, self.logger)
if start_order:
for app_name in start_order:
match self.app_config.root.get(app_name):
case GlobalModule() as global_module:
assert global_module.module_name in sys.modules, f"{global_module.module_name} not in sys.modules"
case AppConfig() as cfg:
@ade.wrap_async(self.error, self.AD.app_dir, f"Failed to start '{app_name}'")
async def safe_start(self: "AppManagement"):
try:
await self.start_app(app_name)
except Exception as exc:
update_actions.apps.failed.add(app_name)
raise ade.AppStartFailure(app_name) from exc

if await self.get_state(app_name) != "compile_error":
await safe_start(self)
self.logger.info("Starting apps: %s", start_order)
if indirect_starts:
self.logger.debug("Dependents: %s", indirect_starts)

for app_name in start_order.copy():
match self.app_config.root.get(app_name):
case AppConfig(disable=False):
if await self.create_app_object(app_name) is None:
update_actions.apps.failed.add(app_name)
start_order.remove(app_name)
case GlobalModule():
# Global modules are not started, they are just imported
self.logger.debug(f"Skipping global module '{app_name}'")
case None:
self.logger.warning(f"App '{app_name}' not found in app config")

# Need to have already created the ManagedObjects for the threads to get assigned
await self.AD.threading.calculate_pin_threads()

# Account for failures and apps that depend on them
failed = update_actions.apps.failed
failed_deps = find_all_dependents(update_actions.apps.failed, self.dependency_manager.app_deps.rev_graph)
prevented_apps = [a for a in start_order if a in failed_deps and a not in failed]
if prevented_apps:
self.logger.warning("Failures of other apps prevented these apps from starting: %s", prevented_apps)
start_order = [a for a in start_order if a not in (failed | failed_deps)]

for app_name in start_order:
match self.app_config.root.get(app_name):
case GlobalModule(module_name=str(mod_name)):
assert mod_name in sys.modules, f"{mod_name} not in sys.modules"
case AppConfig():
@ade.wrap_async(self.error, self.AD.app_dir, f"Failed to start '{app_name}'")
async def safe_start(self: "AppManagement"):
try:
await self.start_app(app_name)
except Exception as exc:
update_actions.apps.failed.add(app_name)
raise ade.AppStartFailure(app_name) from exc

if await self.get_state(app_name) != "compile_error":
await safe_start(self)

async def _import_modules(self, update_actions: UpdateActions) -> set[str]:
"""Calls ``self.import_module`` for each module in the list
Expand Down
2 changes: 1 addition & 1 deletion appdaemon/models/internal/app_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def start_sort(self, dm: DependencyManager, logger: Logger | None = None) -> lis
def term_set(self) -> set[str]:
return self.reload | self.term

def term_sort(self, dm: DependencyManager):
def term_sort(self, dm: DependencyManager) -> list[str]:
"""Finds all the apps that need to be terminated.

Uses a dependency graph to sort the internal ``reload`` and ``term`` sets together
Expand Down
Loading