diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironment.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironment.kt index 3579ea8..b2e0049 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironment.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironment.kt @@ -46,7 +46,7 @@ abstract class ServerEnvironment( /** * Get a [Server] by unique id (returns null if the server is not running on this environment) */ - abstract fun getServer(uniqueId: String): Server? + abstract suspend fun getServer(uniqueId: String): Server? /** * Reattaches a [Server] and returns true if the server is successfully reattached @@ -78,11 +78,7 @@ abstract class ServerEnvironment( /** * Returns all servers currently known to this environment. */ - abstract fun getServers(): List - - open fun getServerCache(): MutableMap { - return mutableMapOf() - } + abstract suspend fun getServers(): List open fun executeTemplate( dir: Path, @@ -102,22 +98,6 @@ abstract class ServerEnvironment( return null } - /** - * Updates the cached server for this environment. - */ - open fun updateServerCache(uniqueId: String, server: Server) { - @Suppress("UNCHECKED_CAST") - val cache = getServerCache() as MutableMap - val key = cache.keys.find { it.uniqueId == uniqueId } - if (key == null) { - logger.warn("Server ${server.group}-${server.numericalId} could not be updated in cache") - return - } - val value = cache[key]!! - cache.remove(key) - cache[server] = value - } - fun getEnvironment(server: Server): EnvironmentConfig? { return environmentRepository.get(runtimeRepository.get(server.group)) } @@ -150,8 +130,6 @@ abstract class ServerEnvironment( if (serverDefinition.cloudProperties.containsKey("player-count-ping") && serverDefinition.cloudProperties["player-count-ping"] == "skip") { if (serverDefinition.playerCount != ping.players.online.toLong()) { logger.warn("Player count mismatch for ${serverDefinition.uniqueId}: ${serverDefinition.playerCount} != ${ping.players.online}") - } else { - logger.info("Player count skipped for ${serverDefinition.uniqueId}") } return serverDefinition.playerCount diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironments.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironments.kt index 07b9794..084d79b 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironments.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/ServerEnvironments.kt @@ -55,14 +55,14 @@ class ServerEnvironments( /** * Gets the environment a server is running on or null if the server is not running in any environment */ - fun of(server: Server): ServerEnvironment? { + suspend fun of(server: Server): ServerEnvironment? { return of(server.uniqueId) } /** * Gets the environment a server is running on or null if the server is not running in any environment */ - fun of(uniqueId: String): ServerEnvironment? { + suspend fun of(uniqueId: String): ServerEnvironment? { return envs.firstOrNull { val server = it.getServer(uniqueId) ?: return@firstOrNull false val env = it.getEnvironment(server) ?: return@firstOrNull false @@ -106,27 +106,29 @@ class ServerEnvironments( return CoroutineScope(Dispatchers.IO).launch { while (isActive) { envs.forEach { env -> - env.getServers().toList().forEach { - var delete = false - var server = it - try { - val updated = env.updateServer(it) - if (updated == null) { - delete = true - env.stopServer(server) - } else { - server = updated - env.updateServerCache(updated.uniqueId, updated) + env.getServers() + .filter { it.port > 0 && it.state != ServerState.PREPARING} + .forEach { + var delete = false + var server = it + try { + val updated = env.updateServer(it) + if (updated == null) { + delete = true + env.stopServer(server) + } else { + server = updated +// env.updateServerCache(updated.uniqueId, updated) + } + controllerStub.updateServer( + UpdateServerRequest.newBuilder() + .setServer(server.toDefinition()) + .setDeleted(delete).build() + ) + } catch (e: Exception) { + logger.error("An error occurred whilst updating the server:", e) } - controllerStub.updateServer( - UpdateServerRequest.newBuilder() - .setServer(server.toDefinition()) - .setDeleted(delete).build() - ) - } catch (e: Exception) { - logger.error("An error occurred whilst updating the server:", e) } - } } delay(5000L) } @@ -153,4 +155,5 @@ class ServerEnvironments( ).build() ) } + } \ No newline at end of file diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/docker/DockerServerEnvironment.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/docker/DockerServerEnvironment.kt index 52b7292..05340d5 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/docker/DockerServerEnvironment.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/docker/DockerServerEnvironment.kt @@ -51,10 +51,6 @@ class DockerServerEnvironment( return serverToContainer.any { it.key.uniqueId == server.uniqueId } } - override fun getServerCache(): MutableMap { - return serverToContainer - } - /** * The reason this method exists is so no errors are thrown if docker is not installed until the environment is used */ @@ -212,7 +208,7 @@ class DockerServerEnvironment( return server.properties["docker-container-id"]!! } - override fun getServer(uniqueId: String): Server? { + override suspend fun getServer(uniqueId: String): Server? { return serverToContainer.keys.find { it.uniqueId == uniqueId } } @@ -335,7 +331,8 @@ class DockerServerEnvironment( return server.properties.containsKey("docker-image") && server.properties.containsKey("docker-tag") } - override fun getServers(): List { + override suspend fun getServers(): List { return serverToContainer.keys.toList() } + } \ No newline at end of file diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/process/ProcessServerEnvironment.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/process/ProcessServerEnvironment.kt index dd6ca34..4027283 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/process/ProcessServerEnvironment.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/environment/process/ProcessServerEnvironment.kt @@ -22,10 +22,7 @@ import app.simplecloud.droplet.serverhost.shared.logs.DefaultLogStreamer import app.simplecloud.droplet.serverhost.shared.logs.ScreenCommandExecutor import app.simplecloud.droplet.serverhost.shared.logs.ScreenConfigurer import app.simplecloud.droplet.serverhost.shared.process.ProcessFinder -import build.buf.gen.simplecloud.controller.v1.ControllerServerServiceGrpcKt -import build.buf.gen.simplecloud.controller.v1.ServerHostStreamServerLogsResponse -import build.buf.gen.simplecloud.controller.v1.copy -import build.buf.gen.simplecloud.controller.v1.updateServerRequest +import build.buf.gen.simplecloud.controller.v1.* import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow @@ -61,22 +58,24 @@ class ProcessServerEnvironment( private val logger = LogManager.getLogger(ServerHostRuntime::class.java) - private val serverToProcessHandle = mutableMapOf() + private val serverToProcessHandle = mutableMapOf() private fun containsServer(server: Server): Boolean { - return serverToProcessHandle.any { it.key.uniqueId == server.uniqueId } + return serverToProcessHandle.any { it.key == server.uniqueId } } private fun containsServer(uniqueId: String): Boolean { - return serverToProcessHandle.any { it.key.uniqueId == uniqueId } + return serverToProcessHandle.any { it.key == uniqueId } } - override fun getServer(uniqueId: String): Server? { - return serverToProcessHandle.keys.find { it.uniqueId == uniqueId } - } - - override fun getServerCache(): MutableMap { - return serverToProcessHandle + override suspend fun getServer(uniqueId: String): Server? { + try { + return Server.fromDefinition(controllerStub.getServerById(getServerByIdRequest { + this.serverId = uniqueId + })) + } catch (e: Exception) { + return null + } } override suspend fun updateServer(server: Server): Server? { @@ -90,11 +89,11 @@ class ProcessServerEnvironment( exe ).orElse(null) } - if (realProcess != null && serverToProcessHandle[server] != realProcess) { + if (realProcess != null && serverToProcessHandle[server.uniqueId] != realProcess) { logger.info("Found updated process handle with PID ${realProcess.pid()} for ${server.group}-${server.numericalId}") - serverToProcessHandle[server] = realProcess + serverToProcessHandle[server.uniqueId] = realProcess } - return@let serverToProcessHandle[server] + return@let serverToProcessHandle[server.uniqueId] } val address = InetSocketAddress(server.ip, server.port.toInt()) @@ -201,7 +200,7 @@ class ProcessServerEnvironment( this.deleted = false }) } - serverToProcessHandle[updatedServer] = process.toHandle() + serverToProcessHandle[updatedServer.uniqueId] = process.toHandle() logger.info("Server ${server.uniqueId} of group ${server.group} now running on PID ${process.pid()}") return true } catch (e: Exception) { @@ -237,7 +236,7 @@ class ProcessServerEnvironment( return false } - val process = serverToProcessHandle[server] + val process = serverToProcessHandle[server.uniqueId] if (process == null) { logger.error("Could not find server process of server ${server.group}-${server.numericalId}") return false @@ -269,7 +268,7 @@ class ProcessServerEnvironment( stopTries[uniqueId] = stopTries.getOrDefault(uniqueId, 0) + 1 return try { process.onExit().await() - serverToProcessHandle.remove(server) + serverToProcessHandle.remove(server.uniqueId) stopTries.remove(uniqueId) true } catch (e: Exception) { @@ -280,7 +279,7 @@ class ProcessServerEnvironment( private fun getProcess(uniqueId: String): ProcessHandle? { return serverToProcessHandle.getOrDefault( - serverToProcessHandle.keys.firstOrNull { it.uniqueId == uniqueId }, + serverToProcessHandle.keys.firstOrNull { it == uniqueId }, null ) } @@ -309,7 +308,7 @@ class ProcessServerEnvironment( return false } - serverToProcessHandle[server] = handle + serverToProcessHandle[server.uniqueId] = handle logger.info("Server ${server.uniqueId} of group ${server.group} successfully reattached on PID ${handle.pid()}") return true } @@ -391,8 +390,8 @@ class ProcessServerEnvironment( return builder } - override fun getServers(): List { - return serverToProcessHandle.keys.toList() + override suspend fun getServers(): List { + return controllerStub.getAllServers(getAllServersRequest { }).serversList.map { Server.fromDefinition(it) } } } diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerHostService.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerHostService.kt index 174f4c1..ff4d470 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerHostService.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerHostService.kt @@ -12,6 +12,7 @@ import io.grpc.Status import io.grpc.StatusException import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow import kotlinx.coroutines.withContext import java.io.FileInputStream import java.io.FileWriter @@ -71,13 +72,18 @@ class ServerHostService( } override fun streamServerLogs(request: ServerHostStreamServerLogsRequest): Flow { - val env = envs.of(request.serverId) - ?: throw StatusException(Status.NOT_FOUND.withDescription("Server not found")) - - try { - return env.streamLogs(env.getServer(request.serverId)!!) - } catch (e: Exception) { - throw StatusException(Status.INTERNAL.withDescription("Could not stream logs: ${e.message}")) + return flow { + val env = envs.of(request.serverId) + ?: throw StatusException(Status.NOT_FOUND.withDescription("Server not found")) + + try { + env.streamLogs(env.getServer(request.serverId)!!) + .collect { response -> + emit(response) + } + } catch (e: Exception) { + throw StatusException(Status.INTERNAL.withDescription("Could not stream logs: ${e.message}")) + } } } diff --git a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerOperationReconciler.kt b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerOperationReconciler.kt index 3aa0f7f..c5a42ac 100644 --- a/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerOperationReconciler.kt +++ b/serverhost-runtime/src/main/kotlin/app/simplecloud/droplet/serverhost/runtime/host/ServerOperationReconciler.kt @@ -120,7 +120,7 @@ class ServerOperationReconciler( } ?: logger.warn("Timeout waiting for server to come online: {}", server.uniqueId) } - private fun isServerOnline(env: ServerEnvironment, server: ServerDefinition): Boolean { + private suspend fun isServerOnline(env: ServerEnvironment, server: ServerDefinition): Boolean { return try { val currentServer = env.getServer(server.uniqueId) ?: return false currentServer.state == ServerState.AVAILABLE || currentServer.state == ServerState.INGAME @@ -152,4 +152,5 @@ class ServerOperationReconciler( logger.info("Successfully started server: {}", server.uniqueId) return server } + }