@@ -45,7 +45,8 @@ func NewManager(exposed ExposedPortsInterface, served ServedPortsObserver, confi
4545 C : config ,
4646 T : tunneled ,
4747
48- forceUpdates : make (chan struct {}, 1 ),
48+ forceUpdates : make (chan struct {}, 1 ),
49+ closeSubscriptions : make (chan * Subscription , maxSubscriptions ),
4950
5051 internal : internal ,
5152 proxies : make (map [uint32 ]* localhostProxy ),
@@ -80,7 +81,8 @@ type Manager struct {
8081 C ConfigInterace
8182 T TunneledPortsInterface
8283
83- forceUpdates chan struct {}
84+ forceUpdates chan struct {}
85+ closeSubscriptions chan * Subscription
8486
8587 internal map [uint32 ]struct {}
8688 proxies map [uint32 ]* localhostProxy
@@ -172,6 +174,10 @@ func (pm *Manager) Run(ctx context.Context, wg *sync.WaitGroup) {
172174 select {
173175 case <- pm .forceUpdates :
174176 forceUpdate = true
177+ case sub := <- pm .closeSubscriptions :
178+ pm .mu .Lock ()
179+ delete (pm .subscriptions , sub )
180+ pm .mu .Unlock ()
175181 case exposed = <- exposedUpdates :
176182 if exposed == nil {
177183 if ctx .Err () == nil {
@@ -772,14 +778,14 @@ func (pm *Manager) Subscribe() (*Subscription, error) {
772778 sub := & Subscription {updates : make (chan []* api.PortsStatus , 5 )}
773779 var once sync.Once
774780 sub .Close = func () error {
775- pm .mu .Lock ()
776- defer pm .mu .Unlock ()
777-
778781 once .Do (func () {
779782 close (sub .updates )
780783 })
781- delete (pm .subscriptions , sub )
782-
784+ select {
785+ case pm .closeSubscriptions <- sub :
786+ default :
787+ log .Error ("closeSubscriptions channel is full" )
788+ }
783789 return nil
784790 }
785791 pm .subscriptions [sub ] = struct {}{}
0 commit comments