Skip to content

Commit 0875296

Browse files
committed
refactor: realtime with actor
1 parent d35111b commit 0875296

File tree

3 files changed

+300
-81
lines changed

3 files changed

+300
-81
lines changed
Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
//
2+
// ConnectionManager.swift
3+
// Supabase
4+
//
5+
// Created by Guilherme Souza on 19/11/25.
6+
//
7+
8+
import Foundation
9+
10+
actor ConnectionManager {
11+
enum State {
12+
case disconnected
13+
case connecting(Task<Void, any Error>)
14+
case connected(any WebSocket)
15+
case reconnecting(Task<Void, any Error>, reason: String)
16+
17+
var isConnecting: Bool {
18+
switch self {
19+
case .connecting:
20+
return true
21+
default:
22+
return false
23+
}
24+
}
25+
}
26+
27+
private let (stateStream, stateContinuation) = AsyncStream<State>.makeStream()
28+
private(set) var state: State = .disconnected
29+
30+
private let transport: WebSocketTransport
31+
private let url: URL
32+
private let headers: [String: String]
33+
private let reconnectDelay: TimeInterval
34+
private let logger: (any SupabaseLogger)?
35+
36+
/// Get current connection if connected, nil otherwise.
37+
var connection: (any WebSocket)? {
38+
if case .connected(let conn) = state {
39+
return conn
40+
}
41+
return nil
42+
}
43+
44+
var stateChanges: AsyncStream<State> { stateStream }
45+
46+
/// Check if currently connected.
47+
var isConnected: Bool {
48+
if case .connected = state {
49+
return true
50+
}
51+
return false
52+
}
53+
54+
init(
55+
transport: @escaping WebSocketTransport,
56+
url: URL,
57+
headers: [String: String],
58+
reconnectDelay: TimeInterval,
59+
logger: (any SupabaseLogger)?
60+
) {
61+
self.transport = transport
62+
self.url = url
63+
self.headers = headers
64+
self.reconnectDelay = reconnectDelay
65+
self.logger = logger
66+
}
67+
68+
func connect() async throws {
69+
logger?.debug("current state: \(state)")
70+
71+
switch state {
72+
case .connected:
73+
logger?.debug("Already connected")
74+
75+
case .connecting(let task):
76+
logger?.debug("Connection already in progress, waiting...")
77+
try await task.value
78+
79+
case .disconnected:
80+
logger?.debug("Initiating new connection")
81+
try await performConnection()
82+
83+
case .reconnecting(let task, _):
84+
logger?.debug("Reconnection in progress, waiting...")
85+
try await task.value
86+
}
87+
}
88+
89+
func disconnect(reason: String? = nil) {
90+
logger?.debug("current state: \(state)")
91+
92+
switch state {
93+
case .connected(let conn):
94+
logger?.debug("Disconnecting from WebSocket: \(reason ?? "no reason")")
95+
conn.close(code: nil, reason: reason)
96+
updateState(.disconnected)
97+
98+
case .connecting(let task), .reconnecting(let task, _):
99+
logger?.debug("Cancelling connection attempt: \(reason ?? "no reason")")
100+
task.cancel()
101+
updateState(.disconnected)
102+
103+
case .disconnected:
104+
logger?.debug("Already disconnected")
105+
}
106+
}
107+
108+
/// Handle connection error and initiate reconnect.
109+
///
110+
/// - Parameter error: The error that caused the connection failure
111+
func handleError(_ error: any Error) {
112+
guard case .connected = state else {
113+
logger?.debug("Ignoring error in non-connected state: \(error)")
114+
return
115+
}
116+
117+
logger?.debug("Connection error, initiating reconnect: \(error.localizedDescription)")
118+
initiateReconnect(reason: "error: \(error.localizedDescription)")
119+
}
120+
121+
/// Handle connection close.
122+
///
123+
/// - Parameters:
124+
/// - code: WebSocket close code
125+
/// - reason: WebSocket close reason
126+
func handleClose(code: Int?, reason: String?) {
127+
let closeReason = "code: \(code?.description ?? "none"), reason: \(reason ?? "none")"
128+
logger?.debug("Connection closed: \(closeReason)")
129+
130+
disconnect(reason: reason)
131+
}
132+
133+
private func performConnection() async throws {
134+
let connectionTask = Task {
135+
let conn = try await transport(url, headers)
136+
try Task.checkCancellation()
137+
updateState(.connected(conn))
138+
}
139+
140+
updateState(.connecting(connectionTask))
141+
142+
do {
143+
return try await connectionTask.value
144+
} catch {
145+
updateState(.disconnected)
146+
throw error
147+
}
148+
}
149+
150+
private func initiateReconnect(reason: String) {
151+
let reconnectTask = Task {
152+
try await Task.sleep(nanoseconds: UInt64(reconnectDelay * 1_000_000_000))
153+
logger?.debug("Attempting to reconnect...")
154+
try await performConnection()
155+
}
156+
157+
updateState(.reconnecting(reconnectTask, reason: reason))
158+
}
159+
160+
private func updateState(_ state: State) {
161+
self.state = state
162+
self.stateContinuation.yield(state)
163+
}
164+
}

0 commit comments

Comments
 (0)