Skip to content

Commit 46b70aa

Browse files
authored
Merge pull request #996 from supabase-community/presence-enabled
Add presence enabled flag
2 parents 1be96c8 + 36d4ff6 commit 46b70aa

File tree

12 files changed

+114
-9
lines changed

12 files changed

+114
-9
lines changed

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ sealed interface CallbackManager {
2626

2727
fun setServerChanges(changes: List<PostgresJoinConfig>)
2828

29+
fun getCallbacks(): List<RealtimeCallback<*>>
30+
2931
}
3032

3133
internal class CallbackManagerImpl(
@@ -37,6 +39,10 @@ internal class CallbackManagerImpl(
3739
val serverChanges: List<PostgresJoinConfig> get() = _serverChanges
3840
private val callbacks = AtomicMutableList<RealtimeCallback<*>>()
3941

42+
override fun getCallbacks(): List<RealtimeCallback<*>> {
43+
return callbacks.toList()
44+
}
45+
4046
override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Long {
4147
val id = nextId++
4248
callbacks += RealtimeCallback.BroadcastCallback(callback, event, id)

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/Realtime.kt

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,12 @@ interface Realtime : MainPlugin<Realtime.Config>, CustomSerializationPlugin {
7474
*/
7575
suspend fun removeChannel(channel: RealtimeChannel)
7676

77+
/**
78+
* Adds a channel to the [subscriptions] without subscribing to it.
79+
*/
80+
@SupabaseInternal
81+
fun addChannel(channel: RealtimeChannel)
82+
7783
/**
7884
* Unsubscribes and removes all channels from the [subscriptions]
7985
*/

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelBuilder.kt

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ import io.github.jan.supabase.realtime.annotations.ChannelDsl
77
* Used to build a realtime channel
88
*/
99
@ChannelDsl
10-
class RealtimeChannelBuilder @PublishedApi internal constructor(private val topic: String) {
10+
class RealtimeChannelBuilder @PublishedApi internal constructor(
11+
private val topic: String,
12+
) {
1113

1214
private var broadcastJoinConfig = BroadcastJoinConfig(acknowledgeBroadcasts = false, receiveOwnBroadcasts = false)
13-
private var presenceJoinConfig = PresenceJoinConfig("")
15+
private var presenceJoinConfig = PresenceJoinConfig("", false)
1416

1517
/**
1618
* Whether this channel should be private.
@@ -29,7 +31,7 @@ class RealtimeChannelBuilder @PublishedApi internal constructor(private val topi
2931
* @param block The presence join config
3032
*/
3133
fun presence(block: PresenceJoinConfig.() -> Unit) {
32-
presenceJoinConfig = PresenceJoinConfig("").apply(block)
34+
presenceJoinConfig = PresenceJoinConfig("", false).apply(block)
3335
}
3436

3537
@SupabaseInternal

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import kotlinx.coroutines.flow.MutableStateFlow
1717
import kotlinx.coroutines.flow.asStateFlow
1818
import kotlinx.coroutines.flow.callbackFlow
1919
import kotlinx.coroutines.flow.first
20+
import kotlinx.coroutines.launch
2021
import kotlinx.serialization.json.Json
2122
import kotlinx.serialization.json.JsonObject
2223
import kotlinx.serialization.json.buildJsonObject
@@ -55,10 +56,15 @@ internal class RealtimeChannelImpl(
5556
if(!realtimeImpl.config.connectOnSubscribe) error("You can't subscribe to a channel while the realtime client is not connected. Did you forget to call `realtime.connect()`?")
5657
realtimeImpl.connect()
5758
}
59+
if(!realtimeImpl.subscriptions.containsKey(topic)) {
60+
realtime.addChannel(this)
61+
}
5862
_status.value = RealtimeChannel.Status.SUBSCRIBING
5963
Realtime.logger.d { "Subscribing to channel $topic" }
6064
val currentJwt = accessToken()
6165
val postgrestChanges = clientChanges.toList()
66+
val hasPresenceCallback = callbackManager.getCallbacks().filterIsInstance<RealtimeCallback.PresenceCallback>().isNotEmpty()
67+
presenceJoinConfig.enabled = hasPresenceCallback
6268
val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate))
6369
val joinConfigObject = buildJsonObject {
6470
putJsonObject(Json.encodeToJsonElement(joinConfig).jsonObject)
@@ -214,12 +220,25 @@ internal class RealtimeChannelImpl(
214220
trySend(action)
215221
}
216222
val id = callbackManager.addPresenceCallback(callback)
223+
if(status.value == RealtimeChannel.Status.SUBSCRIBED && !presenceJoinConfig.enabled) {
224+
// If the channel is already subscribed, we need to resubscribe to enable presence
225+
Realtime.logger.d { "Resubscribing to channel $topic to enable presence..." }
226+
launch {
227+
resubscribe()
228+
}
229+
}
217230
awaitClose { callbackManager.removeCallbackById(id) }
218231
}
219232

220233
override fun updateStatus(status: RealtimeChannel.Status) {
221234
_status.value = status
222235
}
223236

237+
private suspend fun resubscribe() {
238+
unsubscribe()
239+
_status.first { it == RealtimeChannel.Status.UNSUBSCRIBED }
240+
subscribe()
241+
}
242+
224243
}
225244

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeImpl.kt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ import kotlin.time.Clock
9393
}
9494
}
9595

96+
override fun addChannel(channel: RealtimeChannel) {
97+
_subscriptions[channel.topic] = channel
98+
}
99+
96100
override fun init() {
97101
scope.launch {
98102
supabaseClient.pluginManager.getPluginOrNull(Auth)?.sessionStatus?.collect {

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeJoinPayload.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,9 +30,10 @@ data class BroadcastJoinConfig(@SerialName("ack") var acknowledgeBroadcasts: Boo
3030

3131
/**
3232
* @param key Used to track presence payloads. Can be e.g. a user id
33+
* @param enabled Whether presence is enabled for this channel
3334
*/
3435
@Serializable
35-
data class PresenceJoinConfig(var key: String)
36+
data class PresenceJoinConfig(var key: String, internal var enabled: Boolean)
3637

3738
@SupabaseInternal
3839
@Serializable

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/event/RCloseEvent.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ data object RCloseEvent : RealtimeEvent {
1313
override suspend fun handle(channel: RealtimeChannel, message: RealtimeMessage) {
1414
channel.realtime.removeChannel(channel)
1515
Realtime.logger.d { "Unsubscribed from channel ${message.topic}" }
16+
channel.updateStatus(RealtimeChannel.Status.UNSUBSCRIBED)
1617
}
1718

1819
override fun appliesTo(message: RealtimeMessage): Boolean {

Realtime/src/commonTest/kotlin/CallbackManagerTest.kt

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import io.github.jan.supabase.realtime.HasRecord
44
import io.github.jan.supabase.realtime.PostgresAction
55
import io.github.jan.supabase.realtime.PostgresJoinConfig
66
import io.github.jan.supabase.realtime.Presence
7+
import io.github.jan.supabase.realtime.RealtimeCallback
78
import io.github.jan.supabase.serializer.KotlinXSerializer
89
import kotlinx.serialization.json.JsonObject
910
import kotlinx.serialization.json.buildJsonObject
@@ -36,6 +37,18 @@ class CallbackManagerTest {
3637
assertFalse { called }
3738
}
3839

40+
@Test
41+
fun testGetCallbacks() {
42+
val cm = CallbackManagerImpl()
43+
val expectedEvent = "event"
44+
cm.addBroadcastCallback(expectedEvent) {
45+
//...
46+
}
47+
val callbacks = cm.getCallbacks()
48+
assertTrue { callbacks.isNotEmpty() }
49+
assertTrue { callbacks.any { it is RealtimeCallback.BroadcastCallback && it.event == expectedEvent } }
50+
}
51+
3952
@Test
4053
fun testPresenceCallbacks() {
4154
val cm = CallbackManagerImpl()

Realtime/src/commonTest/kotlin/RealtimeChannelTest.kt

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import io.ktor.client.engine.mock.respond
2525
import io.ktor.util.encodeBase64
2626
import kotlinx.coroutines.coroutineScope
2727
import kotlinx.coroutines.flow.Flow
28+
import kotlinx.coroutines.flow.launchIn
2829
import kotlinx.coroutines.launch
2930
import kotlinx.coroutines.test.runTest
3031
import kotlinx.serialization.json.Json
@@ -304,7 +305,7 @@ class RealtimeChannelTest {
304305
runTest {
305306
createTestClient(
306307
wsHandler = { i, o ->
307-
handleSubscribe(i, o, channelId)
308+
handleSubscribe(i, o, channelId, true)
308309
for(i in 0..amount) {
309310
o.sendPresence(
310311
channelId,
@@ -383,6 +384,38 @@ class RealtimeChannelTest {
383384
}
384385
}
385386

387+
@Test
388+
fun testResubscribeOnPresenceChange() {
389+
val channelId = "channelId"
390+
runTest {
391+
createTestClient(
392+
wsHandler = { i, o ->
393+
handleSubscribe(i, o, channelId, false)
394+
handleUnsubscribe(i, o, channelId)
395+
handleSubscribe(i, o, channelId, true)
396+
},
397+
realtimeConfig = {
398+
disconnectOnNoSubscriptions = false
399+
},
400+
supabaseHandler = {
401+
val channel = it.channel(channelId)
402+
channel.status.test {
403+
assertEquals(RealtimeChannel.Status.UNSUBSCRIBED, awaitItem())
404+
channel.subscribe(false)
405+
assertEquals(RealtimeChannel.Status.SUBSCRIBING, awaitItem())
406+
assertEquals(RealtimeChannel.Status.SUBSCRIBED, awaitItem())
407+
val job = channel.presenceChangeFlow().launchIn(this@runTest)
408+
assertEquals(RealtimeChannel.Status.UNSUBSCRIBING, awaitItem())
409+
assertEquals(RealtimeChannel.Status.UNSUBSCRIBED, awaitItem())
410+
assertEquals(RealtimeChannel.Status.SUBSCRIBING, awaitItem())
411+
assertEquals(RealtimeChannel.Status.SUBSCRIBED, awaitItem())
412+
job.cancel()
413+
}
414+
}
415+
)
416+
}
417+
}
418+
386419
//For more complex tests we need integration tests
387420

388421
private fun RealtimeChannel.flowFromEventType(event: String, schema: String, table: String, filter: FilterOperation): Flow<PostgresAction> {

Realtime/src/commonTest/kotlin/RealtimeExtTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ class RealtimeExtTest {
4444
)
4545
createTestClient(
4646
wsHandler = { i, o ->
47-
handleSubscribe(i, o, "channelId")
47+
handleSubscribe(i, o, "channelId", true)
4848
},
4949
supabaseHandler = {
5050
val channel = it.channel("channelId")

0 commit comments

Comments
 (0)