Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions sdks/community/go/pkg/encoding/sse/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -67,6 +68,9 @@ func (w *SSEWriter) WriteBytes(ctx context.Context, writer io.Writer, event []by
return fmt.Errorf("SSE flush failed: %w", err)
}
}
if flusher, ok := writer.(flusherWithoutError); ok {
flusher.Flush()
}
return nil
}

Expand Down Expand Up @@ -116,6 +120,9 @@ func (w *SSEWriter) WriteEventWithType(ctx context.Context, writer io.Writer, ev
return fmt.Errorf("SSE flush failed: %w", err)
}
}
if flusher, ok := writer.(flusherWithoutError); ok {
flusher.Flush()
}

return nil
}
Expand Down Expand Up @@ -187,6 +194,10 @@ type flusher interface {
Flush() error
}

// flusherWithoutError is a type alias for http.Flusher.
// It is used to flush the writer without returning an error.
type flusherWithoutError = http.Flusher

// CustomEvent is a simple implementation of events.Event for error and custom events
type CustomEvent struct {
events.BaseEvent
Expand Down
42 changes: 42 additions & 0 deletions sdks/community/go/pkg/encoding/sse/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,15 @@ func (fw *flushWriter) Flush() error {
return fw.flushError
}

type httpFlushWriter struct {
bytes.Buffer
flushCalled bool
}

func (fw *httpFlushWriter) Flush() {
fw.flushCalled = true
}

func TestNewSSEWriter(t *testing.T) {
writer := NewSSEWriter()
if writer == nil {
Expand Down Expand Up @@ -451,6 +460,39 @@ func TestSSEWriter_Flushing(t *testing.T) {
}
}

func TestSSEWriter_HTTPFlusherFallback(t *testing.T) {
ctx := context.Background()
writer := NewSSEWriter()

t.Run("WriteEvent", func(t *testing.T) {
fw := &httpFlushWriter{}
event := &mockEvent{
BaseEvent: events.BaseEvent{
EventType: events.EventTypeCustom,
},
}

if err := writer.WriteEvent(ctx, fw, event); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !fw.flushCalled {
t.Error("expected fallback flusher to be called")
}
})

t.Run("WriteBytes", func(t *testing.T) {
fw := &httpFlushWriter{}
if err := writer.WriteBytes(ctx, fw, []byte(`{"test":"data"}`)); err != nil {
t.Fatalf("unexpected error: %v", err)
}

if !fw.flushCalled {
t.Error("expected fallback flusher to be called")
}
})
}

func TestCustomEvent(t *testing.T) {
t.Run("Data operations", func(t *testing.T) {
event := &CustomEvent{}
Expand Down