-
Notifications
You must be signed in to change notification settings - Fork 162
Prepare connection / region pinning #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
356fefc
eba6899
652b87f
14c0ffe
c7901f9
6990f8f
69cff3e
570da75
1785583
0799efe
812a294
9761f24
9f9e42b
962c703
59faa1f
d886ed6
4bb928f
02a6f47
16a4a44
216adfa
0af7d12
f887ec2
d68c749
3b50244
4627d15
eba7b15
efa03aa
d7d91c6
aba6f99
9584047
2a7f4c3
e04ac1b
c7c805e
257fb2f
068f3b7
43fe2a2
5e1c2ba
9ef5de9
9aca9ff
5842d74
841f4dc
6d678e5
27d7245
1ca5af7
5b12bc4
29d3698
452fbd1
2d982dd
c3f2656
34e90d7
70019b0
bde7edb
5a77578
4a72f78
c849c94
fc92c12
b888cfb
dec978c
57ffd39
51e23f8
eb2d882
8536dd7
128585c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| patch type="added" "Prepare connection & region pinning" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,9 @@ | ||
| excluded: | ||
| - .build | ||
| - .build-test | ||
| - .cache | ||
| - .swiftpm | ||
| - build | ||
| - Sources/LiveKit/Protos | ||
|
|
||
| disabled_rules: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,202 @@ | ||
| /* | ||
| * Copyright 2026 LiveKit | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| import Foundation | ||
|
|
||
| // MARK: - RegionManager | ||
|
|
||
| actor RegionManager: Loggable { | ||
| struct State: Sendable { | ||
| var lastRequested: Date? | ||
| var all: [RegionInfo] = [] | ||
| var remaining: [RegionInfo] = [] | ||
| } | ||
|
|
||
| static let cacheInterval: TimeInterval = 30 | ||
|
|
||
| nonisolated let providedUrl: URL | ||
| private var state = State() | ||
| private var settingsFetchTask: Task<[RegionInfo], Error>? | ||
| private var settingsFetchTaskId: UUID? | ||
|
|
||
| init(providedUrl: URL) { | ||
| self.providedUrl = providedUrl | ||
| } | ||
|
|
||
| func cancel() { | ||
| settingsFetchTask?.cancel() | ||
| settingsFetchTask = nil | ||
| settingsFetchTaskId = nil | ||
| } | ||
|
|
||
| func resetAttempts(onlyIfExhausted: Bool = false) { | ||
| if onlyIfExhausted { | ||
| guard state.remaining.isEmpty else { return } | ||
| } | ||
| guard !state.all.isEmpty else { return } | ||
| state.remaining = state.all | ||
| } | ||
|
|
||
| func resetAll() { | ||
| state = State() | ||
| } | ||
|
|
||
| func markFailed(region: RegionInfo) { | ||
| state.remaining.removeAll { $0 == region } | ||
| } | ||
|
|
||
| func shouldRequestSettings() -> Bool { | ||
| guard providedUrl.isCloud else { return false } | ||
| guard let lastRequested = state.lastRequested else { return true } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Just to double check, is it intentional to return true when lastRequested does not exist ?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, intentional. If lastRequested is nil, no settings have been fetched yet, so we should request them. This matches JS/Android behavior. |
||
| return Date().timeIntervalSince(lastRequested) > Self.cacheInterval | ||
| } | ||
|
|
||
| func prepareSettingsFetch(token: String) { | ||
| guard shouldRequestSettings() else { return } | ||
| _ = startSettingsFetchIfNeeded(token: token) | ||
| } | ||
|
|
||
| func resolveBest(token: String) async throws -> RegionInfo { | ||
| try await requestSettingsIfNeeded(token: token) | ||
| guard let selected = state.remaining.first else { | ||
| throw LiveKitError(.regionManager, message: "No more remaining regions.") | ||
| } | ||
|
|
||
| log("[Region] Resolved region: \(String(describing: selected))", .debug) | ||
| return selected | ||
| } | ||
|
|
||
| func updateFromServerReportedRegions(_ regions: Livekit_RegionSettings) { | ||
| guard providedUrl.isCloud else { return } | ||
|
|
||
| let allRegions = regions.regions.compactMap { $0.toLKType() } | ||
| guard !allRegions.isEmpty else { return } | ||
|
|
||
| // Keep previously failed regions excluded when updating the list. | ||
| let allIds = Set(state.all.map(\.regionId)) | ||
| let remainingIds = Set(state.remaining.map(\.regionId)) | ||
| let failedRegionIds = allIds.subtracting(remainingIds) | ||
|
|
||
| let remainingRegions = allRegions.filter { !failedRegionIds.contains($0.regionId) } | ||
| log("[Region] Updating regions from server-reported settings (\(allRegions.count)), remaining: \(remainingRegions.count)", .info) | ||
|
|
||
| state.all = allRegions | ||
| state.remaining = remainingRegions | ||
| state.lastRequested = Date() | ||
| } | ||
|
|
||
| // MARK: - Testing | ||
|
|
||
| func snapshot() -> State { state } | ||
|
|
||
| func setStateForTesting(_ state: State) { | ||
| self.state = state | ||
| } | ||
|
|
||
| // MARK: - Private | ||
|
|
||
| private func startSettingsFetchIfNeeded(token: String) -> Task<[RegionInfo], Error> { | ||
| if let task = settingsFetchTask { return task } | ||
|
|
||
| let taskId = UUID() | ||
| settingsFetchTaskId = taskId | ||
|
|
||
| let task = Task { [providedUrl, token, taskId] in | ||
| defer { clearSettingsFetchTask(matching: taskId) } | ||
| do { | ||
| let data = try await Self.fetchRegionSettings(providedUrl: providedUrl, token: token) | ||
| let allRegions = try Self.parseRegionSettings(data: data) | ||
| try Task.checkCancellation() | ||
| applyFetchedRegions(allRegions) | ||
hiroshihorie marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| return allRegions | ||
| } catch { | ||
| log("[Region] Failed to fetch region settings: \(error)", .error) | ||
| throw error | ||
| } | ||
| } | ||
|
|
||
| settingsFetchTask = task | ||
| return task | ||
| } | ||
|
|
||
| private func requestSettingsIfNeeded(token: String) async throws { | ||
| guard providedUrl.isCloud else { | ||
| throw LiveKitError(.onlyForCloud) | ||
| } | ||
|
|
||
| guard shouldRequestSettings() else { return } | ||
| let task = startSettingsFetchIfNeeded(token: token) | ||
| _ = try await task.value | ||
| } | ||
|
|
||
| private func applyFetchedRegions(_ allRegions: [RegionInfo]) { | ||
| log("[Region] all regions: \(String(describing: allRegions))", .debug) | ||
| state.all = allRegions | ||
| state.remaining = allRegions | ||
| state.lastRequested = Date() | ||
| } | ||
|
|
||
| private func clearSettingsFetchTask(matching taskId: UUID) { | ||
| guard settingsFetchTaskId == taskId else { return } | ||
| settingsFetchTaskId = nil | ||
| settingsFetchTask = nil | ||
| } | ||
|
|
||
| // MARK: - Static helpers (non-isolated) | ||
|
|
||
| private nonisolated static func fetchRegionSettings(providedUrl: URL, token: String) async throws -> Data { | ||
| var request = URLRequest(url: providedUrl.regionSettingsUrl(), | ||
| cachePolicy: .reloadIgnoringLocalAndRemoteCacheData) | ||
| request.addValue("Bearer \(token)", forHTTPHeaderField: "authorization") | ||
|
|
||
| let (data, response) = try await URLSession.shared.data(for: request) | ||
| guard let httpResponse = response as? HTTPURLResponse else { | ||
| throw LiveKitError(.regionManager, message: "Failed to fetch region settings") | ||
| } | ||
|
|
||
| let statusCode = httpResponse.statusCode | ||
| guard (200 ..< 300).contains(statusCode) else { | ||
| let rawBody = String(data: data, encoding: .utf8)? | ||
| .trimmingCharacters(in: .whitespacesAndNewlines) | ||
| let body = if let rawBody, !rawBody.isEmpty { | ||
| rawBody.count > 1024 ? String(rawBody.prefix(1024)) + "..." : rawBody | ||
| } else { | ||
| "(No server message)" | ||
| } | ||
|
|
||
| if (400 ..< 500).contains(statusCode) { | ||
| throw LiveKitError(.validation, message: "Region settings error: HTTP \(statusCode): \(body)") | ||
| } | ||
|
|
||
| throw LiveKitError(.regionManager, message: "Failed to fetch region settings: HTTP \(statusCode): \(body)") | ||
| } | ||
|
|
||
| return data | ||
| } | ||
|
|
||
| private nonisolated static func parseRegionSettings(data: Data) throws -> [RegionInfo] { | ||
| do { | ||
| let regionSettings = try Livekit_RegionSettings(jsonUTF8Data: data) | ||
| let allRegions = regionSettings.regions.compactMap { $0.toLKType() } | ||
| guard !allRegions.isEmpty else { | ||
| throw LiveKitError(.regionManager, message: "Fetched region data is empty.") | ||
| } | ||
| return allRegions | ||
| } catch { | ||
| throw LiveKitError(.regionManager, message: "Failed to parse region settings with error: \(error)") | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should you also update the lastRequested if settingsFetchTask is being cancelled ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Intentionally not updating. Cancellation means we didn't get valid data: didn't complete the fetch. So we shouldn't block retry for 30 seconds. The next connect() should be allowed to fetch fresh region settings. 🤔