Skip to content

Commit 1906deb

Browse files
committed
Remove AtomicFu, split callbacks in the CallbackManager
1 parent 7fa9362 commit 1906deb

File tree

5 files changed

+96
-49
lines changed

5 files changed

+96
-49
lines changed

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/CallbackManager.kt

Lines changed: 88 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -2,88 +2,145 @@ package io.github.jan.supabase.realtime
22

33
import io.github.jan.supabase.SupabaseSerializer
44
import io.github.jan.supabase.annotations.SupabaseInternal
5-
import io.github.jan.supabase.collections.AtomicMutableList
65
import io.github.jan.supabase.serializer.KotlinXSerializer
6+
import kotlinx.collections.immutable.PersistentList
7+
import kotlinx.collections.immutable.PersistentMap
8+
import kotlinx.collections.immutable.persistentHashMapOf
9+
import kotlinx.collections.immutable.persistentListOf
10+
import kotlinx.collections.immutable.plus
711
import kotlinx.serialization.json.JsonObject
812
import kotlin.concurrent.atomics.AtomicInt
913
import kotlin.concurrent.atomics.AtomicReference
1014
import kotlin.concurrent.atomics.fetchAndIncrement
15+
import kotlin.concurrent.atomics.update
1116

1217
@SupabaseInternal
13-
sealed interface CallbackManager {
18+
sealed class RealtimeCallbackId(val id: Int) {
19+
20+
class Postgres(id: Int) : RealtimeCallbackId(id)
21+
22+
class Presence(id: Int) : RealtimeCallbackId(id)
23+
24+
class Broadcast(id: Int) : RealtimeCallbackId(id)
25+
26+
}
27+
28+
@SupabaseInternal
29+
interface CallbackManager {
1430

1531
fun triggerPostgresChange(ids: List<Int>, data: PostgresAction)
1632

1733
fun triggerBroadcast(event: String, data: JsonObject)
1834

1935
fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>)
2036

21-
fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int
37+
fun hasPresenceCallback(): Boolean
2238

23-
fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int
39+
fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast
2440

25-
fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int
41+
fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres
2642

27-
fun removeCallbackById(id: Int)
43+
fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence
2844

29-
fun setServerChanges(changes: List<PostgresJoinConfig>)
45+
fun removeCallbackById(id: RealtimeCallbackId)
3046

31-
fun getCallbacks(): List<RealtimeCallback<*>>
47+
fun setServerChanges(changes: List<PostgresJoinConfig>)
3248

3349
}
3450

35-
internal class CallbackManagerImpl(
51+
typealias BroadcastMap = PersistentMap<String, PersistentList<RealtimeCallback.BroadcastCallback>>
52+
53+
54+
class CallbackManagerImpl(
3655
private val serializer: SupabaseSerializer = KotlinXSerializer()
3756
) : CallbackManager {
3857

3958
private val nextId = AtomicInt(0)
4059
private val _serverChanges = AtomicReference(listOf<PostgresJoinConfig>())
4160
val serverChanges: List<PostgresJoinConfig> get() = _serverChanges.load()
42-
private val callbacks = AtomicMutableList<RealtimeCallback<*>>()
4361

44-
override fun getCallbacks(): List<RealtimeCallback<*>> {
45-
return callbacks.toList()
46-
}
62+
private val presenceCallbacks = AtomicReference<PersistentMap<Int, RealtimeCallback.PresenceCallback>>(persistentHashMapOf())
4763

48-
override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): Int {
64+
private val broadcastCallbacks = AtomicReference<BroadcastMap>(persistentHashMapOf())
65+
private val broadcastEventId = AtomicReference<PersistentMap<Int, String>>(persistentHashMapOf())
66+
67+
private val postgresCallbacks = AtomicReference<PersistentMap<Int, RealtimeCallback.PostgresCallback>>(persistentHashMapOf())
68+
69+
override fun addBroadcastCallback(event: String, callback: (JsonObject) -> Unit): RealtimeCallbackId.Broadcast {
4970
val id = nextId.fetchAndIncrement()
50-
callbacks += RealtimeCallback.BroadcastCallback(callback, event, id)
51-
return id
71+
broadcastCallbacks.update {
72+
val current = it[event] ?: persistentListOf()
73+
it.put(event, current + RealtimeCallback.BroadcastCallback(callback, event, id))
74+
}
75+
broadcastEventId.update {
76+
it.put(id, event)
77+
}
78+
return RealtimeCallbackId.Broadcast(id)
5279
}
5380

54-
override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): Int {
81+
override fun addPostgresCallback(filter: PostgresJoinConfig, callback: (PostgresAction) -> Unit): RealtimeCallbackId.Postgres {
5582
val id = nextId.fetchAndIncrement()
56-
callbacks += RealtimeCallback.PostgresCallback(callback, filter, id)
57-
return id
83+
postgresCallbacks.update {
84+
it.put(id, RealtimeCallback.PostgresCallback(callback, filter, id))
85+
}
86+
return RealtimeCallbackId.Postgres(id)
5887
}
5988

6089
override fun triggerPostgresChange(ids: List<Int>, data: PostgresAction) {
6190
val filter = serverChanges.filter { it.id in ids }
62-
val postgresCallbacks = callbacks.filterIsInstance<RealtimeCallback.PostgresCallback>()
6391
val callbacks =
64-
postgresCallbacks.filter { cc -> filter.any { sc -> cc.filter == sc } }
92+
postgresCallbacks.load().values.filter { cc -> filter.any { sc -> cc.filter == sc } }
6593
callbacks.forEach { it.callback(data) }
6694
}
6795

6896
override fun triggerBroadcast(event: String, data: JsonObject) {
69-
val broadcastCallbacks = callbacks.filterIsInstance<RealtimeCallback.BroadcastCallback>()
70-
val callbacks = broadcastCallbacks.filter { it.event == event }
71-
callbacks.forEach { it.callback(data) }
97+
broadcastCallbacks.load()[event]?.forEach { it.callback(data) }
7298
}
7399

74100
override fun triggerPresenceDiff(joins: Map<String, Presence>, leaves: Map<String, Presence>) {
75-
val presenceCallbacks = callbacks.filterIsInstance<RealtimeCallback.PresenceCallback>()
76-
presenceCallbacks.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
101+
presenceCallbacks.load().values.forEach { it.callback(PresenceActionImpl(serializer, joins, leaves)) }
77102
}
78103

79-
override fun addPresenceCallback(callback: (PresenceAction) -> Unit): Int {
104+
override fun hasPresenceCallback(): Boolean {
105+
return presenceCallbacks.load().isNotEmpty()
106+
}
107+
108+
override fun addPresenceCallback(callback: (PresenceAction) -> Unit): RealtimeCallbackId.Presence {
80109
val id = nextId.fetchAndIncrement()
81-
callbacks += RealtimeCallback.PresenceCallback(callback, id)
82-
return id
110+
presenceCallbacks.update {
111+
it.put(id, RealtimeCallback.PresenceCallback(callback, id))
112+
}
113+
return RealtimeCallbackId.Presence(id)
114+
}
115+
116+
fun removeBroadcastCallbackById(id: Int) {
117+
val event = broadcastEventId.load()[id] ?: return
118+
broadcastCallbacks.update {
119+
it.put(event, it[event]?.removeAll { c -> c.id == id } ?: persistentListOf())
120+
}
121+
broadcastEventId.update {
122+
it.remove(id)
123+
}
124+
}
125+
126+
fun removePresenceCallbackById(id: Int) {
127+
presenceCallbacks.update {
128+
it.remove(id)
129+
}
130+
}
131+
132+
fun removePostgresCallbackById(id: Int) {
133+
postgresCallbacks.update {
134+
it.remove(id)
135+
}
83136
}
84137

85-
override fun removeCallbackById(id: Int) {
86-
callbacks.indexOfFirst { it.id == id }.takeIf { it != -1 }?.let { callbacks.removeAt(it) }
138+
override fun removeCallbackById(id: RealtimeCallbackId) {
139+
when (id) {
140+
is RealtimeCallbackId.Broadcast -> removeBroadcastCallbackById(id.id)
141+
is RealtimeCallbackId.Presence -> removePresenceCallbackById(id.id)
142+
is RealtimeCallbackId.Postgres -> removePostgresCallbackById(id.id)
143+
}
87144
}
88145

89146
override fun setServerChanges(changes: List<PostgresJoinConfig>) {

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/PresenceAction.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,8 @@ sealed interface PresenceAction {
9696

9797
}
9898

99-
@PublishedApi internal class PresenceActionImpl(
99+
//@PublishedApi
100+
class PresenceActionImpl(
100101
@PublishedApi internal val serializer: SupabaseSerializer,
101102
override val joins: Map<String, Presence>,
102103
override val leaves: Map<String, Presence>

Realtime/src/commonMain/kotlin/io/github/jan/supabase/realtime/RealtimeChannelImpl.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ internal class RealtimeChannelImpl(
6464
Realtime.logger.d { "Subscribing to channel $topic" }
6565
val currentJwt = accessToken()
6666
val postgrestChanges = clientChanges.toList()
67-
val hasPresenceCallback = callbackManager.getCallbacks().filterIsInstance<RealtimeCallback.PresenceCallback>().isNotEmpty()
67+
val hasPresenceCallback = callbackManager.hasPresenceCallback()
6868
presenceJoinConfig.enabled = hasPresenceCallback
6969
val joinConfig = RealtimeJoinPayload(RealtimeJoinConfig(broadcastJoinConfig, presenceJoinConfig, postgrestChanges, isPrivate))
7070
val joinConfigObject = buildJsonObject {

Realtime/src/commonTest/kotlin/CallbackManagerTest.kt

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import io.github.jan.supabase.realtime.HasRecord
44
import io.github.jan.supabase.realtime.PostgresAction
55
import io.github.jan.supabase.realtime.PostgresJoinConfig
66
import io.github.jan.supabase.realtime.Presence
7-
import io.github.jan.supabase.realtime.RealtimeCallback
87
import io.github.jan.supabase.serializer.KotlinXSerializer
98
import kotlinx.serialization.json.JsonObject
109
import kotlinx.serialization.json.buildJsonObject
@@ -37,18 +36,6 @@ class CallbackManagerTest {
3736
assertFalse { called }
3837
}
3938

40-
@Test
41-
fun testGetCallbacks() {
42-
val cm = CallbackManagerImpl()
43-
val expectedEvent = "event"
44-
cm.addBroadcastCallback(expectedEvent) {
45-
//...
46-
}
47-
val callbacks = cm.getCallbacks()
48-
assertTrue { callbacks.isNotEmpty() }
49-
assertTrue { callbacks.any { it is RealtimeCallback.BroadcastCallback && it.event == expectedEvent } }
50-
}
51-
5239
@Test
5340
fun testPresenceCallbacks() {
5441
val cm = CallbackManagerImpl()
@@ -64,10 +51,12 @@ class CallbackManagerTest {
6451
assertEquals(expectedLeaves, it.leaves)
6552
called = true
6653
}
54+
assertTrue { cm.hasPresenceCallback() }
6755
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
6856
assertTrue { called }
6957
cm.removeCallbackById(id)
7058
called = false
59+
assertFalse { cm.hasPresenceCallback() }
7160
cm.triggerPresenceDiff(expectedJoins, expectedLeaves)
7261
assertFalse { called }
7362
}
@@ -96,15 +85,15 @@ class CallbackManagerTest {
9685
called = true
9786
}
9887
cm.setServerChanges(listOf(joinConfig))
99-
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
88+
cm.triggerPostgresChange(listOf(id.id), actionFromEvent(event, expectedRecord, expectedOldRecord))
10089
assertTrue { called }
10190
called = false
10291
if(event != "*") {
10392
cm.triggerPostgresChange(listOf(2), actionFromEvent(events.filter { it != event && it != "*" }.random(), expectedRecord, expectedOldRecord))
10493
assertFalse { called }
10594
}
10695
cm.removeCallbackById(id)
107-
cm.triggerPostgresChange(listOf(id), actionFromEvent(event, expectedRecord, expectedOldRecord))
96+
cm.triggerPostgresChange(listOf(id.id), actionFromEvent(event, expectedRecord, expectedOldRecord))
10897
assertFalse { called }
10998
}
11099
}

gradle/libs.versions.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[versions]
2-
kotlin = "2.2.20-RC2"
2+
kotlin = "2.2.20"
33
accompanist-permissions = "0.37.3"
44
ktor = "3.2.3"
55
dokka = "2.0.0"

0 commit comments

Comments
 (0)