@@ -253,20 +253,22 @@ func (c *Client) handleSubscribe(msg *v1.SubscribeMessage) {
253253 return
254254 }
255255
256+ // Ensure log streaming is active for this container
257+ if err := c .hub .logStreamer .StartStreaming (server .ContainerID ); err != nil {
258+ c .hub .log .Warn ("Failed to start log streaming for container %s: %v" , server .ContainerID , err )
259+ }
260+
256261 // Check if already subscribed
257262 c .subscriptionsMu .Lock ()
258- if _ , exists := c .subscriptions [msg .ServerId ]; exists {
259- c .subscriptionsMu .Unlock ()
260- c .sendSubscribed (msg .ServerId )
261- return
263+ if _ , exists := c .subscriptions [msg .ServerId ]; ! exists {
264+ // Subscribe to log streamer
265+ ch := c .hub .logStreamer .Subscribe (server .ContainerID )
266+ c .subscriptions [msg .ServerId ] = ch
267+ go c .forwardLogs (msg .ServerId , ch )
262268 }
263-
264- // Subscribe to log streamer
265- ch := c .hub .logStreamer .Subscribe (server .ContainerID )
266- c .subscriptions [msg .ServerId ] = ch
267269 c .subscriptionsMu .Unlock ()
268270
269- // Send initial logs
271+ // Always send initial logs
270272 tail := int (msg .Tail )
271273 if tail <= 0 {
272274 tail = 500
@@ -276,9 +278,6 @@ func (c *Client) handleSubscribe(msg *v1.SubscribeMessage) {
276278
277279 // Confirm subscription
278280 c .sendSubscribed (msg .ServerId )
279-
280- // Start forwarding logs for this subscription
281- go c .forwardLogs (msg .ServerId , ch )
282281}
283282
284283// forwardLogs forwards log entries from the log streamer to the client
@@ -298,15 +297,16 @@ func (c *Client) handleUnsubscribe(msg *v1.UnsubscribeMessage) {
298297 // Get server to find container ID
299298 ctx := context .Background ()
300299 server , err := c .hub .store .GetServer (ctx , msg .ServerId )
301- if err != nil {
302- c .sendError ("server not found" )
303- return
304- }
305300
301+ // Always clean up the subscription
306302 c .subscriptionsMu .Lock ()
307303 if ch , exists := c .subscriptions [msg .ServerId ]; exists {
308304 delete (c .subscriptions , msg .ServerId )
309- c .hub .logStreamer .Unsubscribe (server .ContainerID , ch )
305+ if err == nil && server .ContainerID != "" {
306+ c .hub .logStreamer .Unsubscribe (server .ContainerID , ch )
307+ } else {
308+ close (ch ) // Close the channel to stop the forwardLogs goroutine
309+ }
310310 }
311311 c .subscriptionsMu .Unlock ()
312312
0 commit comments