Skip to content

Commit 8a8f193

Browse files
committed
Update go-bidi-design.md
1 parent 64e1f4c commit 8a8f193

File tree

1 file changed

+79
-42
lines changed

1 file changed

+79
-42
lines changed

docs/go-bidi-design.md

Lines changed: 79 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -46,9 +46,9 @@ type BidiAction[Init, In, Out, Stream any] struct {
4646
// BidiFunc is the function signature for bidi actions.
4747
type BidiFunc[Init, In, Out, Stream any] func(
4848
ctx context.Context,
49-
inputStream <-chan In,
5049
init Init,
51-
streamCallback core.StreamCallback[Stream],
50+
inCh <-chan In,
51+
outCh chan<- Stream,
5252
) (Out, error)
5353
```
5454

@@ -76,9 +76,9 @@ func (c *BidiConnection[In, Out, Stream]) Send(input In) error
7676
func (c *BidiConnection[In, Out, Stream]) Close() error
7777

7878
// Stream returns an iterator for receiving streamed chunks.
79-
// Each call returns a new iterator over the same underlying channel.
80-
// Breaking out of the loop does NOT close the connection - you can call Stream()
81-
// again to continue receiving. The iterator completes when the action finishes.
79+
// For Agents, the iterator exits when the agent calls resp.EndTurn(), allowing
80+
// multi-turn conversations. Call Stream() again after Send() for the next turn.
81+
// The iterator completes permanently when the action finishes.
8282
func (c *BidiConnection[In, Out, Stream]) Stream() iter.Seq2[Stream, error]
8383

8484
// Output returns the final output after the action completes.
@@ -89,8 +89,6 @@ func (c *BidiConnection[In, Out, Stream]) Output() (Out, error)
8989
func (c *BidiConnection[In, Out, Stream]) Done() <-chan struct{}
9090
```
9191

92-
**Why iterators work for multi-turn:** Each call to `Stream()` returns an iterator over a **new channel** created for that turn. When the agent finishes responding (loops back to read the next input), it closes that turn's stream channel, causing the user's `for range` loop to exit naturally. The user then calls `Send()` with the next input and `Stream()` again to get a new iterator for the next turn's responses.
93-
9492
### 1.3 BidiFlow
9593

9694
```go
@@ -99,9 +97,27 @@ type BidiFlow[Init, In, Out, Stream any] struct {
9997
}
10098
```
10199

102-
### 1.4 Agent
100+
### 1.4 Responder
101+
102+
`Responder` wraps the output channel for agents, providing methods to send data and signal turn boundaries.
103+
104+
```go
105+
// Responder wraps the output channel with turn signaling for multi-turn agents.
106+
type Responder[T any] struct {
107+
ch chan<- streamChunk[T] // internal, unexported
108+
}
109+
110+
// Send sends a streamed chunk to the consumer.
111+
func (r *Responder[T]) Send(data T)
112+
113+
// EndTurn signals that the agent has finished responding to the current input.
114+
// The consumer's Stream() iterator will exit, allowing them to send the next input.
115+
func (r *Responder[T]) EndTurn()
116+
```
117+
118+
### 1.5 Agent
103119

104-
Agent adds session state management on top of BidiFlow.
120+
Agent adds session state management and turn semantics on top of BidiFlow.
105121

106122
```go
107123
// Artifact represents a named collection of parts produced during a session.
@@ -135,9 +151,9 @@ type AgentResult[Out any] struct {
135151
// AgentFunc is the function signature for agents.
136152
type AgentFunc[State, In, Out, Stream any] func(
137153
ctx context.Context,
138-
inputStream <-chan In,
139154
sess *session.Session[State],
140-
sendChunk core.StreamCallback[Stream],
155+
inCh <-chan In,
156+
resp *Responder[Stream],
141157
) (AgentResult[Out], error)
142158
```
143159

@@ -366,7 +382,6 @@ import (
366382
"context"
367383
"fmt"
368384

369-
"github.com/firebase/genkit/go/core"
370385
"github.com/firebase/genkit/go/genkit"
371386
)
372387

@@ -376,13 +391,11 @@ func main() {
376391

377392
// Define echo bidi flow (low-level, no turn semantics)
378393
echoFlow := genkit.DefineBidiFlow[struct{}, string, string, string](g, "echo",
379-
func(ctx context.Context, inputStream <-chan string, init struct{}, sendChunk core.StreamCallback[string]) (string, error) {
394+
func(ctx context.Context, init struct{}, inCh <-chan string, outCh chan<- string) (string, error) {
380395
var count int
381-
for input := range inputStream {
396+
for input := range inCh {
382397
count++
383-
if err := sendChunk(ctx, fmt.Sprintf("echo: %s", input)); err != nil {
384-
return "", err
385-
}
398+
outCh <- fmt.Sprintf("echo: %s", input)
386399
}
387400
return fmt.Sprintf("processed %d messages", count), nil
388401
},
@@ -423,7 +436,6 @@ import (
423436
"fmt"
424437

425438
"github.com/firebase/genkit/go/ai"
426-
"github.com/firebase/genkit/go/core"
427439
corex "github.com/firebase/genkit/go/core/x"
428440
"github.com/firebase/genkit/go/core/x/session"
429441
"github.com/firebase/genkit/go/genkit"
@@ -445,28 +457,28 @@ func main() {
445457

446458
// Define an agent for multi-turn chat
447459
chatAgent := genkit.DefineAgent[ChatState, string, string, string](g, "chatAgent",
448-
func(ctx context.Context, inputStream <-chan string, sess *session.Session[ChatState], sendChunk core.StreamCallback[string]) (corex.AgentResult[string], error) {
460+
func(ctx context.Context, sess *session.Session[ChatState], inCh <-chan string, resp *corex.Responder[string]) (corex.AgentResult[string], error) {
449461
state := sess.State()
450462
messages := state.Messages
451463

452-
for userInput := range inputStream {
464+
for userInput := range inCh {
453465
messages = append(messages, ai.NewUserTextMessage(userInput))
454466

455-
var responseText string
467+
var respText string
456468
for result, err := range genkit.GenerateStream(ctx, g,
457469
ai.WithMessages(messages...),
458470
) {
459471
if err != nil {
460472
return corex.AgentResult[string]{}, err
461473
}
462474
if result.Done {
463-
responseText = result.Response.Text()
475+
respText = result.Response.Text()
464476
}
465-
sendChunk(ctx, result.Chunk.Text())
477+
resp.Send(result.Chunk.Text())
466478
}
467-
// Stream channel closes here when we loop back to wait for next input
479+
resp.EndTurn() // Signal turn complete, consumer's Stream() exits
468480

469-
messages = append(messages, ai.NewModelTextMessage(responseText))
481+
messages = append(messages, ai.NewModelTextMessage(respText))
470482
sess.UpdateState(ctx, ChatState{Messages: messages})
471483
}
472484

@@ -494,9 +506,9 @@ func main() {
494506
}
495507
fmt.Print(chunk)
496508
}
497-
// Loop exits when stream closes (agent finished responding)
509+
// Loop exits when agent calls resp.EndTurn()
498510

499-
// Second turn - call Stream() again for next response
511+
// Second turn
500512
conn.Send("What are channels used for?")
501513
for chunk, err := range conn.Stream() {
502514
if err != nil {
@@ -527,15 +539,15 @@ type ChatInit struct {
527539
}
528540

529541
configuredChat := genkit.DefineBidiFlow[ChatInit, string, string, string](g, "configuredChat",
530-
func(ctx context.Context, inputStream <-chan string, init ChatInit, sendChunk core.StreamCallback[string]) (string, error) {
542+
func(ctx context.Context, init ChatInit, inCh <-chan string, outCh chan<- string) (string, error) {
531543
// Use init.SystemPrompt and init.Temperature
532-
for input := range inputStream {
544+
for input := range inCh {
533545
resp, _ := genkit.GenerateText(ctx, g,
534546
ai.WithSystem(init.SystemPrompt),
535547
ai.WithConfig(&genai.GenerateContentConfig{Temperature: &init.Temperature}),
536548
ai.WithPrompt(input),
537549
)
538-
sendChunk(ctx, resp)
550+
outCh <- resp
539551
}
540552
return "done", nil
541553
},
@@ -593,17 +605,42 @@ conn, _ := configuredChat.StreamBidi(ctx,
593605

594606
### Channels and Backpressure
595607
- Both input and output channels are **unbuffered** by default (size 0)
596-
- This provides natural backpressure: `Send()` blocks until agent reads, `sendChunk()` blocks until user consumes
597-
- If needed, a `WithInputBufferSize` option could be added later for specific use cases
598-
599-
### Iterator Implementation and Turn Semantics
600-
- `Stream()` returns `iter.Seq2[Stream, error]` - a Go 1.23 iterator
601-
- Each call to `Stream()` returns an iterator over a **new channel** for that turn
602-
- When the agent finishes responding (loops back to wait for next input), the stream channel closes
603-
- The user's `for range` loop exits naturally when the channel closes
604-
- Call `Stream()` again after sending the next input to get the next turn's response
605-
- The iterator yields `(chunk, nil)` for each streamed value
606-
- On error, the iterator yields `(zero, err)` and stops
608+
- This provides natural backpressure: `conn.Send()` blocks until the agent reads, and `resp.Send()` (or a send on `outCh` in a bidi flow) blocks until the consumer reads
609+
- If needed, `WithInputBufferSize` / `WithOutputBufferSize` options could be added later for specific use cases
610+
611+
### Turn Signaling (Agents)
612+
613+
For multi-turn conversations, the consumer needs to know when the agent has finished responding to one input and is ready for the next.
614+
615+
**How it works internally:**
616+
617+
1. `BidiConnection.streamCh` is actually `chan streamChunk[Stream]` (internal type)
618+
2. `streamChunk` has `data T` and `endTurn bool` fields (unexported)
619+
3. `resp.Send(data)` sends `streamChunk{data: data}`
620+
4. `resp.EndTurn()` sends `streamChunk{endTurn: true}`
621+
5. `conn.Stream()` unwraps chunks, yielding only the data
622+
6. When `Stream()` sees `endTurn: true`, it exits the iterator without yielding
623+
624+
**From the agent's perspective:**
625+
```go
626+
for input := range inCh {
627+
resp.Send("partial...")
628+
resp.Send("more...")
629+
resp.EndTurn() // Consumer's for loop exits here
630+
}
631+
```
632+
633+
**From the consumer's perspective:**
634+
```go
635+
conn.Send("question")
636+
for chunk, err := range conn.Stream() {
637+
fmt.Print(chunk) // Just gets string, not streamChunk
638+
}
639+
// Loop exited because agent called EndTurn()
640+
641+
conn.Send("follow-up")
642+
for chunk, err := range conn.Stream() { ... }
643+
```
607644

608645
### Tracing
609646
- Span is started when connection is created, ended when action completes
@@ -631,7 +668,7 @@ The user's `AgentFunc` returns `AgentResult[Out]`, but `Agent.StreamBidi()` retu
631668

632669
```go
633670
// Simplified internal logic
634-
result, err := userFunc(ctx, wrappedInputStream, sess, sendChunk)
671+
result, err := userFunc(ctx, sess, wrappedInCh, resp)
635672
if err != nil {
636673
return AgentOutput[State, Out]{}, err
637674
}

0 commit comments

Comments
 (0)