Skip to content

Commit 1b82458

Browse files
committed
add more channel resubscribing
1 parent c2aa734 commit 1b82458

File tree

5 files changed

+32
-2
lines changed

5 files changed

+32
-2
lines changed

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,16 @@ interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlugin {
127127
* @property disconnectOnNoSubscriptions Whether to disconnect from the websocket when there are no more subscriptions. Defaults to true
128128
* @property serializer A serializer used for serializing/deserializing objects e.g. in [PresenceAction.decodeJoinsAs] or [RealtimeChannel.broadcast]. Defaults to [KotlinXSerializer]
129129
* @property websocketFactory A custom websocket factory. If this is set, the [websocketConfig] will be ignored
130+
* @property rejoinDelay The interval between channel rejoin attempts
131+
* @property maxAttempts The maximum amount of connection attempts before giving up. Defaults to 5
130132
*/
131133
data class Config(
132134
var websocketConfig: WebSockets.Config.() -> Unit = {},
133135
var secure: Boolean? = null,
134136
var heartbeatInterval: Duration = 15.seconds,
135137
var reconnectDelay: Duration = 7.seconds,
138+
var rejoinDelay: Duration = 2.seconds,
139+
var maxAttempts: Int = 5,
136140
var disconnectOnSessionLoss: Boolean = true,
137141
var connectOnSubscribe: Boolean = true,
138142
@property:SupabaseInternal var websocketFactory: RealtimeWebsocketFactory? = null,

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannel.kt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,9 @@ interface RealtimeChannel {
110110
@SupabaseInternal
111111
fun updateStatus(status: Status)
112112

113+
@SupabaseInternal
114+
suspend fun scheduleRejoin()
115+
113116
/**
114117
* Represents the status of a channel
115118
*/

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import io.github.jan.supabase.realtime.event.RealtimeEvent
1111
import io.ktor.client.statement.bodyAsText
1212
import io.ktor.http.headers
1313
import kotlinx.coroutines.channels.awaitClose
14+
import kotlinx.coroutines.delay
1415
import kotlinx.coroutines.ensureActive
1516
import kotlinx.coroutines.flow.Flow
1617
import kotlinx.coroutines.flow.MutableStateFlow
@@ -25,6 +26,7 @@ import kotlinx.serialization.json.encodeToJsonElement
2526
import kotlinx.serialization.json.jsonObject
2627
import kotlinx.serialization.json.put
2728
import kotlinx.serialization.json.putJsonObject
29+
import kotlin.concurrent.atomics.AtomicInt
2830
import kotlin.concurrent.atomics.incrementAndFetch
2931
import kotlin.reflect.KClass
3032
import kotlin.reflect.KType
@@ -49,6 +51,8 @@ internal class RealtimeChannelImpl(
4951
private val subTopic = topic.replaceFirst(Regex("^${RealtimeTopic.PREFIX}:", RegexOption.IGNORE_CASE), "")
5052
private val httpClient = realtimeImpl.supabaseClient.httpClient
5153

54+
internal val joinAttempt = AtomicInt(0)
55+
5256
private suspend fun accessToken() = realtimeImpl.config.accessToken(supabaseClient) ?: realtimeImpl.accessToken
5357

5458
@OptIn(SupabaseInternal::class)
@@ -92,6 +96,12 @@ internal class RealtimeChannelImpl(
9296
event.handle(this, message)
9397
}
9498

99+
override suspend fun scheduleRejoin() {
100+
Realtime.logger.d { "Rejoining channel $topic in" }
101+
delay(realtime.config.rejoinDelay)
102+
resubscribe()
103+
}
104+
95105
override suspend fun unsubscribe() {
96106
_status.value = RealtimeChannel.Status.UNSUBSCRIBING
97107
Realtime.logger.d { "Unsubscribing from channel $topic" }
@@ -232,6 +242,9 @@ internal class RealtimeChannelImpl(
232242
}
233243

234244
override fun updateStatus(status: RealtimeChannel.Status) {
245+
if(this.status.value == RealtimeChannel.Status.SUBSCRIBED) {
246+
joinAttempt.store(0)
247+
}
235248
_status.value = status
236249
}
237250

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RErrorEvent.kt

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,25 @@ package io.github.jan.supabase.realtime.event
33
import io.github.jan.supabase.logging.e
44
import io.github.jan.supabase.realtime.Realtime
55
import io.github.jan.supabase.realtime.RealtimeChannel
6+
import io.github.jan.supabase.realtime.RealtimeChannelImpl
67
import io.github.jan.supabase.realtime.RealtimeMessage
8+
import kotlin.concurrent.atomics.incrementAndFetch
79

810
/**
911
* Event that handles an error event
1012
*/
1113
data object RErrorEvent : RealtimeEvent {
1214

1315
override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
14-
Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token" }
16+
channel as RealtimeChannelImpl
17+
val currentAttempt = channel.joinAttempt.incrementAndFetch()
18+
if(currentAttempt >= channel.realtime.config.maxAttempts) {
19+
Realtime.logger.e { "Failed to rejoin channel ${message.topic} after $currentAttempt attempts. Giving up." }
20+
channel.updateStatus(RealtimeChannel.Status.UNSUBSCRIBED)
21+
return
22+
}
23+
Realtime.logger.e { "Received an error in channel ${message.topic}. That could be as a result of an invalid access token. Trying to rejoin the channel..." }
24+
channel.scheduleRejoin()
1525
}
1626

1727
override fun appliesTo(message: RealtimeMessage): Boolean {

gradle.properties

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ org.jetbrains.compose.experimental.jscanvas.enabled=true
1111
org.jetbrains.compose.experimental.wasm.enabled=true
1212
org.jetbrains.dokka.experimental.gradle.pluginMode=V2Enabled
1313

14-
supabase-version = 3.2.4-dev
14+
supabase-version = 3.2.4
1515
base-group = io.github.jan-tennert.supabase

0 commit comments

Comments
 (0)