@@ -13,16 +13,20 @@ import com.intellij.util.application
1313import  com.jetbrains.rd.platform.codeWithMe.portForwarding.* 
1414import  com.jetbrains.rd.util.URI 
1515import  com.jetbrains.rd.util.lifetime.Lifetime 
16+ import  fleet.util.async.throttleLatest 
1617import  io.gitpod.supervisor.api.Status 
1718import  io.gitpod.supervisor.api.Status.PortsStatus 
1819import  io.gitpod.supervisor.api.StatusServiceGrpc 
1920import  io.grpc.stub.ClientCallStreamObserver 
2021import  io.grpc.stub.ClientResponseObserver 
2122import  kotlinx.coroutines.* 
2223import  kotlinx.coroutines.future.asDeferred 
24+ import  kotlinx.coroutines.flow.MutableSharedFlow 
2325import  org.apache.http.client.utils.URIBuilder 
2426import  java.util.* 
2527import  java.util.concurrent.CompletableFuture 
28+ import  kotlinx.coroutines.Dispatchers 
29+ import  kotlinx.coroutines.withContext 
2630
2731@Suppress(" UnstableApiUsage"  )
2832abstract  class  AbstractGitpodPortForwardingService  : GitpodPortForwardingService  {
@@ -34,8 +38,22 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
3438    private  val  perClientPortForwardingManager =  service<PerClientPortForwardingManager >()
3539    private  val  ignoredPortsForNotificationService =  service<GitpodIgnoredPortsForNotificationService >()
3640    private  val  lifetime =  Lifetime .Eternal .createNested()
41+     private  val  portStatusFlow =  MutableSharedFlow <Status .PortsStatusResponse >()
42+ 
43+     init  {
44+         //  Start collecting port status updates with throttling
45+         runJob(lifetime) {
46+             portStatusFlow
47+                 .throttleLatest(1000 ) //  Throttle to 1 second
48+                 .collect { response -> 
49+                     withContext(Dispatchers .IO ) {
50+                         syncPortsListWithClient(response)
51+                     }
52+                 }
53+         }
3754
38-     init  { start() }
55+         start()
56+     }
3957
4058    private  fun  start () {
4159        if  (application.isHeadlessEnvironment) return 
@@ -86,7 +104,11 @@ abstract class AbstractGitpodPortForwardingService : GitpodPortForwardingService
86104            }
87105
88106            override  fun  onNext (response :  Status .PortsStatusResponse ) {
89-                 application.invokeLater { syncPortsListWithClient(response) }
107+                 application.invokeLater {
108+                     runJob(lifetime) {
109+                         portStatusFlow.emit(response)
110+                     }
111+                 }
90112            }
91113
92114            override  fun  onCompleted () {
0 commit comments