Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ abstract class ServerEnvironment(
/**
* Get a [Server] by unique id (returns null if the server is not running on this environment)
*/
abstract fun getServer(uniqueId: String): Server?
abstract suspend fun getServer(uniqueId: String): Server?

/**
* Reattaches a [Server] and returns true if the server is successfully reattached
Expand Down Expand Up @@ -78,11 +78,7 @@ abstract class ServerEnvironment(
/**
* Returns all servers currently known to this environment.
*/
abstract fun getServers(): List<Server>

open fun getServerCache(): MutableMap<Server, *> {
return mutableMapOf<Server, String>()
}
abstract suspend fun getServers(): List<Server>

open fun executeTemplate(
dir: Path,
Expand All @@ -102,22 +98,6 @@ abstract class ServerEnvironment(
return null
}

/**
* Updates the cached server for this environment.
*/
open fun updateServerCache(uniqueId: String, server: Server) {
@Suppress("UNCHECKED_CAST")
val cache = getServerCache() as MutableMap<Server, Any>
val key = cache.keys.find { it.uniqueId == uniqueId }
if (key == null) {
logger.warn("Server ${server.group}-${server.numericalId} could not be updated in cache")
return
}
val value = cache[key]!!
cache.remove(key)
cache[server] = value
}

fun getEnvironment(server: Server): EnvironmentConfig? {
return environmentRepository.get(runtimeRepository.get(server.group))
}
Expand Down Expand Up @@ -150,8 +130,6 @@ abstract class ServerEnvironment(
if (serverDefinition.cloudProperties.containsKey("player-count-ping") && serverDefinition.cloudProperties["player-count-ping"] == "skip") {
if (serverDefinition.playerCount != ping.players.online.toLong()) {
logger.warn("Player count mismatch for ${serverDefinition.uniqueId}: ${serverDefinition.playerCount} != ${ping.players.online}")
} else {
logger.info("Player count skipped for ${serverDefinition.uniqueId}")
}

return serverDefinition.playerCount
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ class ServerEnvironments(
/**
* Gets the environment a server is running on or null if the server is not running in any environment
*/
fun of(server: Server): ServerEnvironment? {
suspend fun of(server: Server): ServerEnvironment? {
return of(server.uniqueId)
}

/**
* Gets the environment a server is running on or null if the server is not running in any environment
*/
fun of(uniqueId: String): ServerEnvironment? {
suspend fun of(uniqueId: String): ServerEnvironment? {
return envs.firstOrNull {
val server = it.getServer(uniqueId) ?: return@firstOrNull false
val env = it.getEnvironment(server) ?: return@firstOrNull false
Expand Down Expand Up @@ -106,27 +106,29 @@ class ServerEnvironments(
return CoroutineScope(Dispatchers.IO).launch {
while (isActive) {
envs.forEach { env ->
env.getServers().toList().forEach {
var delete = false
var server = it
try {
val updated = env.updateServer(it)
if (updated == null) {
delete = true
env.stopServer(server)
} else {
server = updated
env.updateServerCache(updated.uniqueId, updated)
env.getServers()
.filter { it.port > 0 && it.state != ServerState.PREPARING}
.forEach {
var delete = false
var server = it
try {
val updated = env.updateServer(it)
if (updated == null) {
delete = true
env.stopServer(server)
} else {
server = updated
// env.updateServerCache(updated.uniqueId, updated)
}
controllerStub.updateServer(
UpdateServerRequest.newBuilder()
.setServer(server.toDefinition())
.setDeleted(delete).build()
)
} catch (e: Exception) {
logger.error("An error occurred whilst updating the server:", e)
}
controllerStub.updateServer(
UpdateServerRequest.newBuilder()
.setServer(server.toDefinition())
.setDeleted(delete).build()
)
} catch (e: Exception) {
logger.error("An error occurred whilst updating the server:", e)
}
}
}
delay(5000L)
}
Expand All @@ -153,4 +155,5 @@ class ServerEnvironments(
).build()
)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,6 @@ class DockerServerEnvironment(
return serverToContainer.any { it.key.uniqueId == server.uniqueId }
}

override fun getServerCache(): MutableMap<Server, *> {
return serverToContainer
}

/**
* The reason this method exists is so no errors are thrown if docker is not installed until the environment is used
*/
Expand Down Expand Up @@ -212,7 +208,7 @@ class DockerServerEnvironment(
return server.properties["docker-container-id"]!!
}

override fun getServer(uniqueId: String): Server? {
override suspend fun getServer(uniqueId: String): Server? {
return serverToContainer.keys.find { it.uniqueId == uniqueId }
}

Expand Down Expand Up @@ -335,7 +331,8 @@ class DockerServerEnvironment(
return server.properties.containsKey("docker-image") && server.properties.containsKey("docker-tag")
}

override fun getServers(): List<Server> {
override suspend fun getServers(): List<Server> {
return serverToContainer.keys.toList()
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,7 @@ import app.simplecloud.droplet.serverhost.shared.logs.DefaultLogStreamer
import app.simplecloud.droplet.serverhost.shared.logs.ScreenCommandExecutor
import app.simplecloud.droplet.serverhost.shared.logs.ScreenConfigurer
import app.simplecloud.droplet.serverhost.shared.process.ProcessFinder
import build.buf.gen.simplecloud.controller.v1.ControllerServerServiceGrpcKt
import build.buf.gen.simplecloud.controller.v1.ServerHostStreamServerLogsResponse
import build.buf.gen.simplecloud.controller.v1.copy
import build.buf.gen.simplecloud.controller.v1.updateServerRequest
import build.buf.gen.simplecloud.controller.v1.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
Expand Down Expand Up @@ -61,22 +58,24 @@ class ProcessServerEnvironment(

private val logger = LogManager.getLogger(ServerHostRuntime::class.java)

private val serverToProcessHandle = mutableMapOf<Server, ProcessHandle>()
private val serverToProcessHandle = mutableMapOf<String, ProcessHandle>()

private fun containsServer(server: Server): Boolean {
return serverToProcessHandle.any { it.key.uniqueId == server.uniqueId }
return serverToProcessHandle.any { it.key == server.uniqueId }
}

private fun containsServer(uniqueId: String): Boolean {
return serverToProcessHandle.any { it.key.uniqueId == uniqueId }
return serverToProcessHandle.any { it.key == uniqueId }
}

override fun getServer(uniqueId: String): Server? {
return serverToProcessHandle.keys.find { it.uniqueId == uniqueId }
}

override fun getServerCache(): MutableMap<Server, *> {
return serverToProcessHandle
override suspend fun getServer(uniqueId: String): Server? {
try {
return Server.fromDefinition(controllerStub.getServerById(getServerByIdRequest {
this.serverId = uniqueId
}))
} catch (e: Exception) {
return null
}
}

override suspend fun updateServer(server: Server): Server? {
Expand All @@ -90,11 +89,11 @@ class ProcessServerEnvironment(
exe
).orElse(null)
}
if (realProcess != null && serverToProcessHandle[server] != realProcess) {
if (realProcess != null && serverToProcessHandle[server.uniqueId] != realProcess) {
logger.info("Found updated process handle with PID ${realProcess.pid()} for ${server.group}-${server.numericalId}")
serverToProcessHandle[server] = realProcess
serverToProcessHandle[server.uniqueId] = realProcess
}
return@let serverToProcessHandle[server]
return@let serverToProcessHandle[server.uniqueId]
}

val address = InetSocketAddress(server.ip, server.port.toInt())
Expand Down Expand Up @@ -201,7 +200,7 @@ class ProcessServerEnvironment(
this.deleted = false
})
}
serverToProcessHandle[updatedServer] = process.toHandle()
serverToProcessHandle[updatedServer.uniqueId] = process.toHandle()
logger.info("Server ${server.uniqueId} of group ${server.group} now running on PID ${process.pid()}")
return true
} catch (e: Exception) {
Expand Down Expand Up @@ -237,7 +236,7 @@ class ProcessServerEnvironment(
return false
}

val process = serverToProcessHandle[server]
val process = serverToProcessHandle[server.uniqueId]
if (process == null) {
logger.error("Could not find server process of server ${server.group}-${server.numericalId}")
return false
Expand Down Expand Up @@ -269,7 +268,7 @@ class ProcessServerEnvironment(
stopTries[uniqueId] = stopTries.getOrDefault(uniqueId, 0) + 1
return try {
process.onExit().await()
serverToProcessHandle.remove(server)
serverToProcessHandle.remove(server.uniqueId)
stopTries.remove(uniqueId)
true
} catch (e: Exception) {
Expand All @@ -280,7 +279,7 @@ class ProcessServerEnvironment(

private fun getProcess(uniqueId: String): ProcessHandle? {
return serverToProcessHandle.getOrDefault(
serverToProcessHandle.keys.firstOrNull { it.uniqueId == uniqueId },
serverToProcessHandle.keys.firstOrNull { it == uniqueId },
null
)
}
Expand Down Expand Up @@ -309,7 +308,7 @@ class ProcessServerEnvironment(
return false
}

serverToProcessHandle[server] = handle
serverToProcessHandle[server.uniqueId] = handle
logger.info("Server ${server.uniqueId} of group ${server.group} successfully reattached on PID ${handle.pid()}")
return true
}
Expand Down Expand Up @@ -391,8 +390,8 @@ class ProcessServerEnvironment(
return builder
}

override fun getServers(): List<Server> {
return serverToProcessHandle.keys.toList()
override suspend fun getServers(): List<Server> {
return controllerStub.getAllServers(getAllServersRequest { }).serversList.map { Server.fromDefinition(it) }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import io.grpc.Status
import io.grpc.StatusException
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.withContext
import java.io.FileInputStream
import java.io.FileWriter
Expand Down Expand Up @@ -71,13 +72,18 @@ class ServerHostService(
}

override fun streamServerLogs(request: ServerHostStreamServerLogsRequest): Flow<ServerHostStreamServerLogsResponse> {
val env = envs.of(request.serverId)
?: throw StatusException(Status.NOT_FOUND.withDescription("Server not found"))

try {
return env.streamLogs(env.getServer(request.serverId)!!)
} catch (e: Exception) {
throw StatusException(Status.INTERNAL.withDescription("Could not stream logs: ${e.message}"))
return flow {
val env = envs.of(request.serverId)
?: throw StatusException(Status.NOT_FOUND.withDescription("Server not found"))

try {
env.streamLogs(env.getServer(request.serverId)!!)
.collect { response ->
emit(response)
}
} catch (e: Exception) {
throw StatusException(Status.INTERNAL.withDescription("Could not stream logs: ${e.message}"))
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class ServerOperationReconciler(
} ?: logger.warn("Timeout waiting for server to come online: {}", server.uniqueId)
}

private fun isServerOnline(env: ServerEnvironment, server: ServerDefinition): Boolean {
private suspend fun isServerOnline(env: ServerEnvironment, server: ServerDefinition): Boolean {
return try {
val currentServer = env.getServer(server.uniqueId) ?: return false
currentServer.state == ServerState.AVAILABLE || currentServer.state == ServerState.INGAME
Expand Down Expand Up @@ -152,4 +152,5 @@ class ServerOperationReconciler(
logger.info("Successfully started server: {}", server.uniqueId)
return server
}

}