Skip to content

Commit 007604c

Browse files
authored
Queue server database updates for sequential processing (#4232)
1 parent 4a7df09 commit 007604c

File tree

2 files changed

+82
-64
lines changed

2 files changed

+82
-64
lines changed

Sources/Shared/AreasService.swift

Lines changed: 23 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -122,59 +122,43 @@ final class AreasService: AreasServiceProtocol {
122122
devicesAndAreas: [AppDeviceRegistry],
123123
entitiesAndAreas: [AppEntityRegistry]
124124
) -> [String: Set<String>] {
125-
/// area_id : [device_id]
126-
var areasAndDevicesDict: [String: [String]] = [:]
125+
/// area_id : Set<device_id>
126+
var areasAndDevicesDict: [String: Set<String>] = [:]
127+
/// device_id : area_id (reverse lookup for O(1) access)
128+
var deviceToAreaMap: [String: String] = [:]
127129

128-
// Get all devices from an area
130+
// Build area->devices mapping and device->area reverse lookup
129131
for device in devicesAndAreas {
130-
let deviceId = device.deviceId
131132
if let areaId = device.areaId {
132-
if var deviceIds = areasAndDevicesDict[areaId] {
133-
deviceIds.append(deviceId)
134-
areasAndDevicesDict[areaId] = deviceIds
135-
} else {
136-
areasAndDevicesDict[areaId] = [deviceId]
137-
}
133+
areasAndDevicesDict[areaId, default: []].insert(device.deviceId)
134+
deviceToAreaMap[device.deviceId] = areaId
138135
}
139136
}
140137

141-
/// area_id : [entity_id]
138+
/// area_id : Set<entity_id>
142139
var areasAndEntitiesDict: [String: Set<String>] = [:]
140+
/// device_id : Set<entity_id> (built in one pass)
141+
var deviceChildrenEntities: [String: Set<String>] = [:]
143142

144-
// Get all entities from an area
143+
// Single pass through entities: add to areas and build device->entities mapping
145144
for entity in entitiesAndAreas {
146-
if let areaId = entity.areaId, let entityId = entity.entityId {
147-
if var entityIds = areasAndEntitiesDict[areaId] {
148-
entityIds.insert(entityId)
149-
areasAndEntitiesDict[areaId] = entityIds
150-
} else {
151-
areasAndEntitiesDict[areaId] = [entityId]
152-
}
153-
}
154-
}
145+
guard let entityId = entity.entityId else { continue }
155146

156-
/// device_id : [entity_id]
157-
var deviceChildrenEntities: [String: [String]] = [:]
147+
// Add entity directly to its area
148+
if let areaId = entity.areaId {
149+
areasAndEntitiesDict[areaId, default: []].insert(entityId)
150+
}
158151

159-
// Get entities from a device
160-
for areaAndDevices in areasAndDevicesDict {
161-
for deviceId in areaAndDevices.value {
162-
deviceChildrenEntities[deviceId] = entitiesAndAreas.filter { $0.deviceId == deviceId }
163-
.compactMap(\.entityId)
152+
// Build device->entities mapping for later
153+
if let deviceId = entity.deviceId {
154+
deviceChildrenEntities[deviceId, default: []].insert(entityId)
164155
}
165156
}
166157

167-
// Add device children entities to dictionary of areas and entities
168-
deviceChildrenEntities.forEach { deviceAndChildren in
169-
guard let areaOfDevice = areasAndDevicesDict.first(where: { areaAndDevices in
170-
areaAndDevices.value.contains(deviceAndChildren.key)
171-
})?.key else { return }
172-
173-
if var entityIds = areasAndEntitiesDict[areaOfDevice] {
174-
deviceAndChildren.value.forEach { entityIds.insert($0) }
175-
areasAndEntitiesDict[areaOfDevice] = entityIds
176-
} else {
177-
areasAndEntitiesDict[areaOfDevice] = Set(deviceAndChildren.value)
158+
// Add device children entities to their areas (using reverse lookup)
159+
for (deviceId, entityIds) in deviceChildrenEntities {
160+
if let areaId = deviceToAreaMap[deviceId] {
161+
areasAndEntitiesDict[areaId, default: []].formUnion(entityIds)
178162
}
179163
}
180164

Sources/Shared/Environment/AppDatabaseUpdater.swift

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,11 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol {
5656
}
5757
}
5858

59-
// Actor for thread-safe task management
59+
// Actor for thread-safe task management and queuing
6060
private actor TaskCoordinator {
6161
private var currentUpdateTasks: [String: Task<Void, Never>] = [:]
62+
private var updateQueue: [(serverId: String, task: () async -> Void)] = []
63+
private var isProcessingQueue = false
6264

6365
func getTask(for serverId: String) -> Task<Void, Never>? {
6466
currentUpdateTasks[serverId]
@@ -77,6 +79,40 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol {
7779
task.cancel()
7880
}
7981
currentUpdateTasks.removeAll()
82+
updateQueue.removeAll()
83+
isProcessingQueue = false
84+
}
85+
86+
/// Enqueues a server update task to be processed sequentially
87+
func enqueueUpdate(serverId: String, task: @escaping () async -> Void) {
88+
// Check if this server is already in the queue
89+
if updateQueue.contains(where: { $0.serverId == serverId }) {
90+
Current.Log.verbose("Update for server \(serverId) already queued, skipping duplicate")
91+
return
92+
}
93+
94+
updateQueue.append((serverId: serverId, task: task))
95+
96+
// Start processing if not already running
97+
if !isProcessingQueue {
98+
Task {
99+
await processQueue()
100+
}
101+
}
102+
}
103+
104+
/// Processes queued updates one at a time
105+
private func processQueue() async {
106+
guard !isProcessingQueue else { return }
107+
isProcessingQueue = true
108+
109+
while !updateQueue.isEmpty {
110+
let queuedUpdate = updateQueue.removeFirst()
111+
Current.Log.verbose("Processing queued update for server: \(queuedUpdate.serverId)")
112+
await queuedUpdate.task()
113+
}
114+
115+
isProcessingQueue = false
80116
}
81117
}
82118

@@ -119,7 +155,7 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol {
119155
/// Starts an update for a specific server in the background.
120156
/// This method returns immediately and does not block the caller.
121157
/// - Parameter server: The specific server to update.
122-
/// - Ensures only one update per server runs at a time. Different servers can update concurrently.
158+
/// - Server updates are queued and processed sequentially, one at a time.
123159
/// - Applies per-server throttling with exponential backoff on failures.
124160
func update(server: Server) {
125161
// Explicitly detach from the calling context to ensure we don't block the main thread
@@ -129,36 +165,34 @@ final class AppDatabaseUpdater: AppDatabaseUpdaterProtocol {
129165

130166
let serverId = server.identifier.rawValue
131167

132-
// Check if an update for this specific server is already running
133-
if let existingTask = await taskCoordinator.getTask(for: serverId) {
134-
Current.Log.verbose("Update already in progress for server \(server.info.name), awaiting existing task")
135-
await existingTask.value
136-
return
137-
}
168+
// Enqueue the update to be processed sequentially
169+
await taskCoordinator.enqueueUpdate(serverId: serverId) { [weak self] in
170+
guard let self else { return }
138171

139-
Current.Log.verbose("Updating database for server \(server.info.name)")
172+
Current.Log.verbose("Updating database for server \(server.info.name)")
140173

141-
// Show toast indicating update has started
142-
await showUpdateToast(for: server)
174+
// Show toast indicating update has started
175+
await showUpdateToast(for: server)
143176

144-
// Launch the server-specific update task
145-
let updateTask = Task { [weak self] in
146-
guard let self else { return }
147-
defer {
148-
// Hide toast and clean up task reference when complete
149-
Task {
150-
await self.hideUpdateToast(for: server)
151-
await self.taskCoordinator.removeTask(for: serverId)
177+
// Launch the server-specific update task
178+
let updateTask = Task { [weak self] in
179+
guard let self else { return }
180+
defer {
181+
// Hide toast and clean up task reference when complete
182+
Task {
183+
await self.hideUpdateToast(for: server)
184+
await self.taskCoordinator.removeTask(for: serverId)
185+
}
152186
}
153-
}
154187

155-
await performSingleServerUpdate(server: server)
156-
}
188+
await performSingleServerUpdate(server: server)
189+
}
157190

158-
// Store the task for this server
159-
await taskCoordinator.setTask(updateTask, for: serverId)
191+
// Store the task for this server
192+
await taskCoordinator.setTask(updateTask, for: serverId)
160193

161-
await updateTask.value
194+
await updateTask.value
195+
}
162196
}
163197
}
164198

0 commit comments

Comments
 (0)