@@ -7,11 +7,14 @@ open class Realtime : Service {
7
7
8
8
private let TYPE_ERROR = "error"
9
9
private let TYPE_EVENT = "event"
10
+ private let TYPE_PONG = "pong"
10
11
private let DEBOUNCE_NANOS = 1_000_000
12
+ private let HEARTBEAT_INTERVAL: UInt64 = 20_000_000_000 // 20 seconds in nanoseconds
11
13
12
14
private var socketClient: WebSocketClient? = nil
13
15
private var activeChannels = Set<String >()
14
16
private var activeSubscriptions = [Int: RealtimeCallback]()
17
+ private var heartbeatTask: Task<Void , Error >? = nil
15
18
16
19
let connectSync = DispatchQueue(label: "ConnectSync")
17
20
@@ -20,6 +23,33 @@ open class Realtime : Service {
20
23
private var subscriptionsCounter = 0
21
24
private var reconnect = true
22
25
26
+ private func startHeartbeat() {
27
+ stopHeartbeat()
28
+ heartbeatTask = Task {
29
+ do {
30
+ while !Task.isCancelled {
31
+ if let client = socketClient, client.isConnected {
32
+ let pingMessage = ["type": "ping"]
33
+ if let jsonData = try JSONSerialization.data(withJSONObject: pingMessage),
34
+ let jsonString = String(data: jsonData, encoding: .utf8) {
35
+ client.send(text: jsonString)
36
+ }
37
+ }
38
+ try await Task.sleep(nanoseconds: HEARTBEAT_INTERVAL)
39
+ }
40
+ } catch {
41
+ if !Task.isCancelled {
42
+ print("Heartbeat task failed: \(error.localizedDescription)")
43
+ }
44
+ }
45
+ }
46
+ }
47
+
48
+ private func stopHeartbeat() {
49
+ heartbeatTask?.cancel()
50
+ heartbeatTask = nil
51
+ }
52
+
23
53
private func createSocket() async throws {
24
54
guard activeChannels.count > 0 else {
25
55
reconnect = false
@@ -50,6 +80,8 @@ open class Realtime : Service {
50
80
}
51
81
52
82
private func closeSocket() async throws {
83
+ stopHeartbeat()
84
+
53
85
guard let client = socketClient,
54
86
let group = client.threadGroup else {
55
87
return
@@ -163,6 +195,7 @@ extension Realtime: WebSocketClientDelegate {
163
195
164
196
public func onOpen(channel: Channel) {
165
197
self.reconnectAttempts = 0
198
+ startHeartbeat()
166
199
}
167
200
168
201
public func onMessage(text: String) {
@@ -172,13 +205,16 @@ extension Realtime: WebSocketClientDelegate {
172
205
switch type {
173
206
case TYPE_ERROR: try! handleResponseError(from: json)
174
207
case TYPE_EVENT: handleResponseEvent(from: json)
208
+ case TYPE_PONG: break // Handle pong response if needed
175
209
default: break
176
210
}
177
211
}
178
212
}
179
213
}
180
214
181
215
public func onClose(channel: Channel, data: Data) async throws {
216
+ stopHeartbeat()
217
+
182
218
if (!reconnect) {
183
219
reconnect = true
184
220
return
@@ -196,6 +232,7 @@ extension Realtime: WebSocketClientDelegate {
196
232
}
197
233
198
234
public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
235
+ stopHeartbeat()
199
236
print(error?.localizedDescription ?? "Unknown error")
200
237
}
201
238
0 commit comments