Skip to content

Commit 18159ec

Browse files
committed
Merge branch 'master' into feat-signin
2 parents cf40bb0 + abdba83 commit 18159ec

File tree

3 files changed

+80
-1
lines changed

3 files changed

+80
-1
lines changed

templates/android/library/src/main/java/io/package/services/Realtime.kt.twig

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
2929
private companion object {
3030
private const val TYPE_ERROR = "error"
3131
private const val TYPE_EVENT = "event"
32+
private const val TYPE_PONG = "pong"
33+
private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds
3234

3335
private const val DEBOUNCE_MILLIS = 1L
3436

@@ -40,6 +42,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
4042
private var reconnectAttempts = 0
4143
private var subscriptionsCounter = 0
4244
private var reconnect = true
45+
private var heartbeatJob: Job? = null
4346
}
4447

4548
private fun createSocket() {
@@ -80,9 +83,25 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
8083
}
8184

8285
private fun closeSocket() {
86+
stopHeartbeat()
8387
socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
8488
}
8589

90+
private fun startHeartbeat() {
91+
stopHeartbeat()
92+
heartbeatJob = launch {
93+
while (isActive) {
94+
delay(HEARTBEAT_INTERVAL)
95+
socket?.send("""{"type":"ping"}""")
96+
}
97+
}
98+
}
99+
100+
private fun stopHeartbeat() {
101+
heartbeatJob?.cancel()
102+
heartbeatJob = null
103+
}
104+
86105
private fun getTimeout() = when {
87106
reconnectAttempts < 5 -> 1000L
88107
reconnectAttempts < 15 -> 5000L
@@ -145,6 +164,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
145164
override fun onOpen(webSocket: WebSocket, response: Response) {
146165
super.onOpen(webSocket, response)
147166
reconnectAttempts = 0
167+
startHeartbeat()
148168
}
149169

150170
override fun onMessage(webSocket: WebSocket, text: String) {
@@ -181,6 +201,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
181201

182202
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
183203
super.onClosing(webSocket, code, reason)
204+
stopHeartbeat()
184205
if (!reconnect || code == RealtimeCode.POLICY_VIOLATION.value) {
185206
reconnect = true
186207
return
@@ -203,6 +224,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
203224

204225
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
205226
super.onFailure(webSocket, t, response)
227+
stopHeartbeat()
206228
t.printStackTrace()
207229
}
208230
}

templates/flutter/lib/src/realtime_mixin.dart.twig

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,33 @@ mixin RealtimeMixin {
2727
int _retries = 0;
2828
StreamSubscription? _websocketSubscription;
2929
bool _creatingSocket = false;
30+
Timer? _heartbeatTimer;
3031

3132
Future<dynamic> _closeConnection() async {
33+
_stopHeartbeat();
3234
await _websocketSubscription?.cancel();
3335
await _websok?.sink.close(status.normalClosure, 'Ending session');
3436
_lastUrl = null;
3537
_retries = 0;
3638
_reconnect = false;
3739
}
3840

41+
void _startHeartbeat() {
42+
_stopHeartbeat();
43+
_heartbeatTimer = Timer.periodic(Duration(seconds: 20), (_) {
44+
if (_websok != null) {
45+
_websok!.sink.add(jsonEncode({
46+
"type": "ping"
47+
}));
48+
}
49+
});
50+
}
51+
52+
void _stopHeartbeat() {
53+
_heartbeatTimer?.cancel();
54+
_heartbeatTimer = null;
55+
}
56+
3957
_createSocket() async {
4058
if(_creatingSocket || _channels.isEmpty) return;
4159
_creatingSocket = true;
@@ -78,6 +96,10 @@ mixin RealtimeMixin {
7896
}));
7997
}
8098
}
99+
_startHeartbeat(); // Start heartbeat after successful connection
100+
break;
101+
case 'pong':
102+
debugPrint('Received heartbeat response from realtime server');
81103
break;
82104
case 'event':
83105
final message = RealtimeMessage.fromMap(data.data);
@@ -91,8 +113,10 @@ mixin RealtimeMixin {
91113
break;
92114
}
93115
}, onDone: () {
116+
_stopHeartbeat();
94117
_retry();
95118
}, onError: (err, stack) {
119+
_stopHeartbeat();
96120
for (var subscription in _subscriptions.values) {
97121
subscription.controller.addError(err, stack);
98122
}
@@ -187,4 +211,4 @@ mixin RealtimeMixin {
187211
_retry();
188212
}
189213
}
190-
}
214+
}

templates/swift/Sources/Services/Realtime.swift.twig

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,14 @@ open class Realtime : Service {
77

88
private let TYPE_ERROR = "error"
99
private let TYPE_EVENT = "event"
10+
private let TYPE_PONG = "pong"
1011
private let DEBOUNCE_NANOS = 1_000_000
12+
private let HEARTBEAT_INTERVAL: UInt64 = 20_000_000_000 // 20 seconds in nanoseconds
1113

1214
private var socketClient: WebSocketClient? = nil
1315
private var activeChannels = Set<String>()
1416
private var activeSubscriptions = [Int: RealtimeCallback]()
17+
private var heartbeatTask: Task<Void, Swift.Error>? = nil
1518

1619
let connectSync = DispatchQueue(label: "ConnectSync")
1720

@@ -20,6 +23,29 @@ open class Realtime : Service {
2023
private var subscriptionsCounter = 0
2124
private var reconnect = true
2225

26+
private func startHeartbeat() {
27+
stopHeartbeat()
28+
heartbeatTask = Task {
29+
do {
30+
while !Task.isCancelled {
31+
if let client = socketClient, client.isConnected {
32+
client.send(text: #"{"type": "ping"}"#)
33+
}
34+
try await Task.sleep(nanoseconds: HEARTBEAT_INTERVAL)
35+
}
36+
} catch {
37+
if !Task.isCancelled {
38+
print("Heartbeat task failed: \(error.localizedDescription)")
39+
}
40+
}
41+
}
42+
}
43+
44+
private func stopHeartbeat() {
45+
heartbeatTask?.cancel()
46+
heartbeatTask = nil
47+
}
48+
2349
private func createSocket() async throws {
2450
guard activeChannels.count > 0 else {
2551
reconnect = false
@@ -50,6 +76,8 @@ open class Realtime : Service {
5076
}
5177

5278
private func closeSocket() async throws {
79+
stopHeartbeat()
80+
5381
guard let client = socketClient,
5482
let group = client.threadGroup else {
5583
return
@@ -163,6 +191,7 @@ extension Realtime: WebSocketClientDelegate {
163191

164192
public func onOpen(channel: Channel) {
165193
self.reconnectAttempts = 0
194+
startHeartbeat()
166195
}
167196

168197
public func onMessage(text: String) {
@@ -172,13 +201,16 @@ extension Realtime: WebSocketClientDelegate {
172201
switch type {
173202
case TYPE_ERROR: try! handleResponseError(from: json)
174203
case TYPE_EVENT: handleResponseEvent(from: json)
204+
case TYPE_PONG: break // Handle pong response if needed
175205
default: break
176206
}
177207
}
178208
}
179209
}
180210

181211
public func onClose(channel: Channel, data: Data) async throws {
212+
stopHeartbeat()
213+
182214
if (!reconnect) {
183215
reconnect = true
184216
return
@@ -196,6 +228,7 @@ extension Realtime: WebSocketClientDelegate {
196228
}
197229

198230
public func onError(error: Swift.Error?, status: HTTPResponseStatus?) {
231+
stopHeartbeat()
199232
print(error?.localizedDescription ?? "Unknown error")
200233
}
201234

0 commit comments

Comments
 (0)