@@ -13,6 +13,7 @@ import com.intellij.util.application
1313import com.jetbrains.rd.platform.codeWithMe.portForwarding.*
1414import com.jetbrains.rd.util.URI
1515import com.jetbrains.rd.util.lifetime.Lifetime
16+ import com.jetbrains.rd.util.lifetime.LifetimeDefinition
1617import io.gitpod.supervisor.api.Status
1718import io.gitpod.supervisor.api.Status.PortsStatus
1819import io.gitpod.supervisor.api.StatusServiceGrpc
@@ -23,18 +24,34 @@ import kotlinx.coroutines.future.asDeferred
2324import org.apache.http.client.utils.URIBuilder
2425import java.util.*
2526import java.util.concurrent.CompletableFuture
27+ import java.util.concurrent.ConcurrentHashMap
28+ import kotlinx.coroutines.sync.Semaphore
29+ import kotlinx.coroutines.sync.withPermit
2630
2731@Suppress(" UnstableApiUsage" )
2832abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService {
2933 companion object {
3034 const val FORWARDED_PORT_LABEL = " ForwardedByGitpod"
3135 const val EXPOSED_PORT_LABEL = " ExposedByGitpod"
36+ private const val MAX_CONCURRENT_OPERATIONS = 10
37+ private const val BATCH_SIZE = 10
38+ private const val BATCH_DELAY = 100L
39+ private const val DEBOUNCE_DELAY = 500L
3240 }
3341
3442 private val perClientPortForwardingManager = service<PerClientPortForwardingManager >()
3543 private val ignoredPortsForNotificationService = service<GitpodIgnoredPortsForNotificationService >()
3644 private val lifetime = Lifetime .Eternal .createNested()
3745
46+ // Store current observed ports and their lifetime references
47+ private val portLifetimes = ConcurrentHashMap <Int , LifetimeDefinition >()
48+
49+ // Debounce job for port updates
50+ private var debounceJob: Job ? = null
51+
52+ // Semaphore to limit concurrent operations
53+ private val operationSemaphore = Semaphore (MAX_CONCURRENT_OPERATIONS )
54+
3855 init { start() }
3956
4057 private fun start () {
@@ -58,7 +75,6 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
5875 is InterruptedException , is CancellationException -> {
5976 cancel(" gitpod: Stopped observing ports list due to an expected interruption." )
6077 }
61-
6278 else -> {
6379 thisLogger().warn(
6480 " gitpod: Got an error while trying to get ports list from Supervisor. " +
@@ -86,7 +102,17 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
86102 }
87103
88104 override fun onNext (response : Status .PortsStatusResponse ) {
89- application.invokeLater { syncPortsListWithClient(response) }
105+ debounceJob?.cancel()
106+ debounceJob = runJob(lifetime) {
107+ delay(DEBOUNCE_DELAY )
108+ try {
109+ syncPortsListWithClient(response)
110+ } catch (e: Exception ) {
111+ thisLogger().error(" gitpod: Error during port observation" , e)
112+ } finally {
113+ debounceJob = null
114+ }
115+ }
90116 }
91117
92118 override fun onCompleted () {
@@ -114,6 +140,9 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
114140 val servedPorts = portsList.filter { it.served }
115141 val exposedPorts = servedPorts.filter { it.exposed?.url?.isNotBlank() ? : false }
116142 val portsNumbersFromNonServedPorts = portsList.filter { ! it.served }.map { it.localPort }
143+
144+ val allPortsToKeep = mutableSetOf<Int >()
145+
117146 val servedPortsToStartForwarding = servedPorts.filter {
118147 perClientPortForwardingManager.getPorts(it.localPort).none { p -> p.labels.contains(FORWARDED_PORT_LABEL ) }
119148 }
@@ -127,27 +156,91 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
127156 .map { it.hostPortNumber }
128157 .filter { portsNumbersFromNonServedPorts.contains(it) || ! portsNumbersFromPortsList.contains(it) }
129158
130- servedPortsToStartForwarding.forEach { startForwarding(it) }
159+ runJob(lifetime) {
160+ processPortsInBatches(servedPortsToStartForwarding) { port ->
161+ operationSemaphore.withPermit {
162+ startForwarding(port)
163+ allPortsToKeep.add(port.localPort)
164+ }
165+ }
166+
167+ processPortsInBatches(exposedPortsToStartExposingOnClient) { port ->
168+ operationSemaphore.withPermit {
169+ startExposingOnClient(port)
170+ allPortsToKeep.add(port.localPort)
171+ }
172+ }
131173
132- exposedPortsToStartExposingOnClient.forEach { startExposingOnClient(it) }
174+ processPortsInBatches(forwardedPortsToStopForwarding) { port ->
175+ operationSemaphore.withPermit { stopForwarding(port) }
176+ }
133177
134- forwardedPortsToStopForwarding.forEach { stopForwarding(it) }
178+ processPortsInBatches(exposedPortsToStopExposingOnClient) { port ->
179+ operationSemaphore.withPermit { stopExposingOnClient(port) }
180+ }
135181
136- exposedPortsToStopExposingOnClient.forEach { stopExposingOnClient(it) }
182+ processPortsInBatches(portsList) { port ->
183+ application.invokeLater {
184+ updatePortsPresentation(port)
185+ allPortsToKeep.add(port.localPort)
186+ }
187+ }
137188
138- portsList.forEach { updatePortsPresentation(it) }
189+ cleanupUnusedLifetimes(allPortsToKeep)
190+ }
139191 }
140192
141- private fun startForwarding (portStatus : PortsStatus ) {
142- if (isLocalPortForwardingDisabled()) {
143- return
193+ private suspend fun <T > processPortsInBatches (ports : List <T >, action : suspend (T ) -> Unit ) {
194+ ports.chunked(BATCH_SIZE ).forEach { batch ->
195+ try {
196+ batch.forEach { port ->
197+ try {
198+ withTimeout(5000 ) { // Add timeout to prevent hanging operations
199+ action(port)
200+ }
201+ } catch (e: Exception ) {
202+ thisLogger().warn(" gitpod: Error processing port in batch" , e)
203+ }
204+ }
205+ delay(BATCH_DELAY )
206+ } catch (e: Exception ) {
207+ thisLogger().error(" gitpod: Error processing batch" , e)
208+ delay(BATCH_DELAY * 2 ) // Double delay on error
209+ }
210+ }
211+ }
212+
213+ private fun cleanupUnusedLifetimes (portsToKeep : Set <Int >) {
214+ portLifetimes.keys.filter { ! portsToKeep.contains(it) }.forEach { port ->
215+ portLifetimes[port]?.let { lifetime ->
216+ thisLogger().debug(" gitpod: Terminating lifetime for port $port " )
217+ lifetime.terminate()
218+ portLifetimes.remove(port)
219+ }
144220 }
221+ }
222+
223+ private fun startForwarding (portStatus : PortsStatus ) {
224+ if (isLocalPortForwardingDisabled()) return
225+
226+ val portLifetime = getOrCreatePortLifetime(portStatus.localPort)
227+
145228 try {
146- perClientPortForwardingManager.forwardPort(
229+ thisLogger().debug(" gitpod: Starting forwarding for port ${portStatus.localPort} " )
230+ val port = perClientPortForwardingManager.forwardPort(
147231 portStatus.localPort,
148232 PortType .TCP ,
149233 setOf (FORWARDED_PORT_LABEL ),
150234 )
235+
236+ portLifetime.onTerminationOrNow {
237+ thisLogger().debug(" gitpod: Cleaning up port ${portStatus.localPort} due to lifetime termination" )
238+ try {
239+ perClientPortForwardingManager.removePort(port)
240+ } catch (e: Exception ) {
241+ thisLogger().warn(" gitpod: Failed to remove port on lifetime termination" , e)
242+ }
243+ }
151244 } catch (throwable: Throwable ) {
152245 if (throwable !is PortAlreadyForwardedException ) {
153246 thisLogger().warn(" gitpod: Caught an exception while forwarding port: ${throwable.message} " )
@@ -156,62 +249,113 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
156249 }
157250
158251 private fun stopForwarding (hostPort : Int ) {
159- perClientPortForwardingManager.getPorts(hostPort)
252+ thisLogger().debug(" gitpod: Stopping forwarding for port $hostPort " )
253+ val portsToRemove = perClientPortForwardingManager.getPorts(hostPort)
160254 .filter { it.labels.contains(FORWARDED_PORT_LABEL ) }
161- .forEach { perClientPortForwardingManager.removePort(it) }
255+
256+ terminatePortLifetime(hostPort)
257+
258+ portsToRemove.forEach {
259+ try {
260+ perClientPortForwardingManager.removePort(it)
261+ } catch (e: Exception ) {
262+ thisLogger().warn(" gitpod: Failed to remove forwarded port $hostPort " , e)
263+ }
264+ }
162265 }
163266
164267 private fun startExposingOnClient (portStatus : PortsStatus ) {
165- perClientPortForwardingManager.exposePort(
268+ val portLifetime = getOrCreatePortLifetime(portStatus.localPort)
269+
270+ thisLogger().debug(" gitpod: Starting exposing for port ${portStatus.localPort} " )
271+ val port = perClientPortForwardingManager.exposePort(
166272 portStatus.localPort,
167273 portStatus.exposed.url,
168274 setOf (EXPOSED_PORT_LABEL ),
169275 )
276+
277+ portLifetime.onTerminationOrNow {
278+ thisLogger().debug(" gitpod: Cleaning up exposed port ${portStatus.localPort} due to lifetime termination" )
279+ try {
280+ perClientPortForwardingManager.removePort(port)
281+ } catch (e: Exception ) {
282+ thisLogger().warn(" gitpod: Failed to remove exposed port on lifetime termination" , e)
283+ }
284+ }
170285 }
171286
172287 private fun stopExposingOnClient (hostPort : Int ) {
173- perClientPortForwardingManager.getPorts(hostPort)
288+ thisLogger().debug(" gitpod: Stopping exposing for port $hostPort " )
289+ val portsToRemove = perClientPortForwardingManager.getPorts(hostPort)
174290 .filter { it.labels.contains(EXPOSED_PORT_LABEL ) }
175- .forEach { perClientPortForwardingManager.removePort(it) }
176- }
177291
178- private fun updatePortsPresentation (portStatus : PortsStatus ) {
179- perClientPortForwardingManager.getPorts(portStatus.localPort).forEach {
180- if (it.configuration.isForwardedPort()) {
181- it.presentation.name = portStatus.name
182- it.presentation.description = portStatus.description
183- it.presentation.tooltip = " Forwarded"
184- it.presentation.icon = RowIcon (AllIcons .Actions .Commit )
185- } else if (it.configuration.isExposedPort()) {
186- val isPubliclyExposed = (portStatus.exposed.visibility == Status .PortVisibility .public_visibility)
187-
188- it.presentation.name = portStatus.name
189- it.presentation.description = portStatus.description
190- it.presentation.tooltip = " Exposed (${if (isPubliclyExposed) " Public" else " Private" } )"
191- it.presentation.icon = if (isPubliclyExposed) {
192- RowIcon (AllIcons .Actions .Commit )
193- } else {
194- RowIcon (AllIcons .Actions .Commit , AllIcons .Diff .Lock )
195- }
292+ terminatePortLifetime(hostPort)
293+
294+ portsToRemove.forEach {
295+ try {
296+ perClientPortForwardingManager.removePort(it)
297+ } catch (e: Exception ) {
298+ thisLogger().warn(" gitpod: Failed to remove exposed port $hostPort " , e)
196299 }
197300 }
198301 }
199302
200- override fun getLocalHostUriFromHostPort (hostPort : Int ): Optional <URI > {
201- val forwardedPort = perClientPortForwardingManager.getPorts(hostPort).firstOrNull {
202- it.configuration.isForwardedPort()
203- } ? : return Optional .empty()
303+ private fun getOrCreatePortLifetime (port : Int ): Lifetime =
304+ portLifetimes.computeIfAbsent(port) {
305+ thisLogger().debug(" gitpod: Creating new lifetime for port $port " )
306+ lifetime.createNested()
307+ }
308+
309+ private fun terminatePortLifetime (port : Int ) {
310+ portLifetimes[port]?.let { portLifetime ->
311+ thisLogger().debug(" gitpod: Terminating lifetime for port $port " )
312+ portLifetime.terminate()
313+ portLifetimes.remove(port)
314+ }
315+ }
204316
205- (forwardedPort.configuration as PortConfiguration .PerClientTcpForwarding ).clientPortState.let {
206- return if (it is ClientPortState .Assigned ) {
207- Optional .of(URIBuilder ().setScheme(" http" ).setHost(it.clientInterface).setPort(it.clientPort).build())
208- } else {
209- Optional .empty()
317+ private fun updatePortsPresentation (portStatus : PortsStatus ) {
318+ perClientPortForwardingManager.getPorts(portStatus.localPort).forEach {
319+ when {
320+ it.configuration.isForwardedPort() -> {
321+ it.presentation.name = portStatus.name
322+ it.presentation.description = portStatus.description
323+ it.presentation.tooltip = " Forwarded"
324+ it.presentation.icon = RowIcon (AllIcons .Actions .Commit )
325+ }
326+ it.configuration.isExposedPort() -> {
327+ val isPubliclyExposed = (portStatus.exposed.visibility == Status .PortVisibility .public_visibility)
328+ it.presentation.name = portStatus.name
329+ it.presentation.description = portStatus.description
330+ it.presentation.tooltip = " Exposed (${if (isPubliclyExposed) " Public" else " Private" } )"
331+ it.presentation.icon = if (isPubliclyExposed) {
332+ RowIcon (AllIcons .Actions .Commit )
333+ } else {
334+ RowIcon (AllIcons .Actions .Commit , AllIcons .Diff .Lock )
335+ }
336+ }
210337 }
211338 }
212339 }
213340
341+ override fun getLocalHostUriFromHostPort (hostPort : Int ): Optional <URI > =
342+ perClientPortForwardingManager.getPorts(hostPort)
343+ .firstOrNull { it.configuration.isForwardedPort() }
344+ ?.let { forwardedPort ->
345+ (forwardedPort.configuration as PortConfiguration .PerClientTcpForwarding )
346+ .clientPortState
347+ .let {
348+ if (it is ClientPortState .Assigned ) {
349+ Optional .of(URIBuilder ().setScheme(" http" ).setHost(it.clientInterface).setPort(it.clientPort).build())
350+ } else {
351+ Optional .empty()
352+ }
353+ }
354+ } ? : Optional .empty()
355+
214356 override fun dispose () {
357+ portLifetimes.values.forEach { it.terminate() }
358+ portLifetimes.clear()
215359 lifetime.terminate()
216360 }
217361}
0 commit comments