Description
I have observed a potential goroutine leak in the `streamableClientConn` implementation when it handles Server-Sent Events (SSE). Below I describe the issue and a scenario that can lead to the leak.
Lines 751 to 770 in c037ba5
```go
func (s *streamableClientConn) handleSSE(resp *http.Response) {
	defer resp.Body.Close()
	done := make(chan struct{})
	go func() {
		defer close(done)
		for evt, err := range scanEvents(resp.Body) {
			if err != nil {
				// TODO: surface this error; possibly break the stream
				return
			}
			s.incoming <- evt.data
		}
	}()
	select {
	case <-s.done:
	case <-done:
	}
}
```
Problem Scenario
- SSE event handling:
  When the server responds with `text/event-stream`, `handleSSE` runs in its own goroutine and spawns a child goroutine that reads events and writes them to the `s.incoming` channel:
  Lines 744 to 748 in 6c6243c
  ```go
  if resp.Header.Get("Content-Type") == "text/event-stream" {
  	go s.handleSSE(resp)
  } else {
  	resp.Body.Close()
  }
  ```
- `Read()` as the sole consumer:
  `Read()` is the only consumer of `s.incoming`. Once `Read()` returns because `Close()` has closed `s.done`, nothing is left to receive from `s.incoming`.
  Lines 665 to 674 in 6c6243c
  ```go
  func (s *streamableClientConn) Read(ctx context.Context) (jsonrpc.Message, error) {
  	select {
  	case <-ctx.Done():
  		return nil, ctx.Err()
  	case <-s.done:
  		return nil, io.EOF
  	case data := <-s.incoming:
  		return jsonrpc2.DecodeMessage(data)
  	}
  }
  ```
- Blocked goroutine:
  If a send on `s.incoming` cannot proceed (for example, the channel is full because of slow consumption or a high event rate), the child goroutine writing to `s.incoming` blocks.
- `Close()` call:
  After `Close()` is called, `s.done` is closed, so the goroutine running `handleSSE` returns. Its deferred `resp.Body.Close()` then runs and closes the underlying stream. The child goroutine, however, is still blocked on its send to `s.incoming`.
- Goroutine leak:
  Closing `resp.Body` would end the `for range` loop, but only once the current iteration completes and the next read fails. The child goroutine never gets that far: it stays blocked at `s.incoming <- evt.data`, so it never reaches the next iteration and never exits. The goroutine is leaked.
Proposed Solution:
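To resolve this, I suggest having the child goroutine wait on both `s.incoming` and `s.done` when it sends, so that it returns as soon as `s.done` is closed instead of blocking forever on a send that no one will receive:
```go
for evt, err := range scanEvents(resp.Body) {
	if err != nil {
		return
	}
	select {
	case s.incoming <- evt.data: // delivered to Read
	case <-s.done: // Close was called: stop instead of blocking forever
		return
	}
}
```
Without a change like this, I am concerned that leaked goroutines could accumulate without bound, especially under high load: each SSE stream that is closed while its child goroutine is blocked on the send leaves one goroutine behind.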
If I have misunderstood the behavior or missed any important context, I sincerely apologize for any confusion this might cause, and I would be grateful for further clarification. I’m happy to help address this issue if needed and contribute to improving the implementation.