Skip to content
This repository was archived by the owner on Dec 10, 2025. It is now read-only.

Commit e5232fc

Browse files
committed
feat(sync): add network synchronizing phase and sync registry
Introduce a new synchronizing protocol phase to support efficient, batched synchronization of arbitrary values and sets over Netty. - Define `SyncRegistry` API and `CommonSyncRegistryImpl` for creation, registration and freezing of `SyncValue` and `SyncSet` instances. - Implement `BasicSyncValue`, `RateLimitedSyncValue` (using `MutableStateFlow.debounce`) and `SyncSetImpl` for change tracking and listener notification. - Add `SyncValueChangePacket`, `SyncSetDeltaPacket` and their batch counterparts (`ClientboundBatchSyncValuePacket`, `ClientboundBatchSyncSetPacket`) with optimized in-place `SurfByteBuf` encoding (reserve‐length placeholder + patch length) to eliminate temporary buffer allocations. - Introduce a new `ConnectionProtocol.SYNCHRONIZING` phase with corresponding packet listeners, protocol builders (`SynchronizingProtocols`) and transition logic in `NettyPacketProcessor` and `ConnectionImpl`. - Refactor the Pre-Running phase to transition into Synchronizing (replace “ReadyToRun” packets with “ProceedToSynchronizing” equivalents). - Add `BeforeStartTaskScope` and Spring integration (`CloudBeforeStartTaskHandler`, `SyncRegistryFreezeHandler`, `CloudSynchronizeTaskManager`) to register and execute plugin-defined initial synchronization tasks in order. - Centralize coroutine exception handling in `BaseScope`, name before-start tasks, and establish proper `equals`/`hashCode` for `ConnectionImpl`. - Update client and standalone server implementations to await synchronizing completion before entering the Running phase. This patch establishes a robust, low-GC-overhead framework for keeping shared state in sync between client and server.
1 parent b35281a commit e5232fc

File tree

66 files changed

+1816
-83
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

66 files changed

+1816
-83
lines changed

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/meta/SurfNettyPacket.kt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ annotation class SurfNettyPacket(
1717
val id: String,
1818
val flow: PacketFlow,
1919
@property:InternalApi
20-
vararg val protocols: ConnectionProtocol = [ConnectionProtocol.RUNNING]
20+
vararg val protocols: ConnectionProtocol = [ConnectionProtocol.RUNNING, ConnectionProtocol.SYNCHRONIZING]
2121
)
2222

2323
/**
@@ -43,6 +43,7 @@ object DefaultIds {
4343
// Login
4444
const val SERVERBOUND_LOGIN_START_PACKET = "cloud:serverbound:login_start"
4545
const val CLIENTBOUND_LOGIN_FINISHED_PACKET = "cloud:clientbound:login_finished"
46+
const val SERVERBOUND_WAIT_FOR_SERVER_TO_START_PACKET = "cloud:serverbound:wait_for_server_to_start"
4647
const val SERVERBOUND_LOGIN_ACKNOWLEDGED_PACKET = "cloud:serverbound:login_acknowledged"
4748
const val CLIENTBOUND_LOGIN_DISCONNECT_PACKET = "cloud:clientbound:login_disconnect"
4849
const val SERVERBOUND_KEY_PACKET =

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/netty/network/ConnectionProtocol.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ enum class ConnectionProtocol {
88
INITIALIZE,
99
LOGIN,
1010
PRE_RUNNING,
11+
SYNCHRONIZING,
1112
RUNNING,
1213
SHUTDOWN
1314
}

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/netty/network/codec/kotlinx/SurfCloudBufSerializer.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,11 @@ import dev.slne.surf.cloud.api.common.netty.network.codec.kotlinx.cloud.OfflineC
99
import dev.slne.surf.cloud.api.common.netty.network.codec.kotlinx.java.*
1010
import dev.slne.surf.cloud.api.common.netty.network.codec.kotlinx.kotlin.DurationSerializer
1111
import dev.slne.surf.cloud.api.common.netty.network.codec.kotlinx.nbt.CompoundTagSerializer
12+
import dev.slne.surf.cloud.api.common.util.annotation.InternalApi
1213
import kotlinx.serialization.modules.SerializersModule
1314
import kotlinx.serialization.modules.contextual
1415

16+
@InternalApi
1517
object SurfCloudBufSerializer {
1618
val serializerModule = SerializersModule {
1719
// Adventure

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/netty/packet/packet-extension.kt

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ private val codecCache = mutableObject2ObjectMapOf<KClass<out NettyPacket>, Stre
6868

6969

7070
@OptIn(InternalSerializationApi::class)
71-
fun <P : NettyPacket> KClass<out P>.createCodec(): StreamCodec<SurfByteBuf, P> {
71+
fun <P : Any> KClass<out P>.createCodec(): StreamCodec<SurfByteBuf, P> {
7272
val serializer = serializer()
7373
return object : StreamCodec<SurfByteBuf, P> {
7474
override fun decode(buf: SurfByteBuf): P {
@@ -102,7 +102,11 @@ fun <B : ByteBuf, V : NettyPacket> KClass<out V>.findPacketCodec(): StreamCodec<
102102
}
103103

104104
override fun encode(buf: B, value: V) {
105-
SurfCloudBufSerializer.serializer.encodeToBuf(buf, serializer as KSerializer<V>, value)
105+
SurfCloudBufSerializer.serializer.encodeToBuf(
106+
buf,
107+
serializer as KSerializer<V>,
108+
value
109+
)
106110
}
107111
}
108112
}

surf-cloud-api/surf-cloud-api-common/src/main/kotlin/dev/slne/surf/cloud/api/common/netty/protocol/buffer/SurfByteBuf.kt

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import com.google.gson.Gson
66
import com.google.gson.JsonElement
77
import com.mojang.serialization.Codec
88
import com.mojang.serialization.JsonOps
9+
import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec
10+
import dev.slne.surf.cloud.api.common.netty.network.codec.kotlinx.SurfCloudBufSerializer
911
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf.Companion
1012
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.decoder.DecodeFactory
1113
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.decoder.DecodeFactory.DecodeLongFactory
@@ -18,10 +20,13 @@ import dev.slne.surf.cloud.api.common.util.codec.ExtraCodecs
1820
import dev.slne.surf.cloud.api.common.util.createUnresolvedInetSocketAddress
1921
import dev.slne.surf.cloud.api.common.util.fromJson
2022
import io.netty.buffer.ByteBuf
23+
import io.netty.buffer.ByteBufAllocator
24+
import io.netty.buffer.PooledByteBufAllocator
2125
import io.netty.handler.codec.DecoderException
2226
import io.netty.handler.codec.EncoderException
2327
import it.unimi.dsi.fastutil.objects.Object2ObjectOpenHashMap
2428
import it.unimi.dsi.fastutil.objects.ObjectArrayList
29+
import kotlinx.serialization.KSerializer
2530
import net.kyori.adventure.key.Key
2631
import net.kyori.adventure.sound.Sound
2732
import net.kyori.adventure.text.Component
@@ -88,8 +93,28 @@ private const val NUMBER_DOUBLE: Byte = 5
8893
*/
8994
open class SurfByteBuf(source: ByteBuf) : WrappedByteBuf(source) {
9095
companion object {
96+
val alloc: ByteBufAllocator = PooledByteBufAllocator.DEFAULT
97+
9198
private val GSON = Gson()
9299

100+
fun<T> streamCodecFromKotlin(serializer: KSerializer<T>): StreamCodec<SurfByteBuf, T> {
101+
return SerializerCodec(serializer)
102+
}
103+
104+
private class SerializerCodec<T>(private val serializer: KSerializer<T>) :
105+
StreamCodec<SurfByteBuf, T> {
106+
override fun decode(buf: SurfByteBuf): T {
107+
return SurfCloudBufSerializer.serializer.decodeFromBuf(buf, serializer)
108+
}
109+
110+
override fun encode(
111+
buf: SurfByteBuf,
112+
value: T
113+
) {
114+
SurfCloudBufSerializer.serializer.encodeToBuf(buf, serializer, value)
115+
}
116+
}
117+
93118
/**
94119
* Read key key.
95120
*
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
package dev.slne.surf.cloud.api.common.plugin.spring.task
2+
3+
import dev.slne.surf.cloud.api.common.netty.NettyClient
4+
import org.jetbrains.annotations.ApiStatus
5+
6+
interface CloudInitialSynchronizeTask {
7+
val name: String
8+
get() = this::class.simpleName ?: "Unknown"
9+
10+
@ApiStatus.OverrideOnly
11+
suspend fun execute(client: NettyClient)
12+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package dev.slne.surf.cloud.api.common.sync
2+
3+
import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec
4+
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf
5+
import dev.slne.surf.cloud.api.common.util.annotation.InternalApi
6+
import dev.slne.surf.surfapi.core.api.util.requiredService
7+
8+
@InternalApi
9+
interface SyncRegistry {
10+
11+
fun <T> createSyncValue(
12+
id: String,
13+
defaultValue: T,
14+
codec: StreamCodec<SurfByteBuf, T>
15+
): SyncValue<T>
16+
17+
fun <T> createSyncSet(
18+
id: String,
19+
codec: StreamCodec<SurfByteBuf, T>
20+
): SyncSet<T>
21+
22+
companion object {
23+
@InternalApi
24+
val instance = requiredService<SyncRegistry>()
25+
}
26+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package dev.slne.surf.cloud.api.common.sync
2+
3+
import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec
4+
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf
5+
import it.unimi.dsi.fastutil.objects.ObjectSet
6+
import kotlinx.serialization.KSerializer
7+
import kotlinx.serialization.serializer
8+
import org.jetbrains.annotations.ApiStatus
9+
import org.jetbrains.annotations.UnmodifiableView
10+
11+
typealias SyncSetListener<T> = (added: Boolean, element: T) -> Unit
12+
13+
interface SyncSet<T> : ObjectSet<T> {
14+
val id: String
15+
16+
/**
17+
* There is generally no need to use this codec directly, as this sync set should always
18+
* be automatically synchronized across the network.
19+
*/
20+
@get:ApiStatus.Obsolete
21+
val codec: StreamCodec<SurfByteBuf, Set<T>>
22+
23+
fun subscribe(listener: SyncSetListener<T>): Boolean
24+
fun snapshot(): @UnmodifiableView ObjectSet<T>
25+
26+
companion object {
27+
operator fun <T> invoke(
28+
id: String,
29+
codec: StreamCodec<SurfByteBuf, T>
30+
): SyncSet<T> = of(id, codec)
31+
32+
inline operator fun <reified T> invoke(id: String): SyncSet<T> = serializable(id)
33+
operator fun <T> invoke(
34+
id: String,
35+
serializer: KSerializer<T>
36+
): SyncSet<T> = serializable(id, serializer)
37+
38+
inline fun <reified T> serializable(
39+
id: String,
40+
): SyncSet<T> = serializable(id, serializer())
41+
42+
fun <T> serializable(
43+
id: String,
44+
serializer: KSerializer<T>,
45+
): SyncSet<T> = of(
46+
id,
47+
SurfByteBuf.streamCodecFromKotlin(serializer)
48+
)
49+
50+
fun <T> of(id: String, codec: StreamCodec<SurfByteBuf, T>): SyncSet<T> =
51+
SyncRegistry.instance.createSyncSet(id, codec)
52+
}
53+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
package dev.slne.surf.cloud.api.common.sync
2+
3+
import dev.slne.surf.cloud.api.common.netty.network.codec.StreamCodec
4+
import dev.slne.surf.cloud.api.common.netty.protocol.buffer.SurfByteBuf
5+
import kotlinx.serialization.KSerializer
6+
import kotlinx.serialization.serializer
7+
import kotlin.reflect.KProperty
8+
import kotlin.time.Duration
9+
10+
typealias SyncValueListener<T> = (old: T, new: T) -> Unit
11+
12+
interface SyncValue<T> {
13+
14+
val id: String
15+
16+
val codec: StreamCodec<SurfByteBuf, T>
17+
18+
/**
19+
* Returns the current value of this sync value.
20+
*/
21+
fun get(): T
22+
23+
/**
24+
* Sets a new value for this sync value.
25+
*/
26+
fun set(newValue: T)
27+
28+
/**
29+
* Subscribes to changes in this sync value.
30+
*/
31+
fun subscribe(listener: SyncValueListener<T>): Boolean
32+
33+
operator fun getValue(thisRef: Any?, property: KProperty<*>): T = get()
34+
operator fun setValue(thisRef: Any?, property: KProperty<*>, newValue: T) = set(newValue)
35+
36+
fun rateLimited(minInterval: Duration): SyncValue<T>
37+
38+
companion object {
39+
operator fun <T> invoke(
40+
id: String,
41+
defaultValue: T,
42+
codec: StreamCodec<SurfByteBuf, T>
43+
): SyncValue<T> = of(id, defaultValue, codec)
44+
45+
inline operator fun <reified T> invoke(id: String, defaultValue: T): SyncValue<T> = serializable(id, defaultValue)
46+
operator fun <T> invoke(
47+
id: String,
48+
defaultValue: T,
49+
serializer: KSerializer<T>
50+
): SyncValue<T> = serializable(id, defaultValue, serializer)
51+
52+
inline fun <reified T> serializable(
53+
id: String,
54+
defaultValue: T
55+
): SyncValue<T> = serializable(id, defaultValue, serializer())
56+
57+
fun <T> serializable(
58+
id: String,
59+
defaultValue: T,
60+
serializer: KSerializer<T>,
61+
): SyncValue<T> = of(
62+
id,
63+
defaultValue,
64+
SurfByteBuf.streamCodecFromKotlin(serializer)
65+
)
66+
67+
fun <T> of(id: String, defaultValue: T, codec: StreamCodec<SurfByteBuf, T>): SyncValue<T> =
68+
SyncRegistry.instance.createSyncValue(id, defaultValue, codec)
69+
}
70+
}

surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/ClientNettyClientImpl.kt

Lines changed: 8 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@ import dev.slne.surf.cloud.api.common.exceptions.FatalSurfError
66
import dev.slne.surf.cloud.api.common.netty.network.protocol.PacketFlow
77
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket
88
import dev.slne.surf.cloud.api.common.server.CloudServerConstants
9-
import dev.slne.surf.cloud.api.common.util.mutableObjectListOf
10-
import dev.slne.surf.cloud.core.client.netty.network.*
9+
import dev.slne.surf.cloud.core.client.netty.network.ClientHandshakePacketListenerImpl
10+
import dev.slne.surf.cloud.core.client.netty.network.ClientRunningPacketListenerImpl
11+
import dev.slne.surf.cloud.core.client.netty.network.PlatformSpecificPacketListenerExtension
12+
import dev.slne.surf.cloud.core.client.netty.network.StatusUpdate
1113
import dev.slne.surf.cloud.core.client.server.serverManagerImpl
1214
import dev.slne.surf.cloud.core.common.config.cloudConfig
1315
import dev.slne.surf.cloud.core.common.coroutines.ConnectionTickScope
@@ -53,8 +55,9 @@ class ClientNettyClientImpl(
5355
val listener get() = _listener ?: error("listener not yet set")
5456
val connected get() = _listener?.connection?.connected ?: false
5557

56-
private val awaitPreRunning = CompletableDeferred<Unit>()
57-
private val finalizeHandler = mutableObjectListOf<suspend () -> Unit>()
58+
val preRunningCallback = CompletableDeferred<Unit>()
59+
val synchronizeCallback = CompletableDeferred<Unit>()
60+
lateinit var startSynchronizeTask: suspend () -> Unit
5861

5962
private val statusUpdate: StatusUpdate = {
6063
log.atInfo().log(it)
@@ -66,20 +69,9 @@ class ClientNettyClientImpl(
6669
suspend fun bootstrap() {
6770
val config = cloudConfig.connectionConfig.nettyConfig
6871
connectToServer(ServerAddress(config.host, config.port))
69-
awaitPreRunning.await() // Wait until the connection is in the PreRunning state
72+
preRunningCallback.await() // Wait until the connection is in the PreRunning state
7073
}
7174

72-
/**
73-
* Finalizes the client.
74-
* This method is called after all other plugins have registered their packets.
75-
* Switches to the Running state.
76-
*/
77-
suspend fun finalize() {
78-
finalizeHandler.forEach { it() }
79-
finalizeHandler.clear()
80-
}
81-
82-
fun finalizeHandler(handler: suspend () -> Unit) = finalizeHandler.add(handler)
8375

8476
suspend fun stop() {
8577
doShutdown()
@@ -119,7 +111,6 @@ class ClientNettyClientImpl(
119111
connection,
120112
platformExtension,
121113
statusUpdate,
122-
awaitPreRunning
123114
),
124115
false
125116
)

0 commit comments

Comments
 (0)