Skip to content
This repository was archived by the owner on Dec 10, 2025. It is now read-only.

Commit 2585f77

Browse files
authored
Merge pull request #138 from SLNE-Development/fix/sync-initial-players
Fix/sync initial players
2 parents 93a71c0 + 6b982e9 commit 2585f77

File tree

31 files changed

+794
-152
lines changed

31 files changed

+794
-152
lines changed

surf-cloud-bukkit/src/main/kotlin/dev/slne/surf/cloud/bukkit/player/BukkitCloudPlayerManagerImpl.kt

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,15 +20,12 @@ class BukkitCloudPlayerManagerImpl :
2020
override fun createPlayer(
2121
uuid: UUID,
2222
name: String,
23-
proxy: Boolean,
24-
ip: Inet4Address,
25-
serverName: String
23+
proxyName: String?,
24+
serverName: String?,
25+
ip: Inet4Address
2626
) = BukkitClientCloudPlayerImpl(uuid, name).also {
27-
if (proxy) {
28-
it.proxyServerName = serverName
29-
} else {
30-
it.serverName = serverName
31-
}
27+
it.proxyServerName = serverName
28+
it.serverName = serverName
3229
}
3330

3431
override fun getAudience(uuid: UUID): Audience? = Bukkit.getPlayer(uuid)

surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/netty/network/ClientSynchronizingPacketListenerImpl.kt

Lines changed: 126 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,20 +4,26 @@ import dev.slne.surf.cloud.api.common.netty.network.ConnectionProtocol
44
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacket
55
import dev.slne.surf.cloud.api.common.netty.packet.NettyPacketInfo
66
import dev.slne.surf.cloud.core.client.netty.ClientNettyClientImpl
7+
import dev.slne.surf.cloud.core.client.player.commonPlayerManagerImpl
78
import dev.slne.surf.cloud.core.client.server.ClientCloudServerImpl
89
import dev.slne.surf.cloud.core.client.server.ClientProxyCloudServerImpl
910
import dev.slne.surf.cloud.core.client.server.serverManagerImpl
1011
import dev.slne.surf.cloud.core.client.sync.SyncRegistryImpl
11-
import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope
1212
import dev.slne.surf.cloud.core.common.coroutines.PacketHandlerScope
13+
import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope
1314
import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl
1415
import dev.slne.surf.cloud.core.common.netty.network.protocol.common.ClientboundSetVelocitySecretPacket
1516
import dev.slne.surf.cloud.core.common.netty.network.protocol.running.*
1617
import dev.slne.surf.cloud.core.common.netty.network.protocol.synchronizing.*
1718
import dev.slne.surf.cloud.core.common.netty.registry.listener.NettyListenerRegistry
1819
import dev.slne.surf.cloud.core.common.plugin.task.CloudSynchronizeTaskManager
1920
import dev.slne.surf.surfapi.core.api.util.logger
21+
import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf
22+
import dev.slne.surf.surfapi.core.api.util.mutableObjectSetOf
2023
import kotlinx.coroutines.launch
24+
import net.kyori.adventure.nbt.BinaryTagIO
25+
import java.util.*
26+
import java.util.concurrent.atomic.AtomicBoolean
2127

2228
class ClientSynchronizingPacketListenerImpl(
2329
override val client: ClientNettyClientImpl,
@@ -27,11 +33,18 @@ class ClientSynchronizingPacketListenerImpl(
2733
) : ClientCommonPacketListenerImpl(connection), ClientSynchronizingPacketListener {
2834

2935
private val log = logger()
36+
private val hydratingPlayers = AtomicBoolean(false)
37+
private val pendingHydrationPlayers =
38+
mutableObjectListOf<ClientboundSyncPlayerHydrationChunkPacket.Entry>()
39+
40+
private var currentLargePpdcUuid: UUID? = null
41+
private var currentLargePpdc: ByteArray? = null
42+
private val pendingLargePpdcs = mutableObjectSetOf<UUID>()
3043

3144
fun startSynchronizing() {
3245
statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZING)
3346

34-
BeforeStartTaskScope.launch {
47+
SynchronizeTasksScope.launch {
3548
CloudSynchronizeTaskManager.executeTasks(client)
3649

3750
statusUpdater.switchState(AbstractStatusUpdater.State.SYNCHRONIZE_WAIT_FOR_SERVER)
@@ -134,6 +147,117 @@ class ClientSynchronizingPacketListenerImpl(
134147
TODO("Not yet implemented")
135148
}
136149

150+
override fun handleSyncPlayerHydrationStart(packet: ClientboundSyncPlayerHydrationStartPacket) {
151+
if (!hydratingPlayers.compareAndSet(false, true)) {
152+
log.atWarning()
153+
.log("Tried to start player hydration twice")
154+
return
155+
}
156+
}
157+
158+
override fun handleSyncPlayerHydrationChunk(packet: ClientboundSyncPlayerHydrationChunkPacket) {
159+
if (!hydratingPlayers.get()) {
160+
log.atWarning()
161+
.log("Received player hydration chunk before start")
162+
return
163+
}
164+
165+
pendingHydrationPlayers.addAll(packet.entries)
166+
}
167+
168+
override fun handleSyncPlayerHydrationEnd(packet: ClientboundSyncPlayerHydrationEndPacket) {
169+
if (!hydratingPlayers.compareAndSet(true, false)) {
170+
log.atWarning()
171+
.log("Tried to end player hydration twice")
172+
return
173+
}
174+
175+
for (data in pendingHydrationPlayers) {
176+
val player = commonPlayerManagerImpl.createExistingPlayer(
177+
data.uuid,
178+
data.name,
179+
data.playerIp,
180+
data.serverName,
181+
data.proxyName
182+
)
183+
184+
data.pdcOrCallback.ifLeft { tag ->
185+
player.overwritePpdc(tag)
186+
}.ifRight { callback ->
187+
pendingLargePpdcs.add(callback)
188+
}
189+
}
190+
191+
pendingHydrationPlayers.clear()
192+
}
193+
194+
override fun handleSyncLargerPlayerPersistentDataContainerStart(packet: ClientboundSyncLargePlayerPersistentDataContainerStartPacket) {
195+
if (currentLargePpdcUuid != null) {
196+
log.atWarning()
197+
.log("Received start of large PPD container before end of previous one (%s)", currentLargePpdcUuid)
198+
return
199+
}
200+
201+
currentLargePpdcUuid = packet.playerUuid
202+
currentLargePpdc = null
203+
}
204+
205+
override fun handleSyncLargerPlayerPersistentDataContainerChunk(packet: ClientboundSyncLargePlayerPersistentDataContainerChunkPacket) {
206+
if (currentLargePpdcUuid == null) {
207+
log.atWarning()
208+
.log("Received chunk of large PPD container before start")
209+
return
210+
}
211+
212+
val existing = currentLargePpdc
213+
val payload = packet.payload
214+
215+
currentLargePpdc = if (existing == null) {
216+
payload
217+
} else {
218+
existing + payload
219+
}
220+
}
221+
222+
override fun handleSyncLargerPlayerPersistentDataContainerEnd(packet: ClientboundSyncLargePlayerPersistentDataContainerEndPacket) {
223+
val uuid = currentLargePpdcUuid
224+
val payload = currentLargePpdc
225+
if (uuid == null || payload == null) {
226+
log.atWarning()
227+
.log("Received end of large PPD container before start")
228+
return
229+
}
230+
231+
currentLargePpdcUuid = null
232+
currentLargePpdc = null
233+
234+
pendingLargePpdcs.remove(uuid)
235+
val player = commonPlayerManagerImpl.getPlayer(uuid)
236+
if (player == null) {
237+
log.atWarning()
238+
.log("Received large PPD container end for unknown player (%s)", uuid)
239+
return
240+
}
241+
242+
val tag = payload.inputStream().use { stream ->
243+
BinaryTagIO.reader().read(stream)
244+
}
245+
246+
player.overwritePpdc(tag)
247+
}
248+
249+
override fun handleSynchronizePlayerMutes(packet: ClientboundSynchronizePlayerMutes) {
250+
val player = commonPlayerManagerImpl.getPlayer(packet.playerUuid)
251+
252+
if (player == null) {
253+
log.atWarning()
254+
.log("Received mute update for unknown player (%s)", packet.playerUuid)
255+
return
256+
}
257+
258+
player.punishmentManager.updateMutes(packet.mutes)
259+
}
260+
137261
override fun handlePacket(packet: NettyPacket) {
138262
val listeners = NettyListenerRegistry.getListeners(packet.javaClass) ?: return
139263
if (listeners.isEmpty()) return

surf-cloud-core/surf-cloud-core-client/src/main/kotlin/dev/slne/surf/cloud/core/client/player/CommonClientCloudPlayerManagerImpl.kt

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,17 +25,11 @@ abstract class CommonClientCloudPlayerManagerImpl<Platform : Audience, P : Clien
2525
player.serverName = serverName
2626
}
2727

28-
override fun removeProxyServer(
29-
player: P,
30-
serverName: String
31-
) {
28+
override fun removeProxyServer(player: P) {
3229
player.proxyServerName = null
3330
}
3431

35-
override fun removeServer(
36-
player: P,
37-
serverName: String
38-
) {
32+
override fun removeServer(player: P) {
3933
player.serverName = null
4034
}
4135

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/coroutines/scopes.kt

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package dev.slne.surf.cloud.core.common.coroutines
22

33
import dev.slne.surf.cloud.api.common.util.threadFactory
4-
import dev.slne.surf.cloud.core.common.coroutines.BeforeStartTaskScope.unnamedTask
4+
import dev.slne.surf.cloud.core.common.coroutines.SynchronizeTasksScope.unnamedTask
55
import dev.slne.surf.surfapi.core.api.util.logger
66
import dev.slne.surf.surfapi.core.api.util.mutableObjectListOf
77
import kotlinx.coroutines.*
@@ -172,14 +172,14 @@ object PunishmentCacheRefreshScope : BaseScope(
172172
name = "punishment-cache-refresh"
173173
)
174174

175-
object BeforeStartTaskScope : BaseScope(
176-
dispatcher = Dispatchers.IO,
177-
name = "before-start-task",
175+
object SynchronizeTasksScope : BaseScope(
176+
dispatcher = Dispatchers.Default,
177+
name = "synchronize-tasks",
178178
coroutineExceptionHandler = CoroutineExceptionHandler { context, throwable ->
179179
val task = context[TaskName] ?: unnamedTask
180180
log.atWarning()
181181
.withCause(throwable)
182-
.log("Unhandled exception in before start task: $task")
182+
.log("Unhandled exception in synchronize task: $task")
183183
}
184184
) {
185185
@JvmStatic

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/CommonNettyClientImpl.kt

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -7,35 +7,18 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket
77
import dev.slne.surf.cloud.api.common.server.CloudServer
88
import dev.slne.surf.cloud.api.common.server.CommonCloudServer
99
import dev.slne.surf.cloud.core.common.netty.network.ConnectionImpl
10-
import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf
11-
import dev.slne.surf.surfapi.core.api.util.synchronize
1210
import kotlinx.coroutines.CompletableDeferred
1311
import java.net.InetSocketAddress
12+
import java.util.concurrent.ConcurrentLinkedQueue
1413
import kotlin.time.Duration
1514

1615
abstract class CommonNettyClientImpl(
1716
override val serverCategory: String,
1817
override val serverName: String
1918
) : NettyClient {
20-
private val packetQueue by lazy { mutableObject2ObjectMapOf<NettyPacket, CompletableDeferred<Boolean>?>().synchronize() }
21-
19+
@Volatile
2220
private var _connection: ConnectionImpl? = null
23-
set(value) {
24-
field = value
25-
26-
if (value != null) {
27-
synchronized(packetQueue) {
28-
packetQueue.forEach { (packet, deferred) ->
29-
if (deferred != null) {
30-
value.sendWithIndication(packet, deferred = deferred)
31-
} else {
32-
value.send(packet)
33-
}
34-
}
35-
packetQueue.clear()
36-
}
37-
}
38-
}
21+
private val packetQueue = ConcurrentLinkedQueue<QueuedPacket>()
3922

4023
override val connection get() = _connection ?: error("connection not yet set")
4124

@@ -47,29 +30,39 @@ abstract class CommonNettyClientImpl(
4730

4831
override fun fireAndForget(packet: NettyPacket) {
4932
val connection = _connection
50-
if (connection == null) {
51-
packetQueue[packet] = null
52-
} else {
33+
if (connection != null) {
5334
connection.send(packet)
35+
return
36+
}
37+
38+
packetQueue.add(QueuedPacket(packet, null))
39+
40+
val connectionNow = _connection
41+
if (connectionNow != null) {
42+
drainQueue(connectionNow)
5443
}
5544
}
5645

5746
override suspend fun fire(packet: NettyPacket, convertExceptions: Boolean): Boolean {
5847
val connection = _connection
59-
if (connection == null) {
60-
val result = runCatching {
61-
val deferred = CompletableDeferred<Boolean>()
62-
packetQueue[packet] = deferred
63-
deferred.await()
64-
}
48+
if (connection != null) {
49+
return connection.sendWithIndication(packet, convertExceptions)
50+
}
6551

66-
if (convertExceptions) {
67-
return result.getOrDefault(false)
68-
}
52+
val deferred = CompletableDeferred<Boolean>()
53+
packetQueue.add(QueuedPacket(packet, deferred))
6954

70-
return result.getOrThrow()
55+
val connectionNow = _connection
56+
if (connectionNow != null) {
57+
drainQueue(connectionNow)
58+
}
59+
60+
val result = runCatching { deferred.await() }
61+
62+
return if (convertExceptions) {
63+
result.getOrDefault(false)
7164
} else {
72-
return connection.sendWithIndication(packet, convertExceptions)
65+
result.getOrThrow()
7366
}
7467
}
7568

@@ -85,5 +78,24 @@ abstract class CommonNettyClientImpl(
8578
fun initConnection(connection: ConnectionImpl) {
8679
check(_connection == null) { "Connection already set" }
8780
_connection = connection
81+
drainQueue(connection)
82+
}
83+
84+
85+
private fun drainQueue(connection: ConnectionImpl) {
86+
while (true) {
87+
val queued = packetQueue.poll() ?: break
88+
val (packet, deferred) = queued
89+
if (deferred != null) {
90+
connection.sendWithIndication(packet, deferred = deferred)
91+
} else {
92+
connection.send(packet)
93+
}
94+
}
8895
}
96+
97+
private data class QueuedPacket(
98+
val packet: NettyPacket,
99+
val deferred: CompletableDeferred<Boolean>?
100+
)
89101
}

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/ConnectionImpl.kt

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -712,7 +712,6 @@ class ConnectionImpl(
712712
handleDisconnection()
713713
}
714714

715-
716715
this.averageSentPackets = lerp(
717716
0.75f, this.sentPackets.toFloat(),
718717
this.averageSentPackets

surf-cloud-core/surf-cloud-core-common/src/main/kotlin/dev/slne/surf/cloud/core/common/netty/network/RespondingPacketSendHandler.kt

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import dev.slne.surf.cloud.api.common.netty.packet.ResponseNettyPacket
66
import dev.slne.surf.cloud.api.common.util.netty.UnifiedReadOnlyChannelHandler
77
import dev.slne.surf.surfapi.core.api.util.logger
88
import dev.slne.surf.surfapi.core.api.util.mutableObject2ObjectMapOf
9-
import dev.slne.surf.surfapi.core.api.util.synchronize
109
import io.netty.channel.ChannelHandlerContext
1110
import io.netty.channel.ChannelPromise
1211
import kotlinx.coroutines.CompletableDeferred
@@ -16,7 +15,7 @@ import java.util.*
1615
class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler<NettyPacket>() {
1716
private val log = logger()
1817
private val respondingPackets =
19-
mutableObject2ObjectMapOf<UUID, CompletableDeferred<ResponseNettyPacket>>().synchronize()
18+
mutableObject2ObjectMapOf<UUID, CompletableDeferred<ResponseNettyPacket>>()
2019

2120
@Suppress("DEPRECATION")
2221
override fun handleRead(
@@ -49,7 +48,7 @@ class RespondingPacketSendHandler : UnifiedReadOnlyChannelHandler<NettyPacket>()
4948
}
5049
}
5150

52-
@Suppress("DEPRECATION")
51+
@Suppress("UNCHECKED_CAST")
5352
override fun handleWrite(
5453
ctx: ChannelHandlerContext,
5554
msg: NettyPacket,

0 commit comments

Comments
 (0)