Skip to content
2 changes: 2 additions & 0 deletions examples/using-sse/configs/.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
APP_NAME=using-sse
HTTP_PORT=9000
64 changes: 64 additions & 0 deletions examples/using-sse/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
package main

import (
"fmt"
"time"

"gofr.dev/pkg/gofr"
)

func main() {
app := gofr.New()

// Stream the current time every second.
// c.Context.Done() fires on both client disconnect and server shutdown.
app.GET("/events", func(c *gofr.Context) (any, error) {
return gofr.SSEResponse(func(stream *gofr.SSEStream) error {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

i := 0

for {
select {
case <-c.Context.Done():
// Graceful cleanup: release resources, close DB cursors, etc.
return nil
case t := <-ticker.C:
if err := stream.Send(gofr.SSEEvent{
ID: fmt.Sprintf("%d", i),
Name: "time",
Data: map[string]string{"time": t.Format(time.RFC3339)},
}); err != nil {
return err
}

i++
}
}
}), nil
})

// A countdown that sends 11 events and closes.
app.GET("/countdown", func(c *gofr.Context) (any, error) {
return gofr.SSEResponse(func(stream *gofr.SSEStream) error {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()

for i := 10; i >= 0; i-- {
select {
case <-c.Context.Done():
return nil
case <-ticker.C:
if err := stream.SendEvent("countdown", map[string]int{"remaining": i}); err != nil {
return err
}
}
}

return stream.SendEvent("done", "Countdown complete!")
}), nil
})

app.Run()
}
8 changes: 6 additions & 2 deletions pkg/gofr/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (el *ErrorLogEntry) PrettyPrint(writer io.Writer) {
}

func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
c := newContext(gofrHTTP.NewResponder(w, r.Method), gofrHTTP.NewRequest(r), h.container)
c := newContext(gofrHTTP.NewResponder(w, r.Method, gofrHTTP.WithLogger(h.container.Logger)), gofrHTTP.NewRequest(r), h.container)

traceID := trace.SpanFromContext(r.Context()).SpanContext().TraceID().String()

Expand Down Expand Up @@ -108,7 +108,11 @@ func (h handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp.SetCustomHeaders(w)
}

// Handler function completed
// SSE streams are long-lived; bypass request timeout like WebSocket.
if _, ok := result.(response.SSE); ok {
c.Context = r.Context()
}

c.responder.Respond(result, err)
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/gofr/http/middleware/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,18 @@ func (w *StatusResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) {
return nil, nil, fmt.Errorf("%w: cannot hijack connection", errHijackNotSupported)
}

// Flush delegates to the underlying http.Flusher if supported.
func (w *StatusResponseWriter) Flush() {
if flusher, ok := w.ResponseWriter.(http.Flusher); ok {
flusher.Flush()
}
}

// Unwrap returns the underlying ResponseWriter for http.ResponseController.
func (w *StatusResponseWriter) Unwrap() http.ResponseWriter {
return w.ResponseWriter
}

// RequestLog represents a log entry for HTTP requests.
type RequestLog struct {
TraceID string `json:"trace_id,omitempty"`
Expand Down
40 changes: 40 additions & 0 deletions pkg/gofr/http/middleware/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,3 +320,43 @@ type mockAddr struct{}

func (*mockAddr) Network() string { return "tcp" }
func (*mockAddr) String() string { return "127.0.0.1:8080" }

func Test_StatusResponseWriter_Flush_Supported(t *testing.T) {
rr := httptest.NewRecorder()
srw := &StatusResponseWriter{ResponseWriter: rr}

// httptest.ResponseRecorder implements http.Flusher.
assert.NotPanics(t, func() {
srw.Flush()
})

assert.True(t, rr.Flushed, "expected recorder to be flushed")
}

func Test_StatusResponseWriter_Flush_NotSupported(t *testing.T) {
writer := &nonFlushableWriter{header: http.Header{}}
srw := &StatusResponseWriter{ResponseWriter: writer}

// Should not panic even if the underlying writer doesn't support Flusher.
assert.NotPanics(t, func() {
srw.Flush()
})
}

func Test_StatusResponseWriter_Unwrap(t *testing.T) {
rr := httptest.NewRecorder()
srw := &StatusResponseWriter{ResponseWriter: rr}

unwrapped := srw.Unwrap()

assert.Equal(t, rr, unwrapped, "expected Unwrap to return the underlying ResponseWriter")
}

// nonFlushableWriter is a ResponseWriter that does NOT implement http.Flusher.
type nonFlushableWriter struct {
header http.Header
}

func (n *nonFlushableWriter) Header() http.Header { return n.header }
func (*nonFlushableWriter) Write([]byte) (int, error) { return 0, nil }
func (*nonFlushableWriter) WriteHeader(int) {}
59 changes: 57 additions & 2 deletions pkg/gofr/http/responder.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"

Expand All @@ -13,15 +14,35 @@ var (
errEmptyResponse = errors.New("internal server error")
)

// sseLogger is a minimal logging interface used only for SSE error reporting.
type sseLogger interface {
Debugf(format string, args ...any)
}

// ResponderOption configures optional Responder behavior.
type ResponderOption func(*Responder)

// WithLogger attaches a logger to the Responder for debug-level SSE error logging.
func WithLogger(l sseLogger) ResponderOption {
return func(r *Responder) { r.logger = l }
}

// NewResponder creates a new Responder instance from the given http.ResponseWriter.
func NewResponder(w http.ResponseWriter, method string) *Responder {
return &Responder{w: w, method: method}
func NewResponder(w http.ResponseWriter, method string, opts ...ResponderOption) *Responder {
r := &Responder{w: w, method: method}

for _, o := range opts {
o(r)
}

return r
}

// Responder encapsulates an http.ResponseWriter and is responsible for crafting structured responses.
type Responder struct {
w http.ResponseWriter
method string
logger sseLogger
}

// Respond sends a response with the given data and handles potential errors, setting appropriate
Expand Down Expand Up @@ -75,6 +96,10 @@ func (r Responder) handleSpecialResponseTypes(data any, err error) bool {
statusCode := r.getStatusCodeForSpecialResponse(data, err)

switch v := data.(type) {
case resTypes.SSE:
r.handleSSEResponse(v)
return true

case resTypes.File:
r.w.Header().Set("Content-Type", v.ContentType)
r.w.WriteHeader(statusCode)
Expand Down Expand Up @@ -276,3 +301,33 @@ func isNil(i any) bool {

return v.Kind() == reflect.Ptr && v.IsNil()
}

// handleSSEResponse handles Server-Sent Events responses.
//
// TODO: SSE connections block for the full connection lifetime, causing the logging middleware
// and response histogram to record the entire duration. Consider labeling SSE in the histogram.
func (r Responder) handleSSEResponse(sse resTypes.SSE) {
callback, ok := sse.Callback.(func(http.ResponseWriter, *http.ResponseController) error)
if !ok || callback == nil {
if r.logger != nil {
r.logger.Debugf("SSE response has nil or invalid callback")
}

return
}

r.w.Header().Set("Content-Type", "text/event-stream")
r.w.Header().Set("Cache-Control", "no-cache")
r.w.Header().Set("Connection", "keep-alive")
r.w.Header().Set("X-Accel-Buffering", "no")
r.w.WriteHeader(http.StatusOK)

rc := http.NewResponseController(r.w)
_ = rc.Flush()

if err := callback(r.w, rc); err != nil {
if r.logger != nil {
r.logger.Debugf(fmt.Sprintf("SSE stream error: %v", err))
}
}
}
9 changes: 9 additions & 0 deletions pkg/gofr/http/response/sse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package response

// SSE represents a Server-Sent Events response.
// Return this from a handler to stream events to the client.
type SSE struct {
// Callback holds the user's SSE streaming function.
// Typed as any to avoid circular imports; the Responder type-asserts it at call-site.
Callback any
}
Loading
Loading