Skip to content

Commit a382f60

Browse files
committed
feat(startup): add dependency-aware parallel startup and shutdown-safe waits
- run service startup in parallel while honoring deps (postgres→riven/zilean, riven→frontend, pgadmin→postgres) - add mount/url wait logic to process start and make waits shutdown-safe - ensure rclone waits on decypharr/nzbdav webdav endpoints - inject mount wait lists for media/arr services - addresses potential shutdown issues during dependency checks by incorporating shutdown checks. - updates Zilean and Riven connection strings based on the Postgres configuration.
1 parent 2fdc98b commit a382f60

6 files changed

Lines changed: 459 additions & 24 deletions

File tree

main.py

Lines changed: 158 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,8 @@
88
from utils.auto_update import Update
99
from utils.dependencies import initialize_dependencies
1010
from utils.plex_dbrepair import start_plex_dbrepair_worker
11-
import subprocess, threading, time, tomllib, os, socket, errno, psutil
11+
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED
12+
import subprocess, threading, time, tomllib, os, socket, errno, psutil, json
1213

1314

1415
def log_ascii_art():
@@ -265,6 +266,153 @@ def start_configured_process(config_obj, updater, key_name, exit_on_error=True):
265266
raise
266267

267268

269+
def _service_has_enabled_instance(config_obj: dict) -> bool:
270+
if not isinstance(config_obj, dict):
271+
return False
272+
if "instances" in config_obj and isinstance(config_obj["instances"], dict):
273+
return any(
274+
isinstance(inst, dict) and inst.get("enabled")
275+
for inst in config_obj["instances"].values()
276+
)
277+
return bool(config_obj.get("enabled"))
278+
279+
280+
def _read_decypharr_mount_path(decypharr_cfg: dict) -> str | None:
281+
if not decypharr_cfg.get("use_embedded_rclone"):
282+
return None
283+
config_file = decypharr_cfg.get("config_file")
284+
if config_file and os.path.exists(config_file):
285+
try:
286+
with open(config_file, "r") as handle:
287+
data = json.load(handle)
288+
mount_path = (data.get("rclone") or {}).get("mount_path")
289+
if isinstance(mount_path, str) and mount_path.strip():
290+
return mount_path
291+
except Exception as e:
292+
logger.debug("Failed to read Decypharr mount path: %s", e)
293+
return "/mnt/debrid/decypharr"
294+
295+
296+
def _collect_mount_paths(config_manager) -> list[str]:
297+
mount_paths = set()
298+
rclone_instances = config_manager.get("rclone", {}).get("instances", {}) or {}
299+
for instance in rclone_instances.values():
300+
if not isinstance(instance, dict) or not instance.get("enabled"):
301+
continue
302+
mount_dir = instance.get("mount_dir")
303+
mount_name = instance.get("mount_name")
304+
if mount_dir and mount_name:
305+
mount_paths.add(os.path.join(mount_dir, mount_name))
306+
307+
decypharr_cfg = config_manager.get("decypharr", {}) or {}
308+
if decypharr_cfg.get("enabled") and decypharr_cfg.get("use_embedded_rclone"):
309+
mount_path = _read_decypharr_mount_path(decypharr_cfg)
310+
if mount_path:
311+
mount_paths.add(mount_path)
312+
313+
return sorted(mount_paths)
314+
315+
316+
def _merge_wait_for_mounts(config_obj: dict, mount_paths: list[str]) -> None:
317+
existing = config_obj.get("wait_for_mounts") or []
318+
merged = sorted(set(existing) | set(mount_paths))
319+
if merged:
320+
config_obj["wait_for_mounts"] = merged
321+
322+
323+
def _apply_mount_waits(config_manager, mount_paths: list[str]) -> None:
324+
if not mount_paths:
325+
return
326+
mount_wait_keys = {
327+
"plex",
328+
"jellyfin",
329+
"emby",
330+
}
331+
for key in mount_wait_keys:
332+
cfg = config_manager.get(key, {})
333+
if not isinstance(cfg, dict):
334+
continue
335+
if "instances" in cfg and isinstance(cfg["instances"], dict):
336+
for inst in cfg["instances"].values():
337+
if isinstance(inst, dict) and inst.get("enabled"):
338+
_merge_wait_for_mounts(inst, mount_paths)
339+
elif cfg.get("enabled"):
340+
_merge_wait_for_mounts(cfg, mount_paths)
341+
342+
343+
def _build_dependency_map(config_manager) -> dict[str, set[str]]:
344+
deps = {
345+
"riven_backend": {"postgres"},
346+
"riven_frontend": {"riven_backend"},
347+
"zilean": {"postgres"},
348+
"pgadmin": {"postgres"},
349+
}
350+
351+
rclone_deps = set()
352+
rclone_instances = config_manager.get("rclone", {}).get("instances", {}) or {}
353+
for instance in rclone_instances.values():
354+
if not isinstance(instance, dict) or not instance.get("enabled"):
355+
continue
356+
if instance.get("zurg_enabled"):
357+
rclone_deps.add("zurg")
358+
if instance.get("decypharr_enabled"):
359+
rclone_deps.add("decypharr")
360+
key_type = (instance.get("key_type") or "").lower()
361+
if key_type == "nzbdav" or instance.get("core_service") == "nzbdav":
362+
rclone_deps.add("nzbdav")
363+
if rclone_deps:
364+
deps["rclone"] = rclone_deps
365+
366+
return deps
367+
368+
369+
def _start_processes_with_dependencies(
370+
process_handler, updater, config_manager, keys: list[str], dependency_map
371+
) -> None:
372+
enabled = {
373+
key: _service_has_enabled_instance(config_manager.get(key, {})) for key in keys
374+
}
375+
pending = {key for key in keys if enabled.get(key)}
376+
deps = {
377+
key: {d for d in dependency_map.get(key, set()) if enabled.get(d)}
378+
for key in pending
379+
}
380+
381+
def _start_key(key: str) -> None:
382+
cfg = config_manager.get(key, {})
383+
start_configured_process(cfg, updater, key)
384+
385+
in_progress = {}
386+
completed = set()
387+
with ThreadPoolExecutor() as executor:
388+
while pending or in_progress:
389+
if process_handler.shutting_down:
390+
for future in list(in_progress):
391+
future.cancel()
392+
return
393+
ready = [key for key in list(pending) if deps.get(key, set()) <= completed]
394+
for key in ready:
395+
if process_handler.shutting_down:
396+
break
397+
pending.remove(key)
398+
in_progress[executor.submit(_start_key, key)] = key
399+
400+
if not in_progress:
401+
raise RuntimeError(
402+
f"Dependency resolution stalled. Remaining: {sorted(pending)}"
403+
)
404+
405+
done, _ = wait(in_progress.keys(), return_when=FIRST_COMPLETED)
406+
for future in done:
407+
key = in_progress.pop(future)
408+
try:
409+
future.result()
410+
except Exception as e:
411+
logger.error("Failed while starting %s: %s", key, e)
412+
process_handler.shutdown(exit_code=1)
413+
completed.add(key)
414+
415+
268416
def main():
269417
log_ascii_art()
270418

@@ -290,6 +438,8 @@ def main():
290438
process_handler.shutdown(exit_code=1)
291439

292440
_apply_global_port_reservations(config)
441+
mount_paths = _collect_mount_paths(config)
442+
_apply_mount_waits(config, mount_paths)
293443

294444
if config.get("dumb", {}).get("api_service", {}).get("enabled"):
295445
start_fastapi_process()
@@ -351,8 +501,10 @@ def _get_metrics_cfg():
351501
time.sleep(interval)
352502

353503
try:
504+
if config.get("traefik", {}).get("enabled"):
505+
start_configured_process(config.get("traefik", {}), updater, "traefik")
506+
354507
grouped_keys = [
355-
"traefik",
356508
"zurg",
357509
"prowlarr",
358510
"radarr",
@@ -377,9 +529,10 @@ def _get_metrics_cfg():
377529
"tautulli",
378530
"seerr",
379531
]
380-
for key in grouped_keys:
381-
cfg = config.get(key, {})
382-
start_configured_process(cfg, updater, key)
532+
dependency_map = _build_dependency_map(config)
533+
_start_processes_with_dependencies(
534+
process_handler, updater, config, grouped_keys, dependency_map
535+
)
383536

384537
except Exception as e:
385538
logger.error(e)

utils/auto_update.py

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -507,11 +507,40 @@ def start_process(self, process_name, config, key, instance_name):
507507

508508
if config.get("wait_for_dir", False):
509509
while not os.path.exists(wait_dir := config["wait_for_dir"]):
510+
if self.process_handler.shutting_down:
511+
self.logger.info(
512+
"Shutdown requested; skipping wait for directory %s.",
513+
wait_dir,
514+
)
515+
return False, "Shutdown requested"
510516
self.logger.info(
511517
f"Waiting for directory {wait_dir} to become available before starting {process_name}"
512518
)
513519
time.sleep(10)
514520

521+
wait_mounts = config.get("wait_for_mounts") or []
522+
if wait_mounts:
523+
while True:
524+
if self.process_handler.shutting_down:
525+
self.logger.info(
526+
"Shutdown requested; skipping wait for mounts before %s.",
527+
process_name,
528+
)
529+
return False, "Shutdown requested"
530+
missing = [
531+
mount_path
532+
for mount_path in wait_mounts
533+
if not os.path.ismount(mount_path)
534+
]
535+
if not missing:
536+
break
537+
self.logger.info(
538+
"Waiting for mounts to become available before starting %s: %s",
539+
process_name,
540+
", ".join(missing),
541+
)
542+
time.sleep(10)
543+
515544
if config.get("wait_for_url", False):
516545
wait_for_urls = config["wait_for_url"]
517546
time.sleep(5)
@@ -526,6 +555,12 @@ def start_process(self, process_name, config, key, instance_name):
526555
)
527556

528557
while time.time() - start_time < 600:
558+
if self.process_handler.shutting_down:
559+
self.logger.info(
560+
"Shutdown requested; skipping wait for %s.",
561+
wait_url,
562+
)
563+
return False, "Shutdown requested"
529564
try:
530565
if auth:
531566
response = requests.get(
@@ -581,13 +616,17 @@ def start_process(self, process_name, config, key, instance_name):
581616
suppress_logging=suppress_logging,
582617
env=env,
583618
)
619+
if self.process_handler.shutting_down:
620+
return process, "Shutdown requested"
584621
if key == "riven_backend":
585622
from utils.riven_settings import load_settings
586623

587624
time.sleep(10)
588625
load_settings()
589626

590627
if key == "decypharr":
628+
if self.process_handler.shutting_down:
629+
return process, "Shutdown requested"
591630
from utils.decypharr_settings import patch_decypharr_config
592631

593632
time.sleep(10)
@@ -607,6 +646,8 @@ def start_process(self, process_name, config, key, instance_name):
607646
self.logger.warning("Decypharr config patch failed: %s", error)
608647

609648
if key == "nzbdav":
649+
if self.process_handler.shutting_down:
650+
return process, "Shutdown requested"
610651
from utils.nzbdav_settings import patch_nzbdav_config
611652

612653
time.sleep(10)
@@ -634,6 +675,8 @@ def start_process(self, process_name, config, key, instance_name):
634675
"whisparr",
635676
"whisparr-v3",
636677
]:
678+
if self.process_handler.shutting_down:
679+
return process, "Shutdown requested"
637680
from utils.prowlarr_settings import patch_prowlarr_apps
638681

639682
time.sleep(10)
@@ -642,6 +685,8 @@ def start_process(self, process_name, config, key, instance_name):
642685
self.logger.warning("Prowlarr app sync failed: %s", err)
643686

644687
if key == "plex":
688+
if self.process_handler.shutting_down:
689+
return process, "Shutdown requested"
645690
from utils.plex_settings import patch_plex_config
646691

647692
time.sleep(10)

utils/decypharr_settings.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,8 @@ def _wait_for_arr(
364364
deadline = time.time() + max(1, timeout_s)
365365
url = _join(host, "/api/v3/system/status")
366366
while time.time() < deadline:
367+
if _shutdown_requested():
368+
return False
367369
try:
368370
_arr_req(url, token, "GET", timeout=5)
369371
return True
@@ -384,6 +386,8 @@ def _with_retries(
384386
last_err = None
385387
delay = max(0.1, base_delay_s)
386388
for i in range(max(1, attempts)):
389+
if _shutdown_requested():
390+
raise RuntimeError("Shutdown requested")
387391
try:
388392
return fn(*args, **kwargs)
389393
except Exception as e:
@@ -395,12 +399,24 @@ def _with_retries(
395399
raise last_err if last_err else RuntimeError("retry failed")
396400

397401

402+
def _shutdown_requested() -> bool:
403+
try:
404+
from utils.dependencies import get_process_handler
405+
406+
handler = get_process_handler()
407+
except Exception:
408+
return False
409+
return bool(getattr(handler, "shutting_down", False))
410+
411+
398412
# ---------------------------------------------------------------------------
399413
# Main patch entrypoint
400414
# ---------------------------------------------------------------------------
401415

402416

403417
def patch_decypharr_config():
418+
if _shutdown_requested():
419+
return False, "Shutdown requested"
404420
config_path = CONFIG_MANAGER.get("decypharr", {}).get(
405421
"config_file", "/decypharr/config.json"
406422
)

0 commit comments

Comments
 (0)