added built in SSE support in GoFr#3044
added built in SSE support in GoFr#3044IronwallxR5 wants to merge 10 commits intogofr-dev:developmentfrom
Conversation
- Use static errors instead of dynamic ones (err113) - Extract drainLoop method to reduce cyclomatic complexity (gocyclo) - Replace http.Get with http.NewRequestWithContext (noctx) - Rename unused parameters to _ (revive/unused-parameter) - Use require.Error instead of assert.Error (testifylint) - Fix import ordering and whitespace (gci, wsl_v5)
|
@IronwallxR5, Let's finalise on the structure and implement accordingly. Feel free to reopen this pull request after that. |
|
Hey @Umang01-hash, I have made the latest changes according to the sequential callback design. Everything appears to work as planned, but I'm having trouble getting one specific CI check to pass. Can you tell me if this is a blocker or if you have any suggestions for fixing that failing case? Thanks! |
There was a problem hiding this comment.
Hey @IronwallxR5 Thanks for making the requested changes as per the issue discussion. After re-revieiwing i found a few gaps that needs to be filled before going ahead with the PR:
1. Missing tests:
response.SSEwith nil Stream function (should test panic safety or nil guard)- Behavior when
sse.Streamreturns an error (currently discarded) Send(someNonSSEEventValue)— the fallback behavior of the any parameter
2. Send(event any) must be Send(event SSEEvent) — blocking API design issue (see inline comment on sse.go)
3. Silence on callback error in handleSSEResponse — see inline on responder.go
coolwednesday
left a comment
There was a problem hiding this comment.
Thanks for the PR @IronwallxR5. The overall direction is solid — returning response.SSE from a standard app.GET handler is the right approach. Umang's existing feedback on Send(any) → Send(SSEEvent), nil-guard on Stream, error logging, concurrent test, and countdown ticker are all valid and should be addressed.
I have a few additional concerns, mostly around the request timeout interaction and the streaming lifecycle:
Critical: Request timeout will break long-lived SSE streams
Looking at handler.go:63-68, when requestTimeout != 0, the handler's context gets wrapped with context.WithTimeout. The handler goroutine returns quickly (returning the SSEResponse struct), and close(done) fires on line 85. The select on line 88 picks up <-done and proceeds to call c.responder.Respond(result, err) on line 112.
At this point, handleSSEResponse calls sse.Stream(r.w) which runs the user's callback — a potentially infinite loop. The user's callback uses c.Context.Done() to detect disconnect. But c.Context has the timeout attached. So if the request timeout is, say, 5 seconds, the SSE stream dies after 5 seconds regardless of client state.
WebSocket already has a carve-out for this (line 60-62). SSE needs the same treatment. The problem is that we don't know it's SSE until after the handler returns (unlike WebSocket which is detectable from the request headers). Options:
- Detect SSE in the response type and re-derive the context from
r.Context()before callingRespond() - Pass the raw
r.Context()into theSSE.Streamfunction instead of relying onc.Context - Add a way for handlers to opt out of the timeout (e.g., a header hint or a context value)
This is a design decision that needs alignment before merging.
|
@IronwallxR5, Can you please resolve merge conflicts ? |
There was a problem hiding this comment.
Re-review — Checking Previous Feedback
@IronwallxR5, thanks for the updates. I've gone through the latest code against all previous review comments from both Umang and myself. Here's where things stand:
Previous Review Comments — Status
Umang's Comments
| # | Feedback | Status |
|---|---|---|
| 1 | response.SSE embeds a function (anomaly) + nil Stream panics |
Callback any is a type-safety regression (see below) |
| 2 | Concurrent test documents data race | ✅ Addressed — sync.Mutex added, test asserts exact count |
| 3 | Send(any) → Send(SSEEvent) |
✅ Addressed |
| 4 | Countdown example: use ticker | ✅ Addressed |
| 5 | Callback error swallowed | ✅ Addressed — error now logged via Debugf |
| 6 | Missing tests (nil Stream, error, fallback) | TestSSEResponse_NilCallback tests SSEStream directly, not response.SSE{Callback: nil} through the responder path |
coolwednesday's Comments
| # | Feedback | Status |
|---|---|---|
| 1 | Critical: Request timeout kills SSE | ✅ Addressed — handler.go:112-114 replaces context after detecting response.SSE |
| 2 | Logging/histogram skewed by SSE lifetime | |
| 3 | SSEStream needs mutex |
✅ Addressed |
| 4 | Two ResponseController instances |
✅ Addressed — single RC created in responder, passed to callback |
| 5 | Missing heartbeat | ✅ Addressed — 15s heartbeat goroutine with proper cleanup |
| 6 | Integration test leaks app.Run() |
/alive now, but no t.Cleanup shutdown |
| 7 | Example: show graceful shutdown | ✅ Addressed |
Score: 9/13 fully addressed, 4 partially.
New Issues Found
🔴 1. Callback any is a type-safety regression
File: pkg/gofr/http/response/sse.go:8
type SSE struct {
// Typed as any to avoid circular imports
Callback any
}The original Stream func(http.ResponseWriter) error was at least typed. Changing to any means:
- A user could set
Callbackto anything (string, int, struct) — no compile-time error responder.go:310does a runtime assertion that silently fails (returns without SSE output, just a debug log)- This is a public API that we'll have to support long-term
The "avoid circular imports" justification doesn't hold — the callback type func(http.ResponseWriter, *http.ResponseController) error uses only stdlib types and can live in the response package without circular imports:
// In response/sse.go
type SSECallback func(w http.ResponseWriter, rc *http.ResponseController) error
type SSE struct {
Callback SSECallback
}This restores type safety and still avoids circular imports since it doesn't reference anything from gofr or http/responder.
🔴 2. Timeout bypass race window
File: pkg/gofr/handler.go:111-114
if _, ok := result.(response.SSE); ok {
c.Context = r.Context()
}This runs AFTER the select block. If requestTimeout is very short (e.g., 1ms), the timeout fires before the handler goroutine closes done. The select picks c.Context.Done(), sets err = ErrorRequestTimeout, and result is nil. The SSE type check on line 112 fails → SSE stream never starts, user gets a timeout error.
WebSocket avoids this by detecting from request headers BEFORE the handler runs (line 60-62). SSE can't do that — it's detected from the response.
Suggested fix: Set a context value in SSEResponse before returning, and check it in the handler alongside the WebSocket check:
// In SSEResponse:
// Set a flag in context before handler returns
// In handler.go, alongside WebSocket check:
if websocket.IsWebSocketUpgrade(r) || isSSEFromContext(c.Context) {
c.Context = r.Context()
}Or at minimum, document that SSE handlers should return near-instantly (the SSEResponse call is just struct creation, which is fast — but the race exists in theory).
🟡 3. Debugf(fmt.Sprintf(...)) double-formatting
File: pkg/gofr/http/responder.go:330
r.logger.Debugf(fmt.Sprintf("SSE stream error: %v", err))Debugf already calls fmt.Sprintf internally. This should be:
r.logger.Debugf("SSE stream error: %v", err)🟡 4. Integration tests leak goroutines
File: pkg/gofr/sse_test.go:342, 401
Both TestSSEResponse_Integration and TestSSEResponse_Integration_ClientDisconnect call go app.Run() without ever shutting down. The HTTP server, heartbeat goroutines, and listener leak for the rest of the test process. Add t.Cleanup with shutdown.
🟡 5. Missing test: nil/invalid callback through responder
TestSSEResponse_NilCallback (line 440) tests SSEStream directly — it never exercises the nil guard in handleSSEResponse (responder.go:311). Add:
func TestHandleSSEResponse_NilCallback(t *testing.T) {
w := httptest.NewRecorder()
r := NewResponder(w, http.MethodGet)
r.Respond(response.SSE{}, nil) // zero-value SSE, Callback is nil
// Should not panic, should return empty response
}🟡 6. Missing test: callback error is logged
The error logging at responder.go:328-332 is untested. Add a test where the callback returns an error and assert the debug log fires.
Summary
The core architecture is solid — SSE via standard app.GET() returning response.SSE, sequential callback pattern, mutex-protected writes, heartbeat goroutine, proxy headers. Most review feedback was addressed well.
Blocking before merge:
- Fix
Callback any→ use a typed callback in theresponsepackage - Address the timeout race (at minimum document it; ideally detect SSE before handler timeout)
Should fix:
3. Debugf(fmt.Sprintf(...)) → Debugf(...)
4. Integration test shutdown cleanup
5. Add missing responder-level tests (nil callback, error logging)
Please also resolve the merge conflicts flagged in the latest comment.
|
@coolwednesday I have resolved the merge conflict. Thanks for the re-review! I will address the feedback and update the PR shortly. |
Pull Request Template
Description:
Adds built-in Server-Sent Events (SSE) support to the GoFr framework.
Fixes #3038
This introduces a seamless way to handle SSE by returning a new custom response type (
response.SSE) from standard HTTP handlers. It utilizes an idiomatic closure-based streaming architecture that aligns with GoFr’s(any, error)handler pattern.Architecture:
No specialized routing methods: SSE is supported via standard registration (e.g.,
app.GET()), keeping routing consistent with HTTP verbs and avoiding framework bloat.Sequential Network Writes:
SSEStream.Send()writes directly to the underlying http.ResponseWriter and flushes using http.ResponseController. This minimizes memory overhead and relies on standard library blocking for natural backpressure (removing the need for complex channel brokers).Separation of Concerns: Introduced gofr.SSEResponse as a helper that returns the framework's native response.SSE. This keeps user-facing APIs in the top-level package while keeping internal HTTP definitions in pkg/gofr/http/response.
Middleware and Proxy compatibility:
Added
Flush()andUnwrap()toStatusResponseWriterto allowhttp.ResponseControllerto reach the underlying flusher through the middleware chain.Sets
X-Accel-Buffering: noto ensure real-time delivery through NGINX/Ingress proxies.Sets required headers:
Content-Type: text/event-stream,Cache-Control: no-cache, andConnection: keep-alive.Breaking Changes (if applicable):
None
Additional Information:
Clean Lifecycle: Leverages standard Go context (
<-c.Context.Done()) for idiomatic client disconnect detection within the handler closure.Comprehensive Testing: 26 tests covering event serialization, multi-line data formatting, direct flushing through middleware, and full HTTP integration.
Documentation: Included an example application showing both infinite time-streaming and finite countdown streams.
Checklist:
goimportandgolangci-lint.Thank you for your contribution!