diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt index 2f3aa373..442b0fed 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt @@ -2,15 +2,31 @@ package io.github.jan.supabase.realtime import io.github.jan.supabase.SupabaseSerializer import io.github.jan.supabase.annotations.SupabaseInternal -import io.github.jan.supabase.collections.AtomicMutableList import io.github.jan.supabase.serializer.KotlinXSerializer +import kotlinx.collections.immutable.PersistentList +import kotlinx.collections.immutable.PersistentMap +import kotlinx.collections.immutable.persistentHashMapOf +import kotlinx.collections.immutable.persistentListOf +import kotlinx.collections.immutable.plus import kotlinx.serialization.json.JsonObject import kotlin.concurrent.atomics.AtomicInt import kotlin.concurrent.atomics.AtomicReference import kotlin.concurrent.atomics.fetchAndIncrement +import kotlin.concurrent.atomics.update @SupabaseInternal -sealed interface CallbackManager { +sealed class RealtimeCallbackId(val value: Int) { + + class Postgres(value: Int) : RealtimeCallbackId(value) + + class Presence(value: Int) : RealtimeCallbackId(value) + + class Broadcast(value: Int) : RealtimeCallbackId(value) + +} + +@SupabaseInternal +interface CallbackManager { fun triggerPostgresChange(ids: List, data: PostgresAction) @@ -18,20 +34,24 @@ sealed interface CallbackManager { fun triggerPresenceDiff(joins: Map, leaves: Map) - fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int + fun hasPresenceCallback(): Boolean - fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int + fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast - fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int + fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres - fun removeCallbackById(id: Int) + fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence - fun setServerChanges(changes: List) + fun removeCallbackById(id: RealtimeCallbackId) - fun getCallbacks(): List> + fun setServerChanges(changes: List) } +private typealias BroadcastMap = PersistentMap> +private typealias PresenceMap = PersistentMap +private typealias PostgresMap = PersistentMap + internal class CallbackManagerImpl( private val serializer: SupabaseSerializer = KotlinXSerializer() ) : CallbackManager { @@ -39,51 +59,90 @@ internal class CallbackManagerImpl( private val nextId = AtomicInt(0) private val _serverChanges = AtomicReference(listOf()) val serverChanges: List get() = _serverChanges.load() - private val callbacks = AtomicMutableList>() - override fun getCallbacks(): List> { - return callbacks.toList() - } + private val presenceCallbacks = AtomicReference(persistentHashMapOf()) - override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int { + private val broadcastCallbacks = AtomicReference(persistentHashMapOf()) + // Additional map to know from which list a callback may be removed in broadcastCallbacks without searching through the whole map + private val broadcastEventId = AtomicReference>(persistentHashMapOf()) + + private val postgresCallbacks = AtomicReference(persistentHashMapOf()) + + override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast { val id = nextId.fetchAndIncrement() - callbacks += RealtimeCallback.BroadcastCallback(callback, event, id) - return id + broadcastCallbacks.update { + val current = it[event] ?: persistentListOf() + it.put(event, current + RealtimeCallback.BroadcastCallback(callback, event, id)) + } + broadcastEventId.update { + it.put(id, event) + } + return RealtimeCallbackId.Broadcast(id) } - override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int { + override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres { val id = nextId.fetchAndIncrement() - callbacks += RealtimeCallback.PostgresCallback(callback, filter, id) - return id + postgresCallbacks.update { + it.put(id, RealtimeCallback.PostgresCallback(callback, filter, id)) + } + return RealtimeCallbackId.Postgres(id) } override fun triggerPostgresChange(ids: List, data: PostgresAction) { val filter = serverChanges.filter { it.id in ids } - val postgresCallbacks = callbacks.filterIsInstance() val callbacks = - postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } } + postgresCallbacks.load().values.filter { cc -> filter.any { sc -> cc.filter == sc } } callbacks.forEach { it.callback(data) } } override fun triggerBroadcast(event: String, data: JsonObject) { - val broadcastCallbacks = callbacks.filterIsInstance() - val callbacks = broadcastCallbacks.filter { it.event == event } - callbacks.forEach { it.callback(data) } + broadcastCallbacks.load()[event]?.forEach { it.callback(data) } } override fun triggerPresenceDiff(joins: Map, leaves: Map) { - val presenceCallbacks = callbacks.filterIsInstance() - presenceCallbacks.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) } + presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) } } - override fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int { + override fun hasPresenceCallback(): Boolean { + return presenceCallbacks.load().isNotEmpty() + } + + override fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence { val id = nextId.fetchAndIncrement() - callbacks += RealtimeCallback.PresenceCallback(callback, id) - return id + presenceCallbacks.update { + it.put(id, RealtimeCallback.PresenceCallback(callback, id)) + } + return RealtimeCallbackId.Presence(id) + } + + fun removeBroadcastCallbackById(id: Int) { + val event = broadcastEventId.load()[id] ?: return + broadcastCallbacks.update { + it.put(event, it[event]?.removeAll { c -> c.id == id } ?: persistentListOf()) + } + broadcastEventId.update { + it.remove(id) + } + } + + fun removePresenceCallbackById(id: Int) { + presenceCallbacks.update { + it.remove(id) + } + } + + fun removePostgresCallbackById(id: Int) { + postgresCallbacks.update { + it.remove(id) + } } - override fun removeCallbackById(id: Int) { - callbacks.indexOfFirst { it.id == id }.takeIf { it != -1 }?.let { callbacks.removeAt(it) } + override fun removeCallbackById(id: RealtimeCallbackId) { + when (id) { + is RealtimeCallbackId.Broadcast -> removeBroadcastCallbackById(id.value) + is RealtimeCallbackId.Presence -> removePresenceCallbackById(id.value) + is RealtimeCallbackId.Postgres -> removePostgresCallbackById(id.value) + } } override fun setServerChanges(changes: List) { diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt index 5d49b469..2589bf0b 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt @@ -96,7 +96,8 @@ sealed interface PresenceAction { } -@PublishedApi internal class PresenceActionImpl( +@PublishedApi +internal class PresenceActionImpl( @PublishedApi internal val serializer: SupabaseSerializer, override val joins: Map, override val leaves: Map diff --git a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt index 7e2c8436..408224bc 100644 --- a/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt +++ b/Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt @@ -64,7 +64,7 @@ internal class RealtimeChannelImpl( Realtime.logger.d { "Subscribing to channel $topic" } val currentJwt = accessToken() val postgrestChanges = clientChanges.toList() - val hasPresenceCallback = callbackManager.getCallbacks().filterIsInstance().isNotEmpty() + val hasPresenceCallback = callbackManager.hasPresenceCallback() presenceJoinConfig.enabled = hasPresenceCallback val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate)) val joinConfigObject = buildJsonObject { diff --git a/Realtime/src/commonTest/kotlin/CallbackManagerTest.kt b/Realtime/src/commonTest/kotlin/CallbackManagerTest.kt index f85d6732..17a22578 100644 --- a/Realtime/src/commonTest/kotlin/CallbackManagerTest.kt +++ b/Realtime/src/commonTest/kotlin/CallbackManagerTest.kt @@ -4,7 +4,6 @@ import io.github.jan.supabase.realtime.HasRecord import io.github.jan.supabase.realtime.PostgresAction import io.github.jan.supabase.realtime.PostgresJoinConfig import io.github.jan.supabase.realtime.Presence -import io.github.jan.supabase.realtime.RealtimeCallback import io.github.jan.supabase.serializer.KotlinXSerializer import kotlinx.serialization.json.JsonObject import kotlinx.serialization.json.buildJsonObject @@ -37,18 +36,6 @@ class CallbackManagerTest { assertFalse { called } } - @Test - fun testGetCallbacks() { - val cm = CallbackManagerImpl() - val expectedEvent = "event" - cm.addBroadcastCallback(expectedEvent) { - //... - } - val callbacks = cm.getCallbacks() - assertTrue { callbacks.isNotEmpty() } - assertTrue { callbacks.any { it is RealtimeCallback.BroadcastCallback && it.event == expectedEvent } } - } - @Test fun testPresenceCallbacks() { val cm = CallbackManagerImpl() @@ -64,10 +51,12 @@ class CallbackManagerTest { assertEquals(expectedLeaves, it.leaves) called = true } + assertTrue { cm.hasPresenceCallback() } cm.triggerPresenceDiff(expectedJoins, expectedLeaves) assertTrue { called } cm.removeCallbackById(id) called = false + assertFalse { cm.hasPresenceCallback() } cm.triggerPresenceDiff(expectedJoins, expectedLeaves) assertFalse { called } } @@ -96,7 +85,7 @@ class CallbackManagerTest { called = true } cm.setServerChanges(listOf(joinConfig)) - cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord)) + cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord)) assertTrue { called } called = false if(event != "*") { @@ -104,7 +93,7 @@ class CallbackManagerTest { assertFalse { called } } cm.removeCallbackById(id) - cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord)) + cm.triggerPostgresChange(listOf(id.value), actionFromEvent(event, expectedRecord, expectedOldRecord)) assertFalse { called } } } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 0c287ca3..b8af7114 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -1,5 +1,5 @@ [versions] -kotlin = "2.2.20-RC2" +kotlin = "2.2.20" accompanist-permissions = "0.37.3" ktor = "3.2.3" dokka = "2.0.0"