Skip to content
This repository was archived by the owner on Dec 10, 2025. It is now read-only.

Commit aa23462

Browse files
committed
feat(observer): implement ObservableField and related utilities for value observation
1 parent da39cd1 commit aa23462

File tree

7 files changed

+100
-11
lines changed

7 files changed

+100
-11
lines changed

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/util/ObservableField.kt renamed to surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/util/observer/ObservableField.kt

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1-
package dev.slne.surf.cloud.api.common.util
1+
package dev.slne.surf.cloud.api.common.util.observer
22

3+
import dev.slne.surf.cloud.api.common.util.threadFactory
34
import dev.slne.surf.surfapi.core.api.util.logger
45
import kotlinx.coroutines.*
56
import kotlinx.coroutines.channels.Channel
67
import org.jetbrains.annotations.ApiStatus
8+
import java.util.concurrent.CopyOnWriteArrayList
79
import java.util.concurrent.Executors
810
import kotlin.time.Duration
911
import kotlin.time.Duration.Companion.seconds
@@ -15,16 +17,16 @@ import kotlin.time.Duration.Companion.seconds
1517
* @property getter A function to retrieve the current value.
1618
* @property cachedValue The initial cached value of the observed field, defaults to the value returned by [getter].
1719
* @property interval The time interval between checks for changes, defaults to 1 second.
18-
* @property customDispatcher An optional [CoroutineDispatcher] to use for scheduling tasks.
20+
* @property customDispatcher An optional [kotlinx.coroutines.CoroutineDispatcher] to use for scheduling tasks.
1921
*/
2022
class ObservableField<T>(
2123
private val getter: () -> T,
2224
private var cachedValue: T = getter(),
2325
private val interval: Duration = 1.seconds,
2426
customDispatcher: CoroutineDispatcher? = null
2527
) {
26-
private val channel = Channel<T>(Channel.CONFLATED)
27-
private val listener = mutableObjectSetOf<(T) -> Unit>()
28+
private val channel = Channel<T>(Channel.Factory.CONFLATED)
29+
private val listener = CopyOnWriteArrayList<(T) -> Unit>()
2830

2931
init {
3032
val dispatcher = customDispatcher?.let { CoroutineScope(it + SupervisorJob()) }
@@ -71,6 +73,10 @@ class ObservableField<T>(
7173
this.listener.add(listener)
7274
}
7375

76+
fun cachedValue(): T {
77+
return cachedValue
78+
}
79+
7480
/**
7581
* A singleton object that provides a shared [CoroutineScope] for [ObservableField] instances.
7682
* This scope is backed by a cached thread pool and includes exception handling for uncaught exceptions.
@@ -95,4 +101,4 @@ class ObservableField<T>(
95101
override val coroutineContext =
96102
dispatcher + CoroutineName("observable-field") + SupervisorJob()
97103
}
98-
}
104+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
package dev.slne.surf.cloud.api.common.util.observer
2+
3+
import kotlinx.coroutines.currentCoroutineContext
4+
import kotlinx.coroutines.delay
5+
import kotlinx.coroutines.flow.Flow
6+
import kotlinx.coroutines.flow.distinctUntilChanged
7+
import kotlinx.coroutines.flow.flow
8+
import kotlinx.coroutines.isActive
9+
import kotlin.time.Duration
10+
import kotlin.time.Duration.Companion.seconds
11+
12+
fun <T> observingFlow(getter: () -> T, interval: Duration = 1.seconds): Flow<T> = flow {
13+
emit(getter())
14+
while (currentCoroutineContext().isActive) {
15+
delay(interval)
16+
val current = getter()
17+
emit(current)
18+
}
19+
}.distinctUntilChanged()

surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/netty/network/BukkitSpecificPacketListenerExtension.kt

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,16 +6,21 @@ import dev.slne.surf.cloud.api.common.player.teleport.TeleportCause
66
import dev.slne.surf.cloud.api.common.player.teleport.TeleportFlag
77
import dev.slne.surf.cloud.api.common.player.teleport.TeleportLocation
88
import dev.slne.surf.cloud.api.common.player.toCloudPlayer
9+
import dev.slne.surf.cloud.api.common.util.observer.observingFlow
910
import dev.slne.surf.cloud.bukkit.listener.player.SilentDisconnectListener
1011
import dev.slne.surf.cloud.bukkit.plugin
1112
import dev.slne.surf.cloud.core.client.netty.network.PlatformSpecificPacketListenerExtension
1213
import dev.slne.surf.cloud.core.client.server.ClientCloudServerImpl
14+
import dev.slne.surf.cloud.core.common.coroutines.CommonObservableScope
1315
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.RegistrationInfo
1416
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ServerboundTransferPlayerPacketResponse
1517
import dev.slne.surf.surfapi.bukkit.api.extensions.server
1618
import dev.slne.surf.surfapi.bukkit.api.nms.NmsUseWithCaution
1719
import dev.slne.surf.surfapi.bukkit.api.nms.bridges.nmsCommonBridge
1820
import dev.slne.surf.surfapi.bukkit.api.util.dispatcher
21+
import kotlinx.coroutines.flow.launchIn
22+
import kotlinx.coroutines.flow.map
23+
import kotlinx.coroutines.flow.onEach
1924
import kotlinx.coroutines.future.await
2025
import net.kyori.adventure.text.Component
2126
import org.bukkit.Bukkit
@@ -105,8 +110,13 @@ class BukkitSpecificPacketListenerExtension : PlatformSpecificPacketListenerExte
105110

106111
@OptIn(NmsUseWithCaution::class)
107112
override fun setVelocitySecret(secret: ByteArray) {
108-
nmsCommonBridge.setVelocityEnabled(true)
109-
nmsCommonBridge.setVelocitySecret(secret.toString(StandardCharsets.UTF_8))
113+
// nmsCommonBridge.setVelocityEnabled(true)
114+
// nmsCommonBridge.setVelocitySecret(secret.toString(StandardCharsets.UTF_8))
115+
// nmsCommonBridge.setOnlineMode(false)
116+
117+
BukkitVelocitySecretManager.currentVelocityEnabled = true
118+
BukkitVelocitySecretManager.currentVelocitySecret = secret
119+
BukkitVelocitySecretManager.currentOnlineMode = false
110120
}
111121

112122
override fun triggerShutdown() {
@@ -122,4 +132,54 @@ class BukkitSpecificPacketListenerExtension : PlatformSpecificPacketListenerExte
122132
override fun shutdown() {
123133
server.shutdown()
124134
}
135+
136+
@OptIn(NmsUseWithCaution::class)
137+
object BukkitVelocitySecretManager {
138+
139+
@Volatile
140+
var currentVelocityEnabled = nmsCommonBridge.isVelocityEnabled()
141+
142+
@Volatile
143+
var currentVelocitySecret = nmsCommonBridge
144+
.getVelocitySecret()
145+
.toByteArray(StandardCharsets.UTF_8)
146+
147+
@Volatile
148+
var currentOnlineMode = server.onlineMode
149+
150+
init {
151+
observingFlow({ nmsCommonBridge.isVelocityEnabled() })
152+
.onEach { remote ->
153+
if (remote != currentVelocityEnabled) {
154+
println("Updating Velocity enabled state: $remote")
155+
nmsCommonBridge.setVelocityEnabled(currentVelocityEnabled)
156+
}
157+
}
158+
.launchIn(CommonObservableScope)
159+
160+
161+
observingFlow({ nmsCommonBridge.getVelocitySecret() })
162+
.map { it.toByteArray(StandardCharsets.UTF_8) }
163+
.onEach { remote ->
164+
if (!remote.contentEquals(currentVelocitySecret)) {
165+
println("Updating Velocity secret: ${remote.toString(StandardCharsets.UTF_8)}")
166+
nmsCommonBridge.setVelocitySecret(
167+
currentVelocitySecret.toString(
168+
StandardCharsets.UTF_8
169+
)
170+
)
171+
}
172+
}
173+
.launchIn(CommonObservableScope)
174+
175+
observingFlow({ server.onlineMode })
176+
.onEach { remote ->
177+
if (remote != currentOnlineMode) {
178+
println("Updating online mode: $remote")
179+
nmsCommonBridge.setOnlineMode(remote)
180+
}
181+
}
182+
.launchIn(CommonObservableScope)
183+
}
184+
}
125185
}

surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/netty/sync/ClientInformationUpdaterSyncer.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,8 @@ package dev.slne.surf.cloud.bukkit.netty.sync
33
import com.destroystokyo.paper.event.server.WhitelistToggleEvent
44
import dev.slne.surf.cloud.api.client.netty.packet.fireAndForget
55
import dev.slne.surf.cloud.api.common.server.state.ServerState
6-
import dev.slne.surf.cloud.api.common.util.ObservableField
76
import dev.slne.surf.cloud.api.common.util.TimeLogger
7+
import dev.slne.surf.cloud.api.common.util.observer.ObservableField
88
import dev.slne.surf.cloud.bukkit.util.ObservableFieldByEvent
99
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ClientInformation
1010
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.ServerboundClientInformationPacket

surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/util/ObservableFieldByEvent.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dev.slne.surf.cloud.bukkit.util
22

3-
import dev.slne.surf.cloud.api.common.util.ObservableField.ObservableCoroutineScope
43
import dev.slne.surf.cloud.api.common.util.mutableObjectSetOf
4+
import dev.slne.surf.cloud.api.common.util.observer.ObservableField.ObservableCoroutineScope
55
import dev.slne.surf.surfapi.bukkit.api.event.listen
66
import kotlinx.coroutines.CoroutineDispatcher
77
import kotlinx.coroutines.CoroutineScope

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,4 +191,9 @@ object BeforeStartTaskScope : BaseScope(
191191
object SyncValueScope : BaseScope(
192192
dispatcher = Dispatchers.Default,
193193
name = "sync-value"
194+
)
195+
196+
object CommonObservableScope : BaseScope(
197+
dispatcher = Dispatchers.Default,
198+
name = "common-observable"
194199
)

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/protocol/common/ClientboundSetVelocitySecretPacket.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import kotlinx.serialization.Serializable
99
@Serializable
1010
class ClientboundSetVelocitySecretPacket(val secret: ByteArray): NettyPacket() {
1111
override fun toString(): String {
12-
return "ClientboundSetVelocitySecretPacket(secret=***)" +
13-
" ${super.toString()}"
12+
return "ClientboundSetVelocitySecretPacket(secret=***)"
1413
}
1514
}

0 commit comments

Comments
 (0)