diff --git a/sdks/community/go/pkg/encoding/sse/writer.go b/sdks/community/go/pkg/encoding/sse/writer.go index 78df4e137..f44fd3a40 100644 --- a/sdks/community/go/pkg/encoding/sse/writer.go +++ b/sdks/community/go/pkg/encoding/sse/writer.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "log/slog" + "net/http" "strings" "sync" "time" @@ -67,6 +68,9 @@ func (w *SSEWriter) WriteBytes(ctx context.Context, writer io.Writer, event []by return fmt.Errorf("SSE flush failed: %w", err) } } + if flusher, ok := writer.(flusherWithoutError); ok { + flusher.Flush() + } return nil } @@ -116,6 +120,9 @@ func (w *SSEWriter) WriteEventWithType(ctx context.Context, writer io.Writer, ev return fmt.Errorf("SSE flush failed: %w", err) } } + if flusher, ok := writer.(flusherWithoutError); ok { + flusher.Flush() + } return nil } @@ -187,6 +194,10 @@ type flusher interface { Flush() error } +// flusherWithoutError is a type alias for http.Flusher. +// It is used to flush the writer without returning an error. +type flusherWithoutError = http.Flusher + // CustomEvent is a simple implementation of events.Event for error and custom events type CustomEvent struct { events.BaseEvent diff --git a/sdks/community/go/pkg/encoding/sse/writer_test.go b/sdks/community/go/pkg/encoding/sse/writer_test.go index 056662e44..a50b8fb56 100644 --- a/sdks/community/go/pkg/encoding/sse/writer_test.go +++ b/sdks/community/go/pkg/encoding/sse/writer_test.go @@ -68,6 +68,15 @@ func (fw *flushWriter) Flush() error { return fw.flushError } +type httpFlushWriter struct { + bytes.Buffer + flushCalled bool +} + +func (fw *httpFlushWriter) Flush() { + fw.flushCalled = true +} + func TestNewSSEWriter(t *testing.T) { writer := NewSSEWriter() if writer == nil { @@ -451,6 +460,39 @@ func TestSSEWriter_Flushing(t *testing.T) { } } +func TestSSEWriter_HTTPFlusherFallback(t *testing.T) { + ctx := context.Background() + writer := NewSSEWriter() + + t.Run("WriteEvent", func(t *testing.T) { + fw := &httpFlushWriter{} + event := &mockEvent{ + BaseEvent: events.BaseEvent{ + EventType: events.EventTypeCustom, + }, + } + + if err := writer.WriteEvent(ctx, fw, event); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !fw.flushCalled { + t.Error("expected fallback flusher to be called") + } + }) + + t.Run("WriteBytes", func(t *testing.T) { + fw := &httpFlushWriter{} + if err := writer.WriteBytes(ctx, fw, []byte(`{"test":"data"}`)); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !fw.flushCalled { + t.Error("expected fallback flusher to be called") + } + }) +} + func TestCustomEvent(t *testing.T) { t.Run("Data operations", func(t *testing.T) { event := &CustomEvent{}