@@ -13,16 +13,20 @@ import com.intellij.util.application
1313import com.jetbrains.rd.platform.codeWithMe.portForwarding.*
1414import com.jetbrains.rd.util.URI
1515import com.jetbrains.rd.util.lifetime.Lifetime
16+ import fleet.util.async.throttleLatest
1617import io.gitpod.supervisor.api.Status
1718import io.gitpod.supervisor.api.Status.PortsStatus
1819import io.gitpod.supervisor.api.StatusServiceGrpc
1920import io.grpc.stub.ClientCallStreamObserver
2021import io.grpc.stub.ClientResponseObserver
2122import kotlinx.coroutines.*
2223import kotlinx.coroutines.future.asDeferred
24+ import kotlinx.coroutines.flow.MutableSharedFlow
2325import org.apache.http.client.utils.URIBuilder
2426import java.util.*
2527import java.util.concurrent.CompletableFuture
28+ import kotlinx.coroutines.Dispatchers
29+ import kotlinx.coroutines.withContext
2630
2731@Suppress(" UnstableApiUsage" )
2832abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService {
@@ -34,8 +38,22 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
3438 private val perClientPortForwardingManager = service<PerClientPortForwardingManager >()
3539 private val ignoredPortsForNotificationService = service<GitpodIgnoredPortsForNotificationService >()
3640 private val lifetime = Lifetime .Eternal .createNested()
41+ private val portStatusFlow = MutableSharedFlow <Status .PortsStatusResponse >()
42+
43+ init {
44+ // Start collecting port status updates with throttling
45+ runJob(lifetime) {
46+ portStatusFlow
47+ .throttleLatest(1000 ) // Throttle to 1 second
48+ .collect { response ->
49+ withContext(Dispatchers .IO ) {
50+ syncPortsListWithClient(response)
51+ }
52+ }
53+ }
3754
38- init { start() }
55+ start()
56+ }
3957
4058 private fun start () {
4159 if (application.isHeadlessEnvironment) return
@@ -86,7 +104,11 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
86104 }
87105
88106 override fun onNext (response : Status .PortsStatusResponse ) {
89- application.invokeLater { syncPortsListWithClient(response) }
107+ application.invokeLater {
108+ runJob(lifetime) {
109+ portStatusFlow.emit(response)
110+ }
111+ }
90112 }
91113
92114 override fun onCompleted () {
0 commit comments