From 9351129c4f980c4f91fe2d6ae7b3f4363f9b1f80 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Wed, 26 Mar 2025 09:29:11 +0000 Subject: [PATCH 1/4] [JetBrains] Improve port forwarding logic Tool: gitpod/catfood.gitpod.cloud --- .../AbstractGitpodPortForwardingService.kt | 232 ++++++++++++++---- 1 file changed, 188 insertions(+), 44 deletions(-) diff --git a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt index b17993601bef78..735275d392fae9 100644 --- a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt +++ b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt @@ -13,6 +13,7 @@ import com.intellij.util.application import com.jetbrains.rd.platform.codeWithMe.portForwarding.* import com.jetbrains.rd.util.URI import com.jetbrains.rd.util.lifetime.Lifetime +import com.jetbrains.rd.util.lifetime.LifetimeDefinition import io.gitpod.supervisor.api.Status import io.gitpod.supervisor.api.Status.PortsStatus import io.gitpod.supervisor.api.StatusServiceGrpc @@ -23,18 +24,34 @@ import kotlinx.coroutines.future.asDeferred import org.apache.http.client.utils.URIBuilder import java.util.* import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.sync.Semaphore +import kotlinx.coroutines.sync.withPermit @Suppress("UnstableApiUsage") abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService { companion object { const val FORWARDED_PORT_LABEL = "ForwardedByGitpod" const val EXPOSED_PORT_LABEL = "ExposedByGitpod" + private const val MAX_CONCURRENT_OPERATIONS = 10 + private const val BATCH_SIZE = 10 + private const val BATCH_DELAY = 100L + private const val DEBOUNCE_DELAY = 500L } private val perClientPortForwardingManager = service() private val ignoredPortsForNotificationService = service() private val lifetime = Lifetime.Eternal.createNested() + // Store current observed ports and their lifetime references + private val portLifetimes = ConcurrentHashMap() + + // Debounce job for port updates + private var debounceJob: Job? = null + + // Semaphore to limit concurrent operations + private val operationSemaphore = Semaphore(MAX_CONCURRENT_OPERATIONS) + init { start() } private fun start() { @@ -58,7 +75,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService is InterruptedException, is CancellationException -> { cancel("gitpod: Stopped observing ports list due to an expected interruption.") } - else -> { thisLogger().warn( "gitpod: Got an error while trying to get ports list from Supervisor. " + @@ -86,7 +102,17 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } override fun onNext(response: Status.PortsStatusResponse) { - application.invokeLater { syncPortsListWithClient(response) } + debounceJob?.cancel() + debounceJob = runJob(lifetime) { + delay(DEBOUNCE_DELAY) + try { + syncPortsListWithClient(response) + } catch (e: Exception) { + thisLogger().error("gitpod: Error during port observation", e) + } finally { + debounceJob = null + } + } } override fun onCompleted() { @@ -114,6 +140,9 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService val servedPorts = portsList.filter { it.served } val exposedPorts = servedPorts.filter { it.exposed?.url?.isNotBlank() ?: false } val portsNumbersFromNonServedPorts = portsList.filter { !it.served }.map { it.localPort } + + val allPortsToKeep = mutableSetOf() + val servedPortsToStartForwarding = servedPorts.filter { perClientPortForwardingManager.getPorts(it.localPort).none { p -> p.labels.contains(FORWARDED_PORT_LABEL) } } @@ -127,27 +156,91 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService .map { it.hostPortNumber } .filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) } - servedPortsToStartForwarding.forEach { startForwarding(it) } + runJob(lifetime) { + processPortsInBatches(servedPortsToStartForwarding) { port -> + operationSemaphore.withPermit { + startForwarding(port) + allPortsToKeep.add(port.localPort) + } + } + + processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> + operationSemaphore.withPermit { + startExposingOnClient(port) + allPortsToKeep.add(port.localPort) + } + } - exposedPortsToStartExposingOnClient.forEach { startExposingOnClient(it) } + processPortsInBatches(forwardedPortsToStopForwarding) { port -> + operationSemaphore.withPermit { stopForwarding(port) } + } - forwardedPortsToStopForwarding.forEach { stopForwarding(it) } + processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> + operationSemaphore.withPermit { stopExposingOnClient(port) } + } - exposedPortsToStopExposingOnClient.forEach { stopExposingOnClient(it) } + processPortsInBatches(portsList) { port -> + application.invokeLater { + updatePortsPresentation(port) + allPortsToKeep.add(port.localPort) + } + } - portsList.forEach { updatePortsPresentation(it) } + cleanupUnusedLifetimes(allPortsToKeep) + } } - private fun startForwarding(portStatus: PortsStatus) { - if (isLocalPortForwardingDisabled()) { - return + private suspend fun processPortsInBatches(ports: List, action: suspend (T) -> Unit) { + ports.chunked(BATCH_SIZE).forEach { batch -> + try { + batch.forEach { port -> + try { + withTimeout(5000) { // Add timeout to prevent hanging operations + action(port) + } + } catch (e: Exception) { + thisLogger().warn("gitpod: Error processing port in batch", e) + } + } + delay(BATCH_DELAY) + } catch (e: Exception) { + thisLogger().error("gitpod: Error processing batch", e) + delay(BATCH_DELAY * 2) // Double delay on error + } + } + } + + private fun cleanupUnusedLifetimes(portsToKeep: Set) { + portLifetimes.keys.filter { !portsToKeep.contains(it) }.forEach { port -> + portLifetimes[port]?.let { lifetime -> + thisLogger().debug("gitpod: Terminating lifetime for port $port") + lifetime.terminate() + portLifetimes.remove(port) + } } + } + + private fun startForwarding(portStatus: PortsStatus) { + if (isLocalPortForwardingDisabled()) return + + val portLifetime = getOrCreatePortLifetime(portStatus.localPort) + try { - perClientPortForwardingManager.forwardPort( + thisLogger().debug("gitpod: Starting forwarding for port ${portStatus.localPort}") + val port = perClientPortForwardingManager.forwardPort( portStatus.localPort, PortType.TCP, setOf(FORWARDED_PORT_LABEL), ) + + portLifetime.onTerminationOrNow { + thisLogger().debug("gitpod: Cleaning up port ${portStatus.localPort} due to lifetime termination") + try { + perClientPortForwardingManager.removePort(port) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove port on lifetime termination", e) + } + } } catch (throwable: Throwable) { if (throwable !is PortAlreadyForwardedException) { thisLogger().warn("gitpod: Caught an exception while forwarding port: ${throwable.message}") @@ -156,62 +249,113 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } private fun stopForwarding(hostPort: Int) { - perClientPortForwardingManager.getPorts(hostPort) + thisLogger().debug("gitpod: Stopping forwarding for port $hostPort") + val portsToRemove = perClientPortForwardingManager.getPorts(hostPort) .filter { it.labels.contains(FORWARDED_PORT_LABEL) } - .forEach { perClientPortForwardingManager.removePort(it) } + + terminatePortLifetime(hostPort) + + portsToRemove.forEach { + try { + perClientPortForwardingManager.removePort(it) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove forwarded port $hostPort", e) + } + } } private fun startExposingOnClient(portStatus: PortsStatus) { - perClientPortForwardingManager.exposePort( + val portLifetime = getOrCreatePortLifetime(portStatus.localPort) + + thisLogger().debug("gitpod: Starting exposing for port ${portStatus.localPort}") + val port = perClientPortForwardingManager.exposePort( portStatus.localPort, portStatus.exposed.url, setOf(EXPOSED_PORT_LABEL), ) + + portLifetime.onTerminationOrNow { + thisLogger().debug("gitpod: Cleaning up exposed port ${portStatus.localPort} due to lifetime termination") + try { + perClientPortForwardingManager.removePort(port) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove exposed port on lifetime termination", e) + } + } } private fun stopExposingOnClient(hostPort: Int) { - perClientPortForwardingManager.getPorts(hostPort) + thisLogger().debug("gitpod: Stopping exposing for port $hostPort") + val portsToRemove = perClientPortForwardingManager.getPorts(hostPort) .filter { it.labels.contains(EXPOSED_PORT_LABEL) } - .forEach { perClientPortForwardingManager.removePort(it) } - } - private fun updatePortsPresentation(portStatus: PortsStatus) { - perClientPortForwardingManager.getPorts(portStatus.localPort).forEach { - if (it.configuration.isForwardedPort()) { - it.presentation.name = portStatus.name - it.presentation.description = portStatus.description - it.presentation.tooltip = "Forwarded" - it.presentation.icon = RowIcon(AllIcons.Actions.Commit) - } else if (it.configuration.isExposedPort()) { - val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility) - - it.presentation.name = portStatus.name - it.presentation.description = portStatus.description - it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})" - it.presentation.icon = if (isPubliclyExposed) { - RowIcon(AllIcons.Actions.Commit) - } else { - RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock) - } + terminatePortLifetime(hostPort) + + portsToRemove.forEach { + try { + perClientPortForwardingManager.removePort(it) + } catch (e: Exception) { + thisLogger().warn("gitpod: Failed to remove exposed port $hostPort", e) } } } - override fun getLocalHostUriFromHostPort(hostPort: Int): Optional { - val forwardedPort = perClientPortForwardingManager.getPorts(hostPort).firstOrNull { - it.configuration.isForwardedPort() - } ?: return Optional.empty() + private fun getOrCreatePortLifetime(port: Int): Lifetime = + portLifetimes.computeIfAbsent(port) { + thisLogger().debug("gitpod: Creating new lifetime for port $port") + lifetime.createNested() + } + + private fun terminatePortLifetime(port: Int) { + portLifetimes[port]?.let { portLifetime -> + thisLogger().debug("gitpod: Terminating lifetime for port $port") + portLifetime.terminate() + portLifetimes.remove(port) + } + } - (forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding).clientPortState.let { - return if (it is ClientPortState.Assigned) { - Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build()) - } else { - Optional.empty() + private fun updatePortsPresentation(portStatus: PortsStatus) { + perClientPortForwardingManager.getPorts(portStatus.localPort).forEach { + when { + it.configuration.isForwardedPort() -> { + it.presentation.name = portStatus.name + it.presentation.description = portStatus.description + it.presentation.tooltip = "Forwarded" + it.presentation.icon = RowIcon(AllIcons.Actions.Commit) + } + it.configuration.isExposedPort() -> { + val isPubliclyExposed = (portStatus.exposed.visibility == Status.PortVisibility.public_visibility) + it.presentation.name = portStatus.name + it.presentation.description = portStatus.description + it.presentation.tooltip = "Exposed (${if (isPubliclyExposed) "Public" else "Private"})" + it.presentation.icon = if (isPubliclyExposed) { + RowIcon(AllIcons.Actions.Commit) + } else { + RowIcon(AllIcons.Actions.Commit, AllIcons.Diff.Lock) + } + } } } } + override fun getLocalHostUriFromHostPort(hostPort: Int): Optional = + perClientPortForwardingManager.getPorts(hostPort) + .firstOrNull { it.configuration.isForwardedPort() } + ?.let { forwardedPort -> + (forwardedPort.configuration as PortConfiguration.PerClientTcpForwarding) + .clientPortState + .let { + if (it is ClientPortState.Assigned) { + Optional.of(URIBuilder().setScheme("http").setHost(it.clientInterface).setPort(it.clientPort).build()) + } else { + Optional.empty() + } + } + } ?: Optional.empty() + override fun dispose() { + portLifetimes.values.forEach { it.terminate() } + portLifetimes.clear() lifetime.terminate() } } From 16767b4835e65b032cd67841d73ea1a60422c9e0 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Wed, 26 Mar 2025 09:31:34 +0000 Subject: [PATCH 2/4] Fixup Tool: gitpod/catfood.gitpod.cloud --- .../AbstractGitpodPortForwardingService.kt | 70 ++++++++++++------- 1 file changed, 45 insertions(+), 25 deletions(-) diff --git a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt index 735275d392fae9..27064a3f68aac0 100644 --- a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt +++ b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt @@ -157,36 +157,56 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService .filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) } runJob(lifetime) { - processPortsInBatches(servedPortsToStartForwarding) { port -> - operationSemaphore.withPermit { - startForwarding(port) - allPortsToKeep.add(port.localPort) + coroutineScope { + // Stop operations first to free up resources + launch { + processPortsInBatches(forwardedPortsToStopForwarding) { port -> + operationSemaphore.withPermit { stopForwarding(port) } + } } - } - - processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> - operationSemaphore.withPermit { - startExposingOnClient(port) - allPortsToKeep.add(port.localPort) + launch { + processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> + operationSemaphore.withPermit { stopExposingOnClient(port) } + } } - } - processPortsInBatches(forwardedPortsToStopForwarding) { port -> - operationSemaphore.withPermit { stopForwarding(port) } - } + // Wait for stop operations to complete + awaitAll() - processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> - operationSemaphore.withPermit { stopExposingOnClient(port) } - } + // Start new operations + launch { + processPortsInBatches(servedPortsToStartForwarding) { port -> + operationSemaphore.withPermit { + startForwarding(port) + allPortsToKeep.add(port.localPort) + } + } + } + launch { + processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> + operationSemaphore.withPermit { + startExposingOnClient(port) + allPortsToKeep.add(port.localPort) + } + } + } - processPortsInBatches(portsList) { port -> - application.invokeLater { - updatePortsPresentation(port) - allPortsToKeep.add(port.localPort) + // Update presentation in parallel with start operations + launch { + processPortsInBatches(portsList) { port -> + application.invokeLater { + updatePortsPresentation(port) + allPortsToKeep.add(port.localPort) + } + } } - } - cleanupUnusedLifetimes(allPortsToKeep) + // Wait for all operations to complete + awaitAll() + + // Clean up after all operations are done + cleanupUnusedLifetimes(allPortsToKeep) + } } } @@ -195,7 +215,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService try { batch.forEach { port -> try { - withTimeout(5000) { // Add timeout to prevent hanging operations + withTimeout(5000) { action(port) } } catch (e: Exception) { @@ -205,7 +225,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService delay(BATCH_DELAY) } catch (e: Exception) { thisLogger().error("gitpod: Error processing batch", e) - delay(BATCH_DELAY * 2) // Double delay on error + delay(BATCH_DELAY * 2) } } } From 6f40cdbad0bf4894bb32dfb95a7f11daf05db56b Mon Sep 17 00:00:00 2001 From: Huiwen Date: Wed, 26 Mar 2025 09:34:58 +0000 Subject: [PATCH 3/4] Fixup Tool: gitpod/catfood.gitpod.cloud --- .../AbstractGitpodPortForwardingService.kt | 80 ++++++++++--------- 1 file changed, 41 insertions(+), 39 deletions(-) diff --git a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt index 27064a3f68aac0..3ac898aaea5018 100644 --- a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt +++ b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt @@ -37,6 +37,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService private const val BATCH_SIZE = 10 private const val BATCH_DELAY = 100L private const val DEBOUNCE_DELAY = 500L + private const val UI_UPDATE_BATCH_SIZE = 50 // Batch size for UI updates } private val perClientPortForwardingManager = service() @@ -92,7 +93,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService val completableFuture = CompletableFuture() val statusServiceStub = StatusServiceGrpc.newStub(GitpodManager.supervisorChannel) - val portsStatusRequest = Status.PortsStatusRequest.newBuilder().setObserve(true).build() val portsStatusResponseObserver = object : @@ -104,8 +104,8 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService override fun onNext(response: Status.PortsStatusResponse) { debounceJob?.cancel() debounceJob = runJob(lifetime) { - delay(DEBOUNCE_DELAY) try { + delay(DEBOUNCE_DELAY) syncPortsListWithClient(response) } catch (e: Exception) { thisLogger().error("gitpod: Error during port observation", e) @@ -125,7 +125,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } statusServiceStub.portsStatus(portsStatusRequest, portsStatusResponseObserver) - return completableFuture } @@ -133,7 +132,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService return System.getenv("GITPOD_DISABLE_JETBRAINS_LOCAL_PORT_FORWARDING")?.toBoolean() ?: false } - private fun syncPortsListWithClient(response: Status.PortsStatusResponse) { + private suspend fun syncPortsListWithClient(response: Status.PortsStatusResponse) { val ignoredPorts = ignoredPortsForNotificationService.getIgnoredPorts() val portsList = response.portsList.filter { !ignoredPorts.contains(it.localPort) } val portsNumbersFromPortsList = portsList.map { it.localPort } @@ -156,57 +155,60 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService .map { it.hostPortNumber } .filter { portsNumbersFromNonServedPorts.contains(it) || !portsNumbersFromPortsList.contains(it) } - runJob(lifetime) { - coroutineScope { - // Stop operations first to free up resources - launch { - processPortsInBatches(forwardedPortsToStopForwarding) { port -> - operationSemaphore.withPermit { stopForwarding(port) } - } + coroutineScope { + // Stop operations first to free up resources + launch { + processPortsInBatches(forwardedPortsToStopForwarding) { port -> + operationSemaphore.withPermit { stopForwarding(port) } } - launch { - processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> - operationSemaphore.withPermit { stopExposingOnClient(port) } - } + } + launch { + processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> + operationSemaphore.withPermit { stopExposingOnClient(port) } } + } - // Wait for stop operations to complete - awaitAll() + // Wait for stop operations to complete + awaitAll() - // Start new operations - launch { - processPortsInBatches(servedPortsToStartForwarding) { port -> - operationSemaphore.withPermit { - startForwarding(port) - allPortsToKeep.add(port.localPort) - } + // Start new operations + launch { + processPortsInBatches(servedPortsToStartForwarding) { port -> + operationSemaphore.withPermit { + startForwarding(port) + allPortsToKeep.add(port.localPort) } } - launch { - processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> - operationSemaphore.withPermit { - startExposingOnClient(port) - allPortsToKeep.add(port.localPort) - } + } + launch { + processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> + operationSemaphore.withPermit { + startExposingOnClient(port) + allPortsToKeep.add(port.localPort) } } + } - // Update presentation in parallel with start operations - launch { - processPortsInBatches(portsList) { port -> - application.invokeLater { + // Wait for all operations to complete + awaitAll() + + // Update presentation in batches to avoid UI thread blocking + launch { + portsList.chunked(UI_UPDATE_BATCH_SIZE).forEach { batch -> + application.invokeLater { + batch.forEach { port -> updatePortsPresentation(port) allPortsToKeep.add(port.localPort) } } + delay(50) // Small delay between UI updates to prevent overwhelming the EDT } + } - // Wait for all operations to complete - awaitAll() + awaitAll() - // Clean up after all operations are done - cleanupUnusedLifetimes(allPortsToKeep) - } + // Clean up after all operations are done + cleanupUnusedLifetimes(allPortsToKeep) } } From 1cc4344359b9caaf81b58d6e4f633366dfae3cf7 Mon Sep 17 00:00:00 2001 From: Huiwen Date: Wed, 26 Mar 2025 09:41:07 +0000 Subject: [PATCH 4/4] Fix Tool: gitpod/catfood.gitpod.cloud --- .../AbstractGitpodPortForwardingService.kt | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt index 3ac898aaea5018..63a64c495f77e9 100644 --- a/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt +++ b/components/ide/jetbrains/backend-plugin/src/main/kotlin/io/gitpod/jetbrains/remote/AbstractGitpodPortForwardingService.kt @@ -157,22 +157,23 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService coroutineScope { // Stop operations first to free up resources - launch { + val stopForwardingJob = launch { processPortsInBatches(forwardedPortsToStopForwarding) { port -> operationSemaphore.withPermit { stopForwarding(port) } } } - launch { + val stopExposingJob = launch { processPortsInBatches(exposedPortsToStopExposingOnClient) { port -> operationSemaphore.withPermit { stopExposingOnClient(port) } } } // Wait for stop operations to complete - awaitAll() + stopForwardingJob.join() + stopExposingJob.join() // Start new operations - launch { + val startForwardingJob = launch { processPortsInBatches(servedPortsToStartForwarding) { port -> operationSemaphore.withPermit { startForwarding(port) @@ -180,7 +181,7 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } } } - launch { + val startExposingJob = launch { processPortsInBatches(exposedPortsToStartExposingOnClient) { port -> operationSemaphore.withPermit { startExposingOnClient(port) @@ -189,11 +190,12 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } } - // Wait for all operations to complete - awaitAll() + // Wait for start operations to complete + startForwardingJob.join() + startExposingJob.join() // Update presentation in batches to avoid UI thread blocking - launch { + val updatePresentationJob = launch { portsList.chunked(UI_UPDATE_BATCH_SIZE).forEach { batch -> application.invokeLater { batch.forEach { port -> @@ -205,7 +207,8 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService } } - awaitAll() + // Wait for UI updates to complete + updatePresentationJob.join() // Clean up after all operations are done cleanupUnusedLifetimes(allPortsToKeep)