Skip to content

Commit c22df18

Browse files
Add SSE stream closing logic.
1 parent a62f447 commit c22df18

File tree

1 file changed

+14
-2
lines changed

1 file changed

+14
-2
lines changed

modules/sse/server.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ const (
1717
logDir = "live" // Directory where <runId>.str files are stored.
1818
runIdHeader = "X-RUN-ID" // Header key for the run ID.
1919
retrySeconds = 3 // SSE retry interval suggestion for clients.
20+
sseDoneEvent = "done" // Event name for the end of the stream.
2021
)
2122

2223
// GetSSEHandler creates and returns an http.HandlerFunc for the SSE endpoint.
@@ -127,8 +128,19 @@ func serveSSE(logger util.Logger, w http.ResponseWriter, r *http.Request) {
127128

128129
// ---> CHECK FOR END MARKER <---
129130
if line.Text == "__END__" {
130-
logger.Info(fmt.Sprintf("[SSE Handler] Detected END marker for run %s. Closing stream.", runId))
131-
return
131+
logger.Info(fmt.Sprintf("[SSE Handler] Detected END marker for run %s. Sending '%s' event and closing stream.", runId, sseDoneEvent))
132+
// Send the specific "done" event
133+
_, writeErr := fmt.Fprintf(w, "event: %s\ndata: {\"message\": \"Stream ended.\"}\n\n", sseDoneEvent)
134+
if writeErr != nil {
135+
// Log error but still attempt to flush and return
136+
logger.Warn(fmt.Sprintf("[SSE Handler] Error writing '%s' event for runId %s: %v", sseDoneEvent, runId, writeErr))
137+
}
138+
// Attempt to flush the final event
139+
if err := rc.Flush(); err != nil {
140+
logger.Error(fmt.Sprintf("[SSE Handler] Error flushing final event for runId %s: %v", runId, err))
141+
return // Exit handler, defer will stop tailer
142+
}
143+
return // <<< EXIT HANDLER HERE after sending event
132144
}
133145

134146
// Format and send SSE message

0 commit comments

Comments
 (0)