From f00be82e6088c4e692a1add1dfe0dcb349ed82fb Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:01:23 -0800 Subject: [PATCH 01/12] Create go-bidi-design.md --- docs/go-bidi-design.md | 634 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 634 insertions(+) create mode 100644 docs/go-bidi-design.md diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md new file mode 100644 index 0000000000..356290526c --- /dev/null +++ b/docs/go-bidi-design.md @@ -0,0 +1,634 @@ +# Genkit Go Bidirectional Streaming Features - Design Document + +## Overview + +This document describes the design for bidirectional streaming features in Genkit Go. The implementation introduces three new primitives: + +1. **BidiAction** - Core primitive for bidirectional operations +2. **BidiFlow** - BidiAction with observability, intended for user definition +3. **SessionFlow** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics + +## Package Location + +All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` when stabilized: + +``` +go/core/x/ +├── bidi.go # BidiActionDef, BidiFunc, BidiConnection +├── bidi_flow.go # BidiFlow with tracing +├── bidi_options.go # Option types for bidi +├── session_flow.go # SessionFlow implementation +├── bidi_test.go # Tests +``` + +High-level wrappers in `go/genkit/bidi.go`. + +Import as `corex "github.com/firebase/genkit/go/core/x"`. + +--- + +## 1. Core Type Definitions + +### 1.1 BidiAction + +```go +// BidiActionDef represents a bidirectional streaming action. 
+// Type parameters: +// - In: Type of each message sent to the action +// - Out: Type of the final output +// - Init: Type of initialization data (use struct{} if not needed) +// - Stream: Type of each streamed output chunk +type BidiActionDef[In, Out, Init, Stream any] struct { + name string + fn BidiFunc[In, Out, Init, Stream] + registry api.Registry + desc *api.ActionDesc +} + +// BidiFunc is the function signature for bidi actions. +type BidiFunc[In, Out, Init, Stream any] func( + ctx context.Context, + inputStream <-chan In, + init Init, + streamCallback core.StreamCallback[Stream], +) (Out, error) +``` + +### 1.2 BidiConnection + +```go +// BidiConnection represents an active bidirectional streaming session. +type BidiConnection[In, Out, Stream any] struct { + inputCh chan In // Internal, accessed via Send() + streamCh chan Stream // Internal output stream channel + doneCh chan struct{} // Closed when action completes + output Out // Final output (valid after done) + err error // Error if any (valid after done) + ctx context.Context + cancel context.CancelFunc + span tracing.Span // Trace span, ended on completion + mu sync.Mutex + closed bool +} + +// Send sends an input message to the bidi action. +func (c *BidiConnection[In, Out, Stream]) Send(input In) error + +// Close signals that no more inputs will be sent. +func (c *BidiConnection[In, Out, Stream]) Close() error + +// Stream returns an iterator for receiving the chunks streamed during the current turn. +// Each call returns a new iterator over the stream channel created for that turn. +// Breaking out of the loop does NOT close the connection - you can call Stream() +// again to continue receiving. The iterator completes when that turn's channel closes. +func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] + +// Output returns the final output after the action completes. +// Blocks until done or context cancelled. 
+func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) + +// Done returns a channel closed when the connection completes. +func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} +``` + +**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a new channel for that turn. When the agent finishes responding to an input (loops back to wait for the next input), the stream channel for that turn closes, causing the user's `for range` loop to exit naturally. Call `Stream()` again after sending the next input to get the next turn's response. + +### 1.3 BidiFlow + +```go +// BidiFlow wraps a BidiAction with flow semantics (tracing, monitoring). +type BidiFlow[In, Out, Init, Stream any] struct { + *BidiActionDef[In, Out, Init, Stream] + // Uses BidiActionDef.Name() for flow name - no separate field needed +} +``` + +### 1.4 SessionFlow + +SessionFlow adds session state management on top of BidiFlow. + +```go +// SessionFlowOutput wraps the output with session info for persistence. +type SessionFlowOutput[State, Out any] struct { + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` +} + +// SessionFlow is a bidi flow with automatic session state management. +// Init = State: the initial state for new sessions (ignored when resuming an existing session). +type SessionFlow[State, In, Out, Stream any] struct { + *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] + store session.Store[State] + persistMode PersistMode +} + +// SessionFlowFunc is the function signature for session flows. +type SessionFlowFunc[State, In, Out, Stream any] func( + ctx context.Context, + inputStream <-chan In, + sess *session.Session[State], + cb core.StreamCallback[Stream], +) (Out, error) + +// PersistMode controls when session state is persisted. 
+type PersistMode int + +const ( + PersistOnClose PersistMode = iota // Persist only when connection closes (default) + PersistOnUpdate // Persist after each input message is processed +) +``` + +**Turn semantics**: The `SessionStreamCallback` includes a `turnDone` parameter. When the agent finishes responding to an input message, it calls `cb(ctx, lastChunk, true)` to signal the turn is complete. This allows clients to know when to prompt for the next user message. + +--- + +## 2. API Surface + +### 2.1 Defining Bidi Actions + +```go +// In go/core/x/bidi.go + +// NewBidiAction creates a BidiAction without registering it. +func NewBidiAction[In, Out, Init, Stream any]( + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiActionDef[In, Out, Init, Stream] + +// DefineBidiAction creates and registers a BidiAction. +func DefineBidiAction[In, Out, Init, Stream any]( + r api.Registry, + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiActionDef[In, Out, Init, Stream] +``` + +Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. + +### 2.2 Defining Bidi Flows + +```go +// In go/core/x/bidi_flow.go + +func DefineBidiFlow[In, Out, Init, Stream any]( + r api.Registry, + name string, + fn BidiFunc[In, Out, Init, Stream], +) *BidiFlow[In, Out, Init, Stream] +``` + +### 2.3 Defining Session Flows + +```go +// In go/core/x/session_flow.go + +func DefineSessionFlow[State, In, Out, Stream any]( + r api.Registry, + name string, + fn SessionFlowFunc[State, In, Out, Stream], + opts ...SessionFlowOption[State], +) *SessionFlow[State, In, Out, Stream] + +// SessionFlowOption configures a SessionFlow. 
+type SessionFlowOption[State any] interface { + applySessionFlow(*sessionFlowOptions[State]) error +} + +func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] +func WithPersistMode[State any](mode PersistMode) SessionFlowOption[State] +``` + +### 2.4 Starting Connections + +All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` method to start connections: + +```go +// BidiAction/BidiFlow +func (a *BidiActionDef[In, Out, Init, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[Init], +) (*BidiConnection[In, Out, Stream], error) + +// BidiOption for streaming +type BidiOption[Init any] interface { + applyBidi(*bidiOptions[Init]) error +} + +func WithInit[Init any](init Init) BidiOption[Init] + +// SessionFlow uses the same StreamBidi, with Init = State +// Additional option for session ID +func WithSessionID[Init any](id string) BidiOption[Init] + +func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[State], +) (*BidiConnection[In, SessionFlowOutput[State, Out], Stream], error) +``` + +### 2.5 High-Level Genkit API + +```go +// In go/genkit/bidi.go + +func DefineBidiFlow[In, Out, Init, Stream any]( + g *Genkit, + name string, + fn corex.BidiFunc[In, Out, Init, Stream], +) *corex.BidiFlow[In, Out, Init, Stream] + +func DefineSessionFlow[State, In, Out, Stream any]( + g *Genkit, + name string, + fn corex.SessionFlowFunc[State, In, Out, Stream], + opts ...corex.SessionFlowOption[State], +) *corex.SessionFlow[State, In, Out, Stream] +``` + +--- + +## 3. Session Flow Details + +### 3.1 Using StreamBidi with SessionFlow + +SessionFlow uses the same `StreamBidi` method as BidiAction and BidiFlow. 
Session ID is a connection option, and initial state is passed via `WithInit`: + +```go +// Define once at startup +chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + myAgentFunc, + corex.WithSessionStore(store), +) + +// NEW USER: Start fresh session (generates new ID, zero state) +conn1, _ := chatAgent.StreamBidi(ctx) + +// RETURNING USER: Resume existing session by ID +conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState]("user-123-session")) + +// NEW USER WITH INITIAL STATE: Start with pre-populated state +conn3, _ := chatAgent.StreamBidi(ctx, corex.WithInit(ChatState{Messages: preloadedHistory})) + +// NEW USER WITH SPECIFIC ID AND INITIAL STATE +conn4, _ := chatAgent.StreamBidi(ctx, + corex.WithSessionID[ChatState]("custom-session-id"), + corex.WithInit(ChatState{Messages: preloadedHistory}), +) +``` + +The SessionFlow internally handles session creation/loading: +- If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) +- If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit +- If no `WithSessionID` → generate new UUID and create session with initial state from WithInit + +The session ID is returned in `SessionFlowOutput.SessionID`, so callers can retrieve it from the final output: + +```go +output, _ := conn.Output() +sessionID := output.SessionID // Save this to resume later +``` + +### 3.2 State Persistence + +Persistence mode is configurable: + +```go +// Usage: +chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + fn, + corex.WithSessionStore(store), + corex.WithPersistMode(corex.PersistOnUpdate), // or PersistOnClose (default) +) +``` + +- **PersistOnClose** (default): State is persisted only when the connection closes. Better performance. +- **PersistOnUpdate**: State is persisted after each input message is processed. More durable. 
+ +**Note**: `PersistOnUpdate` persists after each input from `inputStream` is processed, not on every `sess.UpdateState()` call. This covers the main use case (persist after each conversation turn) without requiring interface changes to `session.Session`. + +--- + +## 4. Integration with Existing Infrastructure + +### 4.1 Tracing Integration + +BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. + +```go +func (f *BidiFlow[In, Out, Init, Stream]) StreamBidi( + ctx context.Context, + opts ...BidiOption[Init], +) (*BidiConnection[In, Out, Stream], error) { + // Inject flow context + fc := &flowContext{flowName: f.Name()} + ctx = flowContextKey.NewContext(ctx, fc) + + // Start span (NOT RunInNewSpan - we manage lifecycle manually) + spanMeta := &tracing.SpanMetadata{ + Name: f.Name(), + Type: "action", + Subtype: "bidiFlow", + } + ctx, span := tracing.StartSpan(ctx, spanMeta) + + // Create connection, passing span for lifecycle management + conn, err := f.BidiActionDef.streamBidiWithSpan(ctx, span, opts...) + if err != nil { + span.End() // End span on error + return nil, err + } + return conn, nil +} + +// Inside BidiConnection, the span is ended when the action completes: +func (c *BidiConnection[...]) run() { + defer c.span.End() // End span when bidi flow completes + + // Run the action, recording events/nested spans as needed + output, err := c.fn(c.ctx, c.inputCh, c.init, c.streamCallback) + // ... 
+} +``` + +**Important**: The span stays open while the connection is active, allowing: +- Streaming traces to the Dev UI in real-time +- Nested spans for sub-operations (e.g., each LLM call) +- Events recorded as they happen + +### 4.2 Action Registration + +Add new action type: + +```go +// In go/core/api/action.go +const ( + ActionTypeBidiFlow ActionType = "bidi-flow" +) +``` + +### 4.3 Session Integration + +Use existing `Session` and `Store` types from `go/core/x/session` (remains a separate subpackage): + +```go +import "github.com/firebase/genkit/go/core/x/session" + +// SessionFlow holds reference to session store +type SessionFlow[State, In, Out, Stream any] struct { + store session.Store[State] + // ... +} +``` + +--- + +## 5. Example Usage + +### 5.1 Basic Echo Bidi Flow + +```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/core" + "github.com/firebase/genkit/go/genkit" +) + +func main() { + ctx := context.Background() + g := genkit.Init(ctx) + + // Define echo bidi flow (low-level, no turn semantics) + echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", + func(ctx context.Context, inputStream <-chan string, init struct{}, cb core.StreamCallback[string]) (string, error) { + var count int + for input := range inputStream { + count++ + if err := cb(ctx, fmt.Sprintf("echo: %s", input)); err != nil { + return "", err + } + } + return fmt.Sprintf("processed %d messages", count), nil + }, + ) + + // Start streaming connection + conn, err := echoFlow.StreamBidi(ctx) + if err != nil { + panic(err) + } + + // Send messages (NOTE: both channels are unbuffered, so in real code run these sends concurrently with the Stream() loop below, otherwise the second Send blocks while the agent is blocked in cb) + conn.Send("hello") + conn.Send("world") + conn.Close() + + // Consume stream via iterator + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Println(chunk) // "echo: hello", "echo: world" + } + + // Get final output + output, _ := conn.Output() + fmt.Println(output) // "processed 2 messages" +} +``` + +### 5.2 Chat Agent with Session Persistence + 
+```go +package main + +import ( + "context" + "fmt" + + "github.com/firebase/genkit/go/ai" + "github.com/firebase/genkit/go/core" + corex "github.com/firebase/genkit/go/core/x" + "github.com/firebase/genkit/go/core/x/session" + "github.com/firebase/genkit/go/genkit" + "github.com/firebase/genkit/go/plugins/googlegenai" +) + +type ChatState struct { + Messages []*ai.Message `json:"messages"` +} + +func main() { + ctx := context.Background() + store := session.NewInMemoryStore[ChatState]() + + g := genkit.Init(ctx, + genkit.WithPlugins(&googlegenai.GoogleAI{}), + genkit.WithDefaultModel("googleai/gemini-2.5-flash"), + ) + + // Define a session flow for multi-turn chat + chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", + func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], cb core.StreamCallback[string]) (string, error) { + state := sess.State() + messages := state.Messages + + for userInput := range inputStream { + messages = append(messages, ai.NewUserTextMessage(userInput)) + + var responseText string + for result, err := range genkit.GenerateStream(ctx, g, + ai.WithMessages(messages...), + ) { + if err != nil { + return "", err + } + if result.Done { + responseText = result.Response.Text() + } + cb(ctx, result.Chunk.Text()) + } + // Stream channel closes here when we loop back to wait for next input + + messages = append(messages, ai.NewModelTextMessage(responseText)) + sess.UpdateState(ctx, ChatState{Messages: messages}) + } + + return "conversation ended", nil + }, + corex.WithSessionStore(store), + corex.WithPersistMode(corex.PersistOnClose), + ) + + // Start new session (generates new session ID) + conn, _ := chatAgent.StreamBidi(ctx) + + // First turn + conn.Send("Hello! 
Tell me about Go programming.") + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + // Loop exits when stream closes (agent finished responding) + + // Second turn - call Stream() again for next response + conn.Send("What are channels used for?") + for chunk, err := range conn.Stream() { + if err != nil { + panic(err) + } + fmt.Print(chunk) + } + + conn.Close() + + // Get session ID from final output to resume later + output, _ := conn.Output() + sessionID := output.SessionID + + // Resume session later with the saved ID + conn2, _ := chatAgent.StreamBidi(ctx, corex.WithSessionID[ChatState](sessionID)) + conn2.Send("Continue our discussion") + // ... +} +``` + +### 5.3 Bidi Flow with Initialization Data + +```go +type ChatInit struct { + SystemPrompt string `json:"systemPrompt"` + Temperature float64 `json:"temperature"` +} + +configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", + func(ctx context.Context, inputStream <-chan string, init ChatInit, cb core.StreamCallback[string]) (string, error) { + // Use init.SystemPrompt and init.Temperature + for input := range inputStream { + resp, _ := genkit.GenerateText(ctx, g, + ai.WithSystem(init.SystemPrompt), + ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), + ai.WithPrompt(input), + ) + cb(ctx, resp) + } + return "done", nil + }, +) + +conn, _ := configuredChat.StreamBidi(ctx, + corex.WithInit(ChatInit{ + SystemPrompt: "You are a helpful assistant.", + Temperature: 0.7, + }), +) +``` + +--- + +## 6. 
Files to Create/Modify + +### New Files + +| File | Description | +|------|-------------| +| `go/core/x/bidi.go` | BidiActionDef, BidiFunc, BidiConnection | +| `go/core/x/bidi_flow.go` | BidiFlow with tracing | +| `go/core/x/bidi_options.go` | BidiOption types | +| `go/core/x/session_flow.go` | SessionFlow implementation | +| `go/core/x/bidi_test.go` | Tests | +| `go/genkit/bidi.go` | High-level API wrappers | + +### Modified Files + +| File | Change | +|------|--------| +| `go/core/api/action.go` | Add `ActionTypeBidiFlow` constant | + +--- + +## 7. Implementation Notes + +### Error Handling +- Errors from the bidi function propagate to both `Stream()` iterator and `Output()` +- Context cancellation closes all channels and terminates the action +- Send after Close returns an error +- Errors are yielded as the second value in the `iter.Seq2[Stream, error]` iterator + +### Goroutine Management +- BidiConnection spawns a goroutine to run the action +- Proper cleanup on context cancellation using `defer` and `sync.Once` +- Channel closure follows Go idioms (sender closes) +- Trace span is ended in the goroutine's defer + +### Thread Safety +- BidiConnection uses mutex for state (closed flag) +- Send is safe to call from multiple goroutines +- Session operations are thread-safe (from existing session package) + +### Channels and Backpressure +- Both input and output channels are **unbuffered** by default (size 0) +- This provides natural backpressure: `Send()` blocks until agent reads, `cb()` blocks until user consumes +- If needed, a `WithInputBufferSize` option could be added later for specific use cases + +### Iterator Implementation and Turn Semantics +- `Stream()` returns `iter.Seq2[Stream, error]` - a Go 1.23 iterator +- Each call to `Stream()` returns an iterator over a **new channel** for that turn +- When the agent finishes responding (loops back to wait for next input), the stream channel closes +- The user's `for range` loop exits naturally when the channel 
closes +- Call `Stream()` again after sending the next input to get the next turn's response +- The iterator yields `(chunk, nil)` for each streamed value +- On error, the iterator yields `(zero, err)` and stops + +### Tracing +- Span is started when connection is created, ended when action completes +- Nested spans work normally within the bidi function +- Events can be recorded throughout the connection lifecycle +- Dev UI can show traces in real-time as they stream From 6c6afb7bc687c520e99a2b6bfcb756b97929781d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:48:57 -0800 Subject: [PATCH 02/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 183 +++++++++++++++++++++-------------------- 1 file changed, 93 insertions(+), 90 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 356290526c..2f7a76ad2c 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -14,7 +14,7 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` ``` go/core/x/ -├── bidi.go # BidiActionDef, BidiFunc, BidiConnection +├── bidi.go # BidiAction, BidiFunc, BidiConnection ├── bidi_flow.go # BidiFlow with tracing ├── bidi_options.go # Option types for bidi ├── session_flow.go # SessionFlow implementation @@ -32,13 +32,13 @@ Import as `corex "github.com/firebase/genkit/go/core/x"`. ### 1.1 BidiAction ```go -// BidiActionDef represents a bidirectional streaming action. +// BidiAction represents a bidirectional streaming action. 
// Type parameters: // - In: Type of each message sent to the action // - Out: Type of the final output // - Init: Type of initialization data (use struct{} if not needed) // - Stream: Type of each streamed output chunk -type BidiActionDef[In, Out, Init, Stream any] struct { +type BidiAction[In, Out, Init, Stream any] struct { name string fn BidiFunc[In, Out, Init, Stream] registry api.Registry @@ -91,15 +91,13 @@ func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ``` -**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a new channel for that turn. When the agent finishes responding to an input (loops back to wait for the next input), the stream channel for that turn closes, causing the user's `for range` loop to exit naturally. Call `Stream()` again after sending the next input to get the next turn's response. +**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a **new channel** created for that turn. When the agent finishes responding (loops back to read the next input), it closes that turn's stream channel, causing the user's `for range` loop to exit naturally. The user then calls `Send()` with the next input and `Stream()` again to get a new iterator for the next turn's responses. ### 1.3 BidiFlow ```go -// BidiFlow wraps a BidiAction with flow semantics (tracing, monitoring). type BidiFlow[In, Out, Init, Stream any] struct { - *BidiActionDef[In, Out, Init, Stream] - // Uses BidiActionDef.Name() for flow name - no separate field needed + *BidiAction[In, Out, Init, Stream] } ``` @@ -108,19 +106,32 @@ type BidiFlow[In, Out, Init, Stream any] struct { SessionFlow adds session state management on top of BidiFlow. ```go +// Artifact represents a named collection of parts produced during a session. +// Examples: generated files, images, code snippets, etc. 
+type Artifact struct { + Name string `json:"name"` + Parts []*ai.Part `json:"parts"` +} + // SessionFlowOutput wraps the output with session info for persistence. type SessionFlowOutput[State, Out any] struct { - SessionID string `json:"sessionId"` - Output Out `json:"output"` - State State `json:"state"` + SessionID string `json:"sessionId"` + Output Out `json:"output"` + State State `json:"state"` + Artifacts []Artifact `json:"artifacts,omitempty"` } // SessionFlow is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). type SessionFlow[State, In, Out, Stream any] struct { *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] - store session.Store[State] - persistMode PersistMode + store session.Store[State] +} + +// SessionFlowResult is the return type for session flow functions. +type SessionFlowResult[Out any] struct { + Output Out + Artifacts []Artifact } // SessionFlowFunc is the function signature for session flows. @@ -128,20 +139,10 @@ type SessionFlowFunc[State, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, sess *session.Session[State], - cb core.StreamCallback[Stream], -) (Out, error) - -// PersistMode controls when session state is persisted. -type PersistMode int - -const ( - PersistOnClose PersistMode = iota // Persist only when connection closes (default) - PersistOnUpdate // Persist after each input message is processed -) + sendChunk core.StreamCallback[Stream], +) (SessionFlowResult[Out], error) ``` -**Turn semantics**: The `SessionStreamCallback` includes a `turnDone` parameter. When the agent finishes responding to an input message, it calls `cb(ctx, lastChunk, true)` to signal the turn is complete. This allows clients to know when to prompt for the next user message. - --- ## 2. 
API Surface @@ -155,14 +156,14 @@ const ( func NewBidiAction[In, Out, Init, Stream any]( name string, fn BidiFunc[In, Out, Init, Stream], -) *BidiActionDef[In, Out, Init, Stream] +) *BidiAction[In, Out, Init, Stream] // DefineBidiAction creates and registers a BidiAction. func DefineBidiAction[In, Out, Init, Stream any]( r api.Registry, name string, fn BidiFunc[In, Out, Init, Stream], -) *BidiActionDef[In, Out, Init, Stream] +) *BidiAction[In, Out, Init, Stream] ``` Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. @@ -197,7 +198,6 @@ type SessionFlowOption[State any] interface { } func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] -func WithPersistMode[State any](mode PersistMode) SessionFlowOption[State] ``` ### 2.4 Starting Connections @@ -206,7 +206,7 @@ All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` met ```go // BidiAction/BidiFlow -func (a *BidiActionDef[In, Out, Init, Stream]) StreamBidi( +func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) @@ -292,21 +292,7 @@ sessionID := output.SessionID // Save this to resume later ### 3.2 State Persistence -Persistence mode is configurable: - -```go -// Usage: -chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - fn, - corex.WithSessionStore(store), - corex.WithPersistMode(corex.PersistOnUpdate), // or PersistOnClose (default) -) -``` - -- **PersistOnClose** (default): State is persisted only when the connection closes. Better performance. -- **PersistOnUpdate**: State is persisted after each input message is processed. More durable. - -**Note**: `PersistOnUpdate` persists after each input from `inputStream` is processed, not on every `sess.UpdateState()` call. 
This covers the main use case (persist after each conversation turn) without requiring interface changes to `session.Session`. +State is persisted automatically when `sess.UpdateState()` is called - the existing `session.Session` implementation already persists to the configured store. No special persistence mode is needed; the user controls when to persist by calling `UpdateState()`. --- @@ -316,41 +302,11 @@ chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "cha BidiFlows create spans that remain open for the lifetime of the connection, enabling streaming trace visualization in the Dev UI. -```go -func (f *BidiFlow[In, Out, Init, Stream]) StreamBidi( - ctx context.Context, - opts ...BidiOption[Init], -) (*BidiConnection[In, Out, Stream], error) { - // Inject flow context - fc := &flowContext{flowName: f.Name()} - ctx = flowContextKey.NewContext(ctx, fc) - - // Start span (NOT RunInNewSpan - we manage lifecycle manually) - spanMeta := &tracing.SpanMetadata{ - Name: f.Name(), - Type: "action", - Subtype: "bidiFlow", - } - ctx, span := tracing.StartSpan(ctx, spanMeta) - - // Create connection, passing span for lifecycle management - conn, err := f.BidiActionDef.streamBidiWithSpan(ctx, span, opts...) - if err != nil { - span.End() // End span on error - return nil, err - } - return conn, nil -} - -// Inside BidiConnection, the span is ended when the action completes: -func (c *BidiConnection[...]) run() { - defer c.span.End() // End span when bidi flow completes - - // Run the action, recording events/nested spans as needed - output, err := c.fn(c.ctx, c.inputCh, c.init, c.streamCallback) - // ... 
-} -``` +**Key behaviors:** +- Span starts when `StreamBidi()` is called +- Span ends when the bidi function returns (via `defer` in the connection goroutine) +- Flow context is injected so `core.Run()` works inside the bidi function +- Nested spans for sub-operations (e.g., each LLM call) work normally **Important**: The span stays open while the connection is active, allowing: - Streaming traces to the Dev UI in real-time @@ -359,13 +315,20 @@ func (c *BidiConnection[...]) run() { ### 4.2 Action Registration -Add new action type: +Add new action type and schema fields: ```go // In go/core/api/action.go const ( ActionTypeBidiFlow ActionType = "bidi-flow" ) + +// ActionDesc gets two new optional fields +type ActionDesc struct { + // ... existing fields ... + StreamSchema map[string]any `json:"streamSchema,omitempty"` // NEW: schema for streamed chunks + InitSchema map[string]any `json:"initSchema,omitempty"` // NEW: schema for initialization data +} ``` ### 4.3 Session Integration @@ -405,11 +368,11 @@ func main() { // Define echo bidi flow (low-level, no turn semantics) echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", - func(ctx context.Context, inputStream <-chan string, init struct{}, cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { var count int for input := range inputStream { count++ - if err := cb(ctx, fmt.Sprintf("echo: %s", input)); err != nil { + if err := sendChunk(ctx, fmt.Sprintf("echo: %s", input)); err != nil { return "", err } } @@ -474,7 +437,7 @@ func main() { // Define a session flow for multi-turn chat chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, sess 
*session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.SessionFlowResult[string], error) { state := sess.State() messages := state.Messages @@ -486,12 +449,12 @@ func main() { ai.WithMessages(messages...), ) { if err != nil { - return "", err + return corex.SessionFlowResult[string]{}, err } if result.Done { responseText = result.Response.Text() } - cb(ctx, result.Chunk.Text()) + sendChunk(ctx, result.Chunk.Text()) } // Stream channel closes here when we loop back to wait for next input @@ -499,10 +462,17 @@ func main() { sess.UpdateState(ctx, ChatState{Messages: messages}) } - return "conversation ended", nil + return corex.SessionFlowResult[string]{ + Output: "conversation ended", + Artifacts: []corex.Artifact{ + { + Name: "summary", + Parts: []*ai.Part{ai.NewTextPart("...")}, + }, + }, + }, nil }, corex.WithSessionStore(store), - corex.WithPersistMode(corex.PersistOnClose), ) // Start new session (generates new session ID) @@ -549,7 +519,7 @@ type ChatInit struct { } configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", - func(ctx context.Context, inputStream <-chan string, init ChatInit, cb core.StreamCallback[string]) (string, error) { + func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) { // Use init.SystemPrompt and init.Temperature for input := range inputStream { resp, _ := genkit.GenerateText(ctx, g, @@ -557,7 +527,7 @@ configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "co ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), ai.WithPrompt(input), ) - cb(ctx, resp) + sendChunk(ctx, resp) } return "done", nil }, @@ -579,7 +549,7 @@ conn, _ := configuredChat.StreamBidi(ctx, | File | Description | |------|-------------| -| `go/core/x/bidi.go` | BidiActionDef, BidiFunc, BidiConnection | +| `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | | `go/core/x/bidi_flow.go` | 
BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | | `go/core/x/session_flow.go` | SessionFlow implementation | @@ -615,7 +585,7 @@ conn, _ := configuredChat.StreamBidi(ctx, ### Channels and Backpressure - Both input and output channels are **unbuffered** by default (size 0) -- This provides natural backpressure: `Send()` blocks until agent reads, `cb()` blocks until user consumes +- This provides natural backpressure: `Send()` blocks until agent reads, `sendChunk()` blocks until user consumes - If needed, a `WithInputBufferSize` option could be added later for specific use cases ### Iterator Implementation and Turn Semantics @@ -632,3 +602,36 @@ conn, _ := configuredChat.StreamBidi(ctx, - Nested spans work normally within the bidi function - Events can be recorded throughout the connection lifecycle - Dev UI can show traces in real-time as they stream +- Implementation uses the existing tracer infrastructure (details left to implementation) + +### Shutdown Sequence +When `Close()` is called on a BidiConnection: +1. The input channel is closed, signaling no more inputs +2. The bidi function's `for range inputStream` loop exits +3. The function returns its final output +4. The stream channel is closed +5. The `Done()` channel is closed +6. `Output()` unblocks and returns the result + +On context cancellation: +1. Context error propagates to the bidi function +2. All channels are closed +3. `Output()` returns the context error + +### SessionFlow Internal Wrapping +The user's `SessionFlowFunc` returns `SessionFlowResult[Out]`, but `SessionFlow.StreamBidi()` returns `SessionFlowOutput[State, Out]`. 
Internally, SessionFlow wraps the user function: + +```go +// Simplified internal logic +result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) +if err != nil { + return SessionFlowOutput[State, Out]{}, err +} +return SessionFlowOutput[State, Out]{ + SessionID: sess.ID(), + Output: result.Output, + State: sess.State(), + Artifacts: result.Artifacts, +}, nil +``` + From e86893ba63356ee3dc0c0065f7419a3dcaecc475 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:56:28 -0800 Subject: [PATCH 03/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 2f7a76ad2c..a9d9e67121 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -173,6 +173,8 @@ Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred f ```go // In go/core/x/bidi_flow.go +// DefineBidiFlow creates a BidiFlow with tracing and registers it. +// Use this for user-defined bidirectional streaming operations. func DefineBidiFlow[In, Out, Init, Stream any]( r api.Registry, name string, @@ -185,6 +187,8 @@ func DefineBidiFlow[In, Out, Init, Stream any]( ```go // In go/core/x/session_flow.go +// DefineSessionFlow creates a SessionFlow with automatic session management and registers it. +// Use this for multi-turn conversational agents that need to persist state across turns. func DefineSessionFlow[State, In, Out, Stream any]( r api.Registry, name string, @@ -197,6 +201,8 @@ type SessionFlowOption[State any] interface { applySessionFlow(*sessionFlowOptions[State]) error } +// WithSessionStore sets the session store for persisting session state. +// If not provided, sessions exist only in memory for the connection lifetime. 
func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] ``` @@ -211,15 +217,19 @@ func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) -// BidiOption for streaming +// BidiOption configures a bidi connection. type BidiOption[Init any] interface { applyBidi(*bidiOptions[Init]) error } +// WithInit provides initialization data for the bidi action. +// For SessionFlow, this sets the initial state for new sessions. func WithInit[Init any](init Init) BidiOption[Init] -// SessionFlow uses the same StreamBidi, with Init = State -// Additional option for session ID +// WithSessionID specifies an existing session ID to resume. +// If the session exists in the store, it is loaded (WithInit is ignored). +// If the session doesn't exist, a new session is created with this ID. +// If not provided, a new UUID is generated for new sessions. func WithSessionID[Init any](id string) BidiOption[Init] func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( From f594c565cb0532a2d811e5d7815b23ada7d901db Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:58:33 -0800 Subject: [PATCH 04/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index a9d9e67121..38151bdc44 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -15,14 +15,12 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` ``` go/core/x/ ├── bidi.go # BidiAction, BidiFunc, BidiConnection -├── bidi_flow.go # BidiFlow with tracing -├── bidi_options.go # Option types for bidi +├── bidi_flow.go # BidiFlow ├── session_flow.go # SessionFlow implementation +├── option.go # Options ├── bidi_test.go # Tests ``` -High-level wrappers in `go/genkit/bidi.go`. - Import as `corex "github.com/firebase/genkit/go/core/x"`. 
--- From 05e415e5ae0669220e5f50e439540804d56163bd Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 15:59:27 -0800 Subject: [PATCH 05/12] Apply suggestion from @gemini-code-assist[bot] Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- docs/go-bidi-design.md | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 38151bdc44..2844c8dbac 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -78,9 +78,10 @@ func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. // Each call returns a new iterator over the same underlying channel. // Breaking out of the loop does NOT close the connection - you can call Stream() -// again to continue receiving. The iterator completes when the action finishes. -func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] - +// For multi-turn interactions, this should be called after each Send() to get an +// iterator for that turn's response. Each call provides an iterator over a new +// channel specific to that turn. The iterator completes when the agent is done +// responding for the turn. // Output returns the final output after the action completes. // Blocks until done or context cancelled. func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) From 4121ecca451c563572bd2727b031544f121d247d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 16:01:10 -0800 Subject: [PATCH 06/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 2844c8dbac..38151bdc44 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -78,10 +78,9 @@ func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. 
// Each call returns a new iterator over the same underlying channel. // Breaking out of the loop does NOT close the connection - you can call Stream() -// For multi-turn interactions, this should be called after each Send() to get an -// iterator for that turn's response. Each call provides an iterator over a new -// channel specific to that turn. The iterator completes when the agent is done -// responding for the turn. +// again to continue receiving. The iterator completes when the action finishes. +func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] + // Output returns the final output after the action completes. // Blocks until done or context cancelled. func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) From ffc923dcf6d5751dbc8d3bf702d625d2d0b4dd90 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 18:53:12 -0800 Subject: [PATCH 07/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 102 ++++++++++++++++++++--------------------- 1 file changed, 51 insertions(+), 51 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 38151bdc44..bbd83fc768 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -6,7 +6,7 @@ This document describes the design for bidirectional streaming features in Genki 1. **BidiAction** - Core primitive for bidirectional operations 2. **BidiFlow** - BidiAction with observability, intended for user definition -3. **SessionFlow** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics +3. 
**Agent** - Stateful, multi-turn agent interactions with automatic persistence and turn semantics ## Package Location @@ -16,7 +16,7 @@ All bidi types go in `go/core/x/` (experimental), which will move to `go/core/` go/core/x/ ├── bidi.go # BidiAction, BidiFunc, BidiConnection ├── bidi_flow.go # BidiFlow -├── session_flow.go # SessionFlow implementation +├── agent.go # Agent implementation ├── option.go # Options ├── bidi_test.go # Tests ``` @@ -99,9 +99,9 @@ type BidiFlow[In, Out, Init, Stream any] struct { } ``` -### 1.4 SessionFlow +### 1.4 Agent -SessionFlow adds session state management on top of BidiFlow. +Agent adds session state management on top of BidiFlow. ```go // Artifact represents a named collection of parts produced during a session. @@ -111,34 +111,34 @@ type Artifact struct { Parts []*ai.Part `json:"parts"` } -// SessionFlowOutput wraps the output with session info for persistence. -type SessionFlowOutput[State, Out any] struct { +// AgentOutput wraps the output with session info for persistence. +type AgentOutput[State, Out any] struct { SessionID string `json:"sessionId"` Output Out `json:"output"` State State `json:"state"` Artifacts []Artifact `json:"artifacts,omitempty"` } -// SessionFlow is a bidi flow with automatic session state management. +// Agent is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). -type SessionFlow[State, In, Out, Stream any] struct { - *BidiFlow[In, SessionFlowOutput[State, Out], State, Stream] +type Agent[State, In, Out, Stream any] struct { + *BidiFlow[In, AgentOutput[State, Out], State, Stream] store session.Store[State] } -// SessionFlowResult is the return type for session flow functions. -type SessionFlowResult[Out any] struct { +// AgentResult is the return type for agent functions. 
+type AgentResult[Out any] struct { Output Out Artifacts []Artifact } -// SessionFlowFunc is the function signature for session flows. -type SessionFlowFunc[State, In, Out, Stream any] func( +// AgentFunc is the function signature for agents. +type AgentFunc[State, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, sess *session.Session[State], sendChunk core.StreamCallback[Stream], -) (SessionFlowResult[Out], error) +) (AgentResult[Out], error) ``` --- @@ -180,33 +180,33 @@ func DefineBidiFlow[In, Out, Init, Stream any]( ) *BidiFlow[In, Out, Init, Stream] ``` -### 2.3 Defining Session Flows +### 2.3 Defining Agents ```go -// In go/core/x/session_flow.go +// In go/core/x/agent.go -// DefineSessionFlow creates a SessionFlow with automatic session management and registers it. +// DefineAgent creates an Agent with automatic session management and registers it. // Use this for multi-turn conversational agents that need to persist state across turns. -func DefineSessionFlow[State, In, Out, Stream any]( +func DefineAgent[State, In, Out, Stream any]( r api.Registry, name string, - fn SessionFlowFunc[State, In, Out, Stream], - opts ...SessionFlowOption[State], -) *SessionFlow[State, In, Out, Stream] + fn AgentFunc[State, In, Out, Stream], + opts ...AgentOption[State], +) *Agent[State, In, Out, Stream] -// SessionFlowOption configures a SessionFlow. -type SessionFlowOption[State any] interface { - applySessionFlow(*sessionFlowOptions[State]) error +// AgentOption configures an Agent. +type AgentOption[State any] interface { + applyAgent(*agentOptions[State]) error } // WithSessionStore sets the session store for persisting session state. // If not provided, sessions exist only in memory for the connection lifetime. 
-func WithSessionStore[State any](store session.Store[State]) SessionFlowOption[State] +func WithSessionStore[State any](store session.Store[State]) AgentOption[State] ``` ### 2.4 Starting Connections -All bidi types (BidiAction, BidiFlow, SessionFlow) use the same `StreamBidi` method to start connections: +All bidi types (BidiAction, BidiFlow, Agent) use the same `StreamBidi` method to start connections: ```go // BidiAction/BidiFlow @@ -221,7 +221,7 @@ type BidiOption[Init any] interface { } // WithInit provides initialization data for the bidi action. -// For SessionFlow, this sets the initial state for new sessions. +// For Agent, this sets the initial state for new sessions. func WithInit[Init any](init Init) BidiOption[Init] // WithSessionID specifies an existing session ID to resume. @@ -230,10 +230,10 @@ func WithInit[Init any](init Init) BidiOption[Init] // If not provided, a new UUID is generated for new sessions. func WithSessionID[Init any](id string) BidiOption[Init] -func (sf *SessionFlow[State, In, Out, Stream]) StreamBidi( +func (a *Agent[State, In, Out, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[State], -) (*BidiConnection[In, SessionFlowOutput[State, Out], Stream], error) +) (*BidiConnection[In, AgentOutput[State, Out], Stream], error) ``` ### 2.5 High-Level Genkit API @@ -247,25 +247,25 @@ func DefineBidiFlow[In, Out, Init, Stream any]( fn corex.BidiFunc[In, Out, Init, Stream], ) *corex.BidiFlow[In, Out, Init, Stream] -func DefineSessionFlow[State, In, Out, Stream any]( +func DefineAgent[State, In, Out, Stream any]( g *Genkit, name string, - fn corex.SessionFlowFunc[State, In, Out, Stream], - opts ...corex.SessionFlowOption[State], -) *corex.SessionFlow[State, In, Out, Stream] + fn corex.AgentFunc[State, In, Out, Stream], + opts ...corex.AgentOption[State], +) *corex.Agent[State, In, Out, Stream] ``` --- -## 3. Session Flow Details +## 3. 
Agent Details -### 3.1 Using StreamBidi with SessionFlow +### 3.1 Using StreamBidi with Agent -SessionFlow uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: +Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID is a connection option, and initial state is passed via `WithInit`: ```go // Define once at startup -chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", +chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", myAgentFunc, corex.WithSessionStore(store), ) @@ -286,12 +286,12 @@ conn4, _ := chatAgent.StreamBidi(ctx, ) ``` -The SessionFlow internally handles session creation/loading: +The Agent internally handles session creation/loading: - If `WithSessionID` is provided and session exists in store → load existing session (WithInit ignored) - If `WithSessionID` is provided but session doesn't exist → create new session with that ID and initial state from WithInit - If no `WithSessionID` → generate new UUID and create session with initial state from WithInit -The session ID is returned in `SessionFlowOutput.SessionID`, so callers can retrieve it from the final output: +The session ID is returned in `AgentOutput.SessionID`, so callers can retrieve it from the final output: ```go output, _ := conn.Output() @@ -346,8 +346,8 @@ Use existing `Session` and `Store` types from `go/core/x/session` (remains a sep ```go import "github.com/firebase/genkit/go/core/x/session" -// SessionFlow holds reference to session store -type SessionFlow[State, In, Out, Stream any] struct { +// Agent holds reference to session store +type Agent[State, In, Out, Stream any] struct { store session.Store[State] // ... 
} @@ -443,9 +443,9 @@ func main() { genkit.WithDefaultModel("googleai/gemini-2.5-flash"), ) - // Define a session flow for multi-turn chat - chatAgent := genkit.DefineSessionFlow[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.SessionFlowResult[string], error) { + // Define an agent for multi-turn chat + chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", + func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages @@ -457,7 +457,7 @@ func main() { ai.WithMessages(messages...), ) { if err != nil { - return corex.SessionFlowResult[string]{}, err + return corex.AgentResult[string]{}, err } if result.Done { responseText = result.Response.Text() @@ -470,7 +470,7 @@ func main() { sess.UpdateState(ctx, ChatState{Messages: messages}) } - return corex.SessionFlowResult[string]{ + return corex.AgentResult[string]{ Output: "conversation ended", Artifacts: []corex.Artifact{ { @@ -560,7 +560,7 @@ conn, _ := configuredChat.StreamBidi(ctx, | `go/core/x/bidi.go` | BidiAction, BidiFunc, BidiConnection | | `go/core/x/bidi_flow.go` | BidiFlow with tracing | | `go/core/x/bidi_options.go` | BidiOption types | -| `go/core/x/session_flow.go` | SessionFlow implementation | +| `go/core/x/agent.go` | Agent implementation | | `go/core/x/bidi_test.go` | Tests | | `go/genkit/bidi.go` | High-level API wrappers | @@ -626,16 +626,16 @@ On context cancellation: 2. All channels are closed 3. `Output()` returns the context error -### SessionFlow Internal Wrapping -The user's `SessionFlowFunc` returns `SessionFlowResult[Out]`, but `SessionFlow.StreamBidi()` returns `SessionFlowOutput[State, Out]`. 
Internally, SessionFlow wraps the user function: +### Agent Internal Wrapping +The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` returns `AgentOutput[State, Out]`. Internally, Agent wraps the user function: ```go // Simplified internal logic result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) if err != nil { - return SessionFlowOutput[State, Out]{}, err + return AgentOutput[State, Out]{}, err } -return SessionFlowOutput[State, Out]{ +return AgentOutput[State, Out]{ SessionID: sess.ID(), Output: result.Output, State: sess.State(), From 64e1f4c36c6205068e2be6703f2b87a44fb501ff Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Fri, 16 Jan 2026 18:59:08 -0800 Subject: [PATCH 08/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 44 +++++++++++++++++++++--------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index bbd83fc768..4b56e3e7c6 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -32,19 +32,19 @@ Import as `corex "github.com/firebase/genkit/go/core/x"`. ```go // BidiAction represents a bidirectional streaming action. // Type parameters: +// - Init: Type of initialization data (use struct{} if not needed) // - In: Type of each message sent to the action // - Out: Type of the final output -// - Init: Type of initialization data (use struct{} if not needed) // - Stream: Type of each streamed output chunk -type BidiAction[In, Out, Init, Stream any] struct { +type BidiAction[Init, In, Out, Stream any] struct { name string - fn BidiFunc[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream] registry api.Registry desc *api.ActionDesc } // BidiFunc is the function signature for bidi actions. 
-type BidiFunc[In, Out, Init, Stream any] func( +type BidiFunc[Init, In, Out, Stream any] func( ctx context.Context, inputStream <-chan In, init Init, @@ -94,8 +94,8 @@ func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ### 1.3 BidiFlow ```go -type BidiFlow[In, Out, Init, Stream any] struct { - *BidiAction[In, Out, Init, Stream] +type BidiFlow[Init, In, Out, Stream any] struct { + *BidiAction[Init, In, Out, Stream] } ``` @@ -122,7 +122,7 @@ type AgentOutput[State, Out any] struct { // Agent is a bidi flow with automatic session state management. // Init = State: the initial state for new sessions (ignored when resuming an existing session). type Agent[State, In, Out, Stream any] struct { - *BidiFlow[In, AgentOutput[State, Out], State, Stream] + *BidiFlow[State, In, AgentOutput[State, Out], Stream] store session.Store[State] } @@ -151,17 +151,17 @@ type AgentFunc[State, In, Out, Stream any] func( // In go/core/x/bidi.go // NewBidiAction creates a BidiAction without registering it. -func NewBidiAction[In, Out, Init, Stream any]( +func NewBidiAction[Init, In, Out, Stream any]( name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiAction[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] // DefineBidiAction creates and registers a BidiAction. -func DefineBidiAction[In, Out, Init, Stream any]( +func DefineBidiAction[Init, In, Out, Stream any]( r api.Registry, name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiAction[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiAction[Init, In, Out, Stream] ``` Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred from the type parameters using the existing JSON schema inference in `go/internal/base/json.go`. @@ -173,11 +173,11 @@ Schemas for `In`, `Out`, `Init`, and `Stream` types are automatically inferred f // DefineBidiFlow creates a BidiFlow with tracing and registers it. 
// Use this for user-defined bidirectional streaming operations. -func DefineBidiFlow[In, Out, Init, Stream any]( +func DefineBidiFlow[Init, In, Out, Stream any]( r api.Registry, name string, - fn BidiFunc[In, Out, Init, Stream], -) *BidiFlow[In, Out, Init, Stream] + fn BidiFunc[Init, In, Out, Stream], +) *BidiFlow[Init, In, Out, Stream] ``` ### 2.3 Defining Agents @@ -210,7 +210,7 @@ All bidi types (BidiAction, BidiFlow, Agent) use the same `StreamBidi` method to ```go // BidiAction/BidiFlow -func (a *BidiAction[In, Out, Init, Stream]) StreamBidi( +func (a *BidiAction[Init, In, Out, Stream]) StreamBidi( ctx context.Context, opts ...BidiOption[Init], ) (*BidiConnection[In, Out, Stream], error) @@ -241,11 +241,11 @@ func (a *Agent[State, In, Out, Stream]) StreamBidi( ```go // In go/genkit/bidi.go -func DefineBidiFlow[In, Out, Init, Stream any]( +func DefineBidiFlow[Init, In, Out, Stream any]( g *Genkit, name string, - fn corex.BidiFunc[In, Out, Init, Stream], -) *corex.BidiFlow[In, Out, Init, Stream] + fn corex.BidiFunc[Init, In, Out, Stream], +) *corex.BidiFlow[Init, In, Out, Stream] func DefineAgent[State, In, Out, Stream any]( g *Genkit, @@ -375,7 +375,7 @@ func main() { g := genkit.Init(ctx) // Define echo bidi flow (low-level, no turn semantics) - echoFlow := genkit.DefineBidiFlow[string, string, struct{}, string](g, "echo", + echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { var count int for input := range inputStream { @@ -526,7 +526,7 @@ type ChatInit struct { Temperature float64 `json:"temperature"` } -configuredChat := genkit.DefineBidiFlow[string, string, ChatInit, string](g, "configuredChat", +configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, 
error) { // Use init.SystemPrompt and init.Temperature for input := range inputStream { From 8a8f19337d295df86985e01a131aa9b91247608d Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:08:45 -0800 Subject: [PATCH 09/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 121 +++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 42 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4b56e3e7c6..6a3c69bc0b 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -46,9 +46,9 @@ type BidiAction[Init, In, Out, Stream any] struct { // BidiFunc is the function signature for bidi actions. type BidiFunc[Init, In, Out, Stream any] func( ctx context.Context, - inputStream <-chan In, init Init, - streamCallback core.StreamCallback[Stream], + inCh <-chan In, + outCh chan<- Stream, ) (Out, error) ``` @@ -76,9 +76,9 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error func (c *BidiConnection[In, Out, Stream]) Close() error // Stream returns an iterator for receiving streamed chunks. -// Each call returns a new iterator over the same underlying channel. -// Breaking out of the loop does NOT close the connection - you can call Stream() -// again to continue receiving. The iterator completes when the action finishes. +// For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing +// multi-turn conversations. Call Stream() again after Send() for the next turn. +// The iterator completes permanently when the action finishes. func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] // Output returns the final output after the action completes. @@ -89,8 +89,6 @@ func (c *BidiConnection[In, Out, Stream]) Output() (Out, error) func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{} ``` -**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a **new channel** created for that turn. 
When the agent finishes responding (loops back to read the next input), it closes that turn's stream channel, causing the user's `for range` loop to exit naturally. The user then calls `Send()` with the next input and `Stream()` again to get a new iterator for the next turn's responses. - ### 1.3 BidiFlow ```go @@ -99,9 +97,27 @@ type BidiFlow[Init, In, Out, Stream any] struct { } ``` -### 1.4 Agent +### 1.4 Responder + +`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries. + +```go +// Responder wraps the output channel with turn signaling for multi-turn agents. +type Responder[T any] struct { + ch chan<- streamChunk[T] // internal, unexported +} + +// Send sends a streamed chunk to the consumer. +func (r *Responder[T]) Send(data T) + +// EndTurn signals that the agent has finished responding to the current input. +// The consumer's Stream() iterator will exit, allowing them to send the next input. +func (r *Responder[T]) EndTurn() +``` + +### 1.5 Agent -Agent adds session state management on top of BidiFlow. +Agent adds session state management on top of BidiFlow with turn semantics. ```go // Artifact represents a named collection of parts produced during a session. @@ -135,9 +151,9 @@ type AgentResult[Out any] struct { // AgentFunc is the function signature for agents. 
type AgentFunc[State, In, Out, Stream any] func( ctx context.Context, - inputStream <-chan In, sess *session.Session[State], - sendChunk core.StreamCallback[Stream], + inCh <-chan In, + resp *Responder[Stream], ) (AgentResult[Out], error) ``` @@ -366,7 +382,6 @@ import ( "context" "fmt" - "github.com/firebase/genkit/go/core" "github.com/firebase/genkit/go/genkit" ) @@ -376,13 +391,11 @@ func main() { // Define echo bidi flow (low-level, no turn semantics) echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", - func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) { + func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int - for input := range inputStream { + for input := range inCh { count++ - if err := sendChunk(ctx, fmt.Sprintf("echo: %s", input)); err != nil { - return "", err - } + outCh <- fmt.Sprintf("echo: %s", input) } return fmt.Sprintf("processed %d messages", count), nil }, @@ -423,7 +436,6 @@ import ( "fmt" "github.com/firebase/genkit/go/ai" - "github.com/firebase/genkit/go/core" corex "github.com/firebase/genkit/go/core/x" "github.com/firebase/genkit/go/core/x/session" "github.com/firebase/genkit/go/genkit" @@ -445,14 +457,14 @@ func main() { // Define an agent for multi-turn chat chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", - func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.AgentResult[string], error) { + func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages - for userInput := range inputStream { + for userInput := range inCh { messages = append(messages, ai.NewUserTextMessage(userInput)) - var responseText string + var respText string for 
result, err := range genkit.GenerateStream(ctx, g, ai.WithMessages(messages...), ) { @@ -460,13 +472,13 @@ func main() { return corex.AgentResult[string]{}, err } if result.Done { - responseText = result.Response.Text() + respText = result.Response.Text() } - sendChunk(ctx, result.Chunk.Text()) + resp.Send(result.Chunk.Text()) } - // Stream channel closes here when we loop back to wait for next input + resp.EndTurn() // Signal turn complete, consumer's Stream() exits - messages = append(messages, ai.NewModelTextMessage(responseText)) + messages = append(messages, ai.NewModelTextMessage(respText)) sess.UpdateState(ctx, ChatState{Messages: messages}) } @@ -494,9 +506,9 @@ func main() { } fmt.Print(chunk) } - // Loop exits when stream closes (agent finished responding) + // Loop exits when agent calls resp.EndTurn() - // Second turn - call Stream() again for next response + // Second turn conn.Send("What are channels used for?") for chunk, err := range conn.Stream() { if err != nil { @@ -527,15 +539,15 @@ type ChatInit struct { } configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", - func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) { + func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { // Use init.SystemPrompt and init.Temperature - for input := range inputStream { + for input := range inCh { resp, _ := genkit.GenerateText(ctx, g, ai.WithSystem(init.SystemPrompt), ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}), ai.WithPrompt(input), ) - sendChunk(ctx, resp) + outCh <- resp } return "done", nil }, @@ -593,17 +605,42 @@ conn, _ := configuredChat.StreamBidi(ctx, ### Channels and Backpressure - Both input and output channels are **unbuffered** by default (size 0) -- This provides natural backpressure: `Send()` blocks until agent reads, `sendChunk()` blocks until user consumes -- If 
needed, a `WithInputBufferSize` option could be added later for specific use cases - -### Iterator Implementation and Turn Semantics -- `Stream()` returns `iter.Seq2[Stream, error]` - a Go 1.23 iterator -- Each call to `Stream()` returns an iterator over a **new channel** for that turn -- When the agent finishes responding (loops back to wait for next input), the stream channel closes -- The user's `for range` loop exits naturally when the channel closes -- Call `Stream()` again after sending the next input to get the next turn's response -- The iterator yields `(chunk, nil)` for each streamed value -- On error, the iterator yields `(zero, err)` and stops +- This provides natural backpressure: `Send()` blocks until agent reads, `resp.Send()` blocks until consumer reads +- If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases + +### Turn Signaling (Agents) + +For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next. + +**How it works internally:** + +1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type) +2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) +3. `resp.Send(data)` sends `streamChunk{data: data}` +4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` +5. `conn.Stream()` unwraps chunks, yielding only the data +6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding + +**From the agent's perspective:** +```go +for input := range inCh { + resp.Send("partial...") + resp.Send("more...") + resp.EndTurn() // Consumer's for loop exits here +} +``` + +**From the consumer's perspective:** +```go +conn.Send("question") +for chunk, err := range conn.Stream() { + fmt.Print(chunk) // Just gets string, not streamChunk +} +// Loop exited because agent called EndTurn() + +conn.Send("follow-up") +for chunk, err := range conn.Stream() { ... 
} +``` ### Tracing - Span is started when connection is created, ended when action completes @@ -631,7 +668,7 @@ The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` retu ```go // Simplified internal logic -result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk) +result, err := userFunc(ctx, sess, wrappedInCh, resp) if err != nil { return AgentOutput[State, Out]{}, err } From 899e9f28fd63a4e1c360f2fa6c1e4a92c9964583 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:24:19 -0800 Subject: [PATCH 10/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 6a3c69bc0b..4e2a7bf4e4 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -281,7 +281,7 @@ Agent uses the same `StreamBidi` method as BidiAction and BidiFlow. Session ID i ```go // Define once at startup -chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", +chatAgent := genkit.DefineAgent(g, "chatAgent", myAgentFunc, corex.WithSessionStore(store), ) @@ -390,7 +390,7 @@ func main() { g := genkit.Init(ctx) // Define echo bidi flow (low-level, no turn semantics) - echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo", + echoFlow := genkit.DefineBidiFlow(g, "echo", func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) { var count int for input := range inCh { @@ -456,7 +456,7 @@ func main() { ) // Define an agent for multi-turn chat - chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent", + chatAgent := genkit.DefineAgent(g, "chatAgent", func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) { state := sess.State() messages := state.Messages @@ -538,7 +538,7 @@ type ChatInit struct { Temperature float64
`json:"temperature"` } -configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat", +configuredChat := genkit.DefineBidiFlow(g, "configuredChat", func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) { // Use init.SystemPrompt and init.Temperature for input := range inCh { From 157a5b1cbb1c2a4efd9cfb83639062a1df424d04 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:38:51 -0800 Subject: [PATCH 11/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4e2a7bf4e4..4d1dddee25 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -75,11 +75,11 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error // Close signals that no more inputs will be sent. func (c *BidiConnection[In, Out, Stream]) Close() error -// Stream returns an iterator for receiving streamed chunks. +// Responses returns an iterator for receiving streamed response chunks. // For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing -// multi-turn conversations. Call Stream() again after Send() for the next turn. +// multi-turn conversations. Call Responses() again after Send() for the next turn. // The iterator completes permanently when the action finishes. -func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error] +func (c *BidiConnection[In, Out, Stream]) Responses() iter.Seq2[Stream, error] // Output returns the final output after the action completes. // Blocks until done or context cancelled. @@ -413,7 +413,7 @@ func main() { conn.Close() // Consume stream via iterator - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -500,7 +500,7 @@ func main() { // First turn conn.Send("Hello! 
Tell me about Go programming.") - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -510,7 +510,7 @@ func main() { // Second turn conn.Send("What are channels used for?") - for chunk, err := range conn.Stream() { + for chunk, err := range conn.Responses() { if err != nil { panic(err) } @@ -618,7 +618,7 @@ For multi-turn conversations, the consumer needs to know when the agent has fini 2. `streamChunk` has `data T` and `endTurn bool` fields (unexported) 3. `resp.Send(data)` sends `streamChunk{data: data}` 4. `resp.EndTurn()` sends `streamChunk{endTurn: true}` -5. `conn.Stream()` unwraps chunks, yielding only the data -6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding +5. `conn.Responses()` unwraps chunks, yielding only the data +6. When `conn.Responses()` sees `endTurn: true`, it exits the iterator without yielding **From the agent's perspective:** @@ -633,13 +633,13 @@ for input := range inCh { **From the consumer's perspective:** ```go conn.Send("question") -for chunk, err := range conn.Stream() { +for chunk, err := range conn.Responses() { fmt.Print(chunk) // Just gets string, not streamChunk } // Loop exited because agent called EndTurn() conn.Send("follow-up") -for chunk, err := range conn.Stream() { ... } +for chunk, err := range conn.Responses() { ...
} ``` ### Tracing From 6c160e5aeb0181e275281e6584ecf08b1eb4bae3 Mon Sep 17 00:00:00 2001 From: Alex Pascal Date: Sat, 17 Jan 2026 19:39:17 -0800 Subject: [PATCH 12/12] Update go-bidi-design.md --- docs/go-bidi-design.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docs/go-bidi-design.md b/docs/go-bidi-design.md index 4d1dddee25..40259cb6a3 100644 --- a/docs/go-bidi-design.md +++ b/docs/go-bidi-design.md @@ -461,8 +461,8 @@ func main() { state := sess.State() messages := state.Messages - for userInput := range inCh { - messages = append(messages, ai.NewUserTextMessage(userInput)) + for input := range inCh { + messages = append(messages, ai.NewUserTextMessage(input)) var respText string for result, err := range genkit.GenerateStream(ctx, g,