diff --git a/Sources/App/Scenes/WebViewSceneDelegate.swift b/Sources/App/Scenes/WebViewSceneDelegate.swift index 1ffec8ed8c..24d2609e7f 100644 --- a/Sources/App/Scenes/WebViewSceneDelegate.swift +++ b/Sources/App/Scenes/WebViewSceneDelegate.swift @@ -178,9 +178,6 @@ final class WebViewSceneDelegate: NSObject, UIWindowSceneDelegate { Current.modelManager.subscribe(isAppInForeground: { UIApplication.shared.applicationState == .active }) - - await Current.appDatabaseUpdater.update() - Current.panelsUpdater.update() } /// Force update location when user opens the app diff --git a/Sources/App/Settings/Settings/SettingsView.swift b/Sources/App/Settings/Settings/SettingsView.swift index 2c9926f770..3153c5dfcc 100644 --- a/Sources/App/Settings/Settings/SettingsView.swift +++ b/Sources/App/Settings/Settings/SettingsView.swift @@ -22,7 +22,9 @@ struct SettingsView: View { .onAppear { appDatabaseUpdaterTask?.cancel() appDatabaseUpdaterTask = Task { - await Current.appDatabaseUpdater.update() + for server in Current.servers.all { + await Current.appDatabaseUpdater.update(server: server) + } } } } diff --git a/Sources/App/WebView/WebViewController.swift b/Sources/App/WebView/WebViewController.swift index ff696c2c0b..36584862d5 100644 --- a/Sources/App/WebView/WebViewController.swift +++ b/Sources/App/WebView/WebViewController.swift @@ -740,6 +740,11 @@ final class WebViewController: UIViewController, WKNavigationDelegate, WKUIDeleg DispatchQueue.main.async { [self] in loadActiveURLIfNeeded() } + + Task { + await Current.appDatabaseUpdater.update(server: server) + Current.panelsUpdater.update() + } } private func showNoActiveURLError() { diff --git a/Sources/Shared/Environment/AppDatabaseUpdater.swift b/Sources/Shared/Environment/AppDatabaseUpdater.swift index d10db6213d..1fb47fc2a4 100644 --- a/Sources/Shared/Environment/AppDatabaseUpdater.swift +++ b/Sources/Shared/Environment/AppDatabaseUpdater.swift @@ -3,9 +3,12 @@ import GRDB import HAKit import UIKit +/// AppDatabaseUpdater coordinates fetching data from servers and persisting it into the local database. +/// It ensures only one update per server runs at a time (different servers can update concurrently), +/// applies per-server throttling with backoff, and performs careful cancellation and batched DB writes. public protocol AppDatabaseUpdaterProtocol { func stop() - func update() async + func update(server: Server) async } final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { @@ -13,10 +16,41 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { case noAPI } - static var shared = AppDatabaseUpdater() + // Actor for thread-safe task management + private actor TaskCoordinator { + private var currentUpdateTasks: [String: Task] = [:] + + func getTask(for serverId: String) -> Task? { + currentUpdateTasks[serverId] + } + + func setTask(_ task: Task, for serverId: String) { + currentUpdateTasks[serverId] = task + } + + func removeTask(for serverId: String) { + currentUpdateTasks.removeValue(forKey: serverId) + } + + func cancelAllTasks() { + for (_, task) in currentUpdateTasks { + task.cancel() + } + currentUpdateTasks.removeAll() + } + } + + private let taskCoordinator = TaskCoordinator() - private var lastUpdate: Date? - private var updateTask: Task? + // Simple adaptive throttling/backoff + // - Tracks consecutive failures per server to increase delay between attempts. + // - Tracks per-server last successful (or attempted) update times to avoid over-fetching. + private var consecutiveFailuresByServer: [String: Int] = [:] + private var perServerLastUpdate: [String: Date] = [:] + // Base throttle applied to all servers; backoff is added on top of this. + private let baseThrottleSeconds: TimeInterval = 120 + + static var shared = AppDatabaseUpdater() init() { NotificationCenter.default.addObserver( @@ -31,71 +65,139 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { stop() } + /// Cancels any in-flight work and clears transient state. + /// Called when app enters background or when we need to abort updates early. func stop() { - updateTask?.cancel() - updateTask = nil - } + Task { + await taskCoordinator.cancelAllTasks() + } - func update() async { - stop() + // Reset backoff tracking to free memory and avoid stale penalties + consecutiveFailuresByServer.removeAll() + } - if let lastUpdate, lastUpdate.timeIntervalSinceNow > -120 { - Current.Log.verbose("Skipping database update, last update was \(lastUpdate)") + /// Starts an update for a specific server. + /// - Parameter server: The specific server to update. + /// - Ensures only one update per server runs at a time. Different servers can update concurrently. + /// - Applies per-server throttling with exponential backoff on failures. + func update(server: Server) async { + let serverId = server.identifier.rawValue + + // Check if an update for this specific server is already running + if let existingTask = await taskCoordinator.getTask(for: serverId) { + Current.Log.verbose("Update already in progress for server \(server.info.name), awaiting existing task") + await existingTask.value return - } else { - lastUpdate = Date() } - Current.Log.verbose("Updating database, servers count \(Current.servers.all.count)") + Current.Log.verbose("Updating database for server \(server.info.name)") - updateTask = Task { [weak self] in - for server in Current.servers.all { - guard let self else { - break + // Launch the server-specific update task + let updateTask = Task { [weak self] in + guard let self else { return } + defer { + // Clean up task reference when complete + Task { + await self.taskCoordinator.removeTask(for: serverId) } + } - guard server.info.connection.activeURL() != nil else { - continue - } - // Check if task was cancelled before processing next server - if Task.isCancelled { - Current.Log.verbose("Update task cancelled") - break - } + await performSingleServerUpdate(server: server) + } - await updateServer(server: server) - } + // Store the task for this server + await taskCoordinator.setTask(updateTask, for: serverId) + + await updateTask.value + } + + /// Determines if a specific server should be updated based on connection and throttle rules. + private func shouldUpdateServer(_ server: Server) -> Bool { + guard server.info.connection.activeURL() != nil else { return false } + if Task.isCancelled { return false } + + // Per-server throttle with exponential backoff + if let last = perServerLastUpdate[server.identifier.rawValue] { + let failures = consecutiveFailuresByServer[server.identifier.rawValue] ?? 0 + let backoff = min(pow(2.0, Double(failures)) * 10.0, 300.0) // 10s, 20s, 40s... up to 5m + let threshold = -(baseThrottleSeconds + backoff) + return last.timeIntervalSinceNow <= threshold + } + return true + } + + /// Performs an update for a single specific server. + private func performSingleServerUpdate(server: Server) async { + guard !Task.isCancelled else { return } + guard shouldUpdateServer(server) else { + Current.Log.verbose("Skipping update for server \(server.info.name) - throttled") + return } - await updateTask?.value + let success = await safeUpdateServer(server: server) + updateServerTracking(serverId: server.identifier.rawValue, success: success) } + /// Updates per-server tracking after an update attempt completes. + private func updateServerTracking(serverId: String, success: Bool) { + if success { + perServerLastUpdate[serverId] = Date() + consecutiveFailuresByServer[serverId] = 0 + } else { + consecutiveFailuresByServer[serverId, default: 0] += 1 + } + } + + /// Wraps a per-server update with cancellation checks and returns whether it succeeded. + /// This allows the scheduler to apply backoff on failures and update last-run times on success. + private func safeUpdateServer(server: Server) async -> Bool { + if Task.isCancelled { return false } + await updateServer(server: server) + if Task.isCancelled { return false } + return true + } + + /// Runs the full update pipeline for a single server in sequence. + /// Each phase checks for cancellation to bail out quickly when needed. private func updateServer(server: Server) async { - // Entities (fetch_states) + guard !Task.isCancelled else { return } + // 1) Entities (fetch_states) await updateEntitiesDatabase(server: server) - - // Entities registry list for display + if Task.isCancelled { return } + // 2) Entities registry list for display await updateEntitiesRegistryListForDisplay(server: server) - - // Entities registry + if Task.isCancelled { return } + // 3) Entities registry await updateEntitiesRegistry(server: server) - - // Devices registry + if Task.isCancelled { return } + // 4) Devices registry await updateDevicesRegistry(server: server) - - // Areas with their entities + if Task.isCancelled { return } + // 5) Areas with their entities // IMPORTANT: This must be executed after entities and device registry // since we rely on that data to map entities to areas await updateAreasDatabase(server: server) } + /// Fetches entities' states from the API and forwards results to persistence. + /// Early-exits on cancellation and resumes continuations to avoid leaks. private func updateEntitiesDatabase(server: Server) async { - await withCheckedContinuation { (continuation: CheckedContinuation) in + guard !Task.isCancelled else { return } + await withCheckedContinuation { [weak self] (continuation: CheckedContinuation) in + guard self != nil else { + continuation.resume() + return + } guard let api = Current.api(for: server) else { Current.Log.error("No API available for server \(server.info.name)") continuation.resume() return } + // If cancelled after acquiring API, resume the continuation to avoid hanging. + if Task.isCancelled { + continuation.resume() + return + } api.connection.send(HATypedRequest<[HAEntity]>.fetchStates()) { result in switch result { case let .success(entities): @@ -115,17 +217,29 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Fetches entity registry from the API and forwards results to persistence. + /// Early-exits on cancellation and resumes continuations to avoid leaks. private func updateEntitiesRegistry(server: Server) async { + guard !Task.isCancelled else { return } let registryEntries: [EntityRegistryEntry]? = - await withCheckedContinuation { (continuation: CheckedContinuation< + await withCheckedContinuation { [weak self] (continuation: CheckedContinuation< [EntityRegistryEntry]?, Never >) in + guard let self else { + continuation.resume(returning: nil) + return + } guard let api = Current.api(for: server) else { Current.Log.error("No API available for server \(server.info.name)") continuation.resume(returning: nil) return } + // If cancelled after acquiring API, resume the continuation to avoid hanging. + if Task.isCancelled { + continuation.resume(returning: nil) + return + } api.connection.send(.configEntityRegistryList()) { result in switch result { case let .success(entries): @@ -150,17 +264,29 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Fetches device registry from the API and forwards results to persistence. + /// Early-exits on cancellation and resumes continuations to avoid leaks. private func updateDevicesRegistry(server: Server) async { + guard !Task.isCancelled else { return } let registryEntries: [DeviceRegistryEntry]? = - await withCheckedContinuation { (continuation: CheckedContinuation< + await withCheckedContinuation { [weak self] (continuation: CheckedContinuation< [DeviceRegistryEntry]?, Never >) in + guard let self else { + continuation.resume(returning: nil) + return + } guard let api = Current.api(for: server) else { Current.Log.error("No API available for server \(server.info.name)") continuation.resume(returning: nil) return } + // If cancelled after acquiring API, resume the continuation to avoid hanging. + if Task.isCancelled { + continuation.resume(returning: nil) + return + } api.connection.send(.configDeviceRegistryList()) { result in switch result { case let .success(entries): @@ -185,17 +311,29 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Fetches entity registry list-for-display from the API and forwards results to persistence. + /// Early-exits on cancellation and resumes continuations to avoid leaks. private func updateEntitiesRegistryListForDisplay(server: Server) async { + guard !Task.isCancelled else { return } let response: EntityRegistryListForDisplay? = - await withCheckedContinuation { (continuation: CheckedContinuation< + await withCheckedContinuation { [weak self] (continuation: CheckedContinuation< EntityRegistryListForDisplay?, Never >) in + guard let self else { + continuation.resume(returning: nil) + return + } guard let api = Current.api(for: server) else { Current.Log.error("No API available for server \(server.info.name)") continuation.resume(returning: nil) return } + // If cancelled after acquiring API, resume the continuation to avoid hanging. + if Task.isCancelled { + continuation.resume(returning: nil) + return + } api.connection.send( HATypedRequest.configEntityRegistryListForDisplay() ) { result in @@ -236,6 +374,9 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { ) } + /// Persists areas and their entity relationships for a server. + /// Uses a single asyncWrite transaction for batching, replaces existing rows, and deletes stale ones. + /// For simplicity and speed, we upsert via `save(onConflict: .replace)`; deeper diffing can be added if needed. private func saveAreasToDatabase( areas: [HAAreasRegistryResponse], areasAndEntities: [String: Set], @@ -255,8 +396,17 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { ) } + // Nothing to persist; keep going (delete pass below might still remove stale rows). + if appAreas.isEmpty { + Current.Log.verbose("No areas to save for server \(serverId)") + } + do { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in + guard self != nil else { + continuation.resume(throwing: CancellationError()) + return + } Current.database().asyncWrite { db in let existingAreaIds = try AppArea .filter(Column(DatabaseTables.AppArea.serverId.rawValue) == serverId) @@ -301,6 +451,8 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Persists the entity registry list-for-display for a server with batched writes and stale deletions. + /// Builds the payload with a streaming loop to reduce intermediate allocations vs filter+map. private func saveEntityRegistryListForDisplay(_ response: EntityRegistryListForDisplay, serverId: String) async { // Check for cancellation before starting database work guard !Task.isCancelled else { @@ -308,17 +460,33 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { return } - let entitiesListForDisplay = response.entities.filter({ $0.decimalPlaces != nil || $0.entityCategory != nil }) - .map { registry in - AppEntityRegistryListForDisplay( - id: ServerEntity.uniqueId(serverId: serverId, entityId: registry.entityId), - serverId: serverId, - entityId: registry.entityId, - registry: registry + var entitiesListForDisplay: [AppEntityRegistryListForDisplay] = [] + entitiesListForDisplay.reserveCapacity(response.entities.count) + for registry in response.entities { + if registry.decimalPlaces != nil || registry.entityCategory != nil { + entitiesListForDisplay.append( + AppEntityRegistryListForDisplay( + id: ServerEntity.uniqueId(serverId: serverId, entityId: registry.entityId), + serverId: serverId, + entityId: registry.entityId, + registry: registry + ) ) } + } do { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in + guard self != nil else { + continuation.resume(throwing: CancellationError()) + return + } + guard !Task.isCancelled else { + continuation.resume(throwing: CancellationError()) + return + } + // Note: we batch entities into memory before this write. This is a trade-off for simpler, atomic + // updates; + // if memory usage becomes an issue for very large datasets, consider a streaming or chunked approach. Current.database().asyncWrite { db in // Get existing IDs for this server let existingIds = try AppEntityRegistryListForDisplay @@ -364,8 +532,9 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Persists the entity registry for a server using a single transaction and differential deletes. private func saveEntityRegistry(_ registryEntries: [EntityRegistryEntry], serverId: String) async { - // Check for cancellation before starting database work + // If cancelled before touching the DB, bail out early to avoid unnecessary work. guard !Task.isCancelled else { Current.Log.verbose("Skipping entity registry database save - task cancelled") return @@ -376,7 +545,15 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } do { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in + guard self != nil else { + continuation.resume(throwing: CancellationError()) + return + } + if Task.isCancelled { + continuation.resume(throwing: CancellationError()) + return + } Current.database().asyncWrite { db in // Get existing unique IDs for this server let existingIds = try AppEntityRegistry @@ -425,8 +602,9 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } } + /// Persists the device registry for a server using a single transaction and differential deletes. private func saveDeviceRegistry(_ registryEntries: [DeviceRegistryEntry], serverId: String) async { - // Check for cancellation before starting database work + // If cancelled before touching the DB, bail out early to avoid unnecessary work. guard !Task.isCancelled else { Current.Log.verbose("Skipping device registry database save - task cancelled") return @@ -437,7 +615,15 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol { } do { - try await withCheckedThrowingContinuation { (continuation: CheckedContinuation) in + try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in + guard self != nil else { + continuation.resume(throwing: CancellationError()) + return + } + if Task.isCancelled { + continuation.resume(throwing: CancellationError()) + return + } Current.database().asyncWrite { db in // Get existing device IDs for this server let existingIds = try AppDeviceRegistry