@@ -29,6 +29,8 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
29
29
private companion object {
30
30
private const val TYPE_ERROR = "error"
31
31
private const val TYPE_EVENT = "event"
32
+ private const val TYPE_PONG = "pong"
33
+ private const val HEARTBEAT_INTERVAL = 20_000L // 20 seconds
32
34
33
35
private const val DEBOUNCE_MILLIS = 1L
34
36
@@ -40,6 +42,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
40
42
private var reconnectAttempts = 0
41
43
private var subscriptionsCounter = 0
42
44
private var reconnect = true
45
+ private var heartbeatJob: Job? = null
43
46
}
44
47
45
48
private fun createSocket() {
@@ -80,9 +83,25 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
80
83
}
81
84
82
85
private fun closeSocket() {
86
+ stopHeartbeat()
83
87
socket?.close(RealtimeCode.POLICY_VIOLATION.value, null)
84
88
}
85
89
90
+ private fun startHeartbeat() {
91
+ stopHeartbeat()
92
+ heartbeatJob = launch {
93
+ while (isActive) {
94
+ delay(HEARTBEAT_INTERVAL)
95
+ socket?.send("""{"type":"ping"}""")
96
+ }
97
+ }
98
+ }
99
+
100
+ private fun stopHeartbeat() {
101
+ heartbeatJob?.cancel()
102
+ heartbeatJob = null
103
+ }
104
+
86
105
private fun getTimeout() = when {
87
106
reconnectAttempts < 5 -> 1000L
88
107
reconnectAttempts < 15 -> 5000L
@@ -145,6 +164,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
145
164
override fun onOpen(webSocket: WebSocket, response: Response) {
146
165
super.onOpen(webSocket, response)
147
166
reconnectAttempts = 0
167
+ startHeartbeat()
148
168
}
149
169
150
170
override fun onMessage(webSocket: WebSocket, text: String) {
@@ -181,6 +201,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
181
201
182
202
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
183
203
super.onClosing(webSocket, code, reason)
204
+ stopHeartbeat()
184
205
if (!reconnect || code == RealtimeCode.POLICY_VIOLATION.value) {
185
206
reconnect = true
186
207
return
@@ -203,6 +224,7 @@ class Realtime(client: Client) : Service(client), CoroutineScope {
203
224
204
225
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
205
226
super.onFailure(webSocket, t, response)
227
+ stopHeartbeat()
206
228
t.printStackTrace()
207
229
}
208
230
}
0 commit comments