Skip to content

Conversation

@apascal07
Copy link
Collaborator

@apascal07 apascal07 commented Jan 18, 2026

Adds genkitx.DefineStreamingFlow (experimental) as an alternative to the callback-based genkit.DefineStreamingFlow. The channel-based API is more idiomatic Go for streaming use cases and will replace the callback-based version in the next major version release.

Usage

import genkitx "github.com/firebase/genkit/go/genkit/x"

jokeFlow := genkitx.DefineStreamingFlow(g, "jokeFlow",
    func(ctx context.Context, topic string, streamCh chan<- string) (string, error) {
        for chunk, err := range genkit.GenerateStream(ctx, g,
            ai.WithPrompt("Tell me a joke about %s.", topic),
        ) {
            if err != nil {
                return "", err
            }
            select {
            case streamCh <- chunk.Text():
            case <-ctx.Done():
                return "", ctx.Err()
            }
        }
        return "done", nil
    })

Changes

  • genkit/x/genkit.go: New DefineStreamingFlow function with channel-based signature
  • core/flow.go: Fix panic when consumer breaks out of streaming loop early

Checklist (if applicable):

@gemini-code-assist
Copy link
Contributor

Summary of Changes

Hello @apascal07, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request enhances the Genkit framework by introducing a new, more Go-idiomatic channel-based API for defining streaming flows. This provides developers with a familiar and efficient way to handle streaming data, complementing the existing callback-based mechanism. Additionally, it includes a crucial bug fix to gracefully manage early termination by streaming consumers, preventing potential panics and improving overall stability.

Highlights

  • New Channel-Based Streaming Flow API: Introduced x.DefineStreamingFlow as an alternative to the existing callback-based genkit.DefineStreamingFlow. This new API leverages Go channels for streaming, offering a more idiomatic approach for Go developers.
  • Improved Streaming Consumer Termination Handling: Implemented a fix in core/flow.go to prevent a panic that could occur when a streaming consumer breaks out of a loop early, ensuring more robust error handling during streaming operations.

🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console.

Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a new channel-based streaming flow API, x.DefineStreamingFlow, which provides a more idiomatic Go approach for streaming use cases compared to the existing callback-based API. The implementation is well-structured, and the accompanying tests are thorough, covering various edge cases like early consumer termination. The fix in core/flow.go to prevent a panic on early loop exit is also a good improvement. I have one suggestion to prevent a potential goroutine leak in the non-streaming path. Overall, this is a valuable and well-implemented feature.

Comment on lines +95 to +97
output, err := fn(ctx, input, discardCh)
close(discardCh)
return output, err
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In the non-streaming case (sendChunk == nil), if the user-provided function fn panics, close(discardCh) will not be called. This will cause the goroutine that ranges over discardCh to leak because it will block forever.

Using defer ensures that the channel is closed even if a panic occurs, thus preventing the goroutine leak.

Suggested change
output, err := fn(ctx, input, discardCh)
close(discardCh)
return output, err
defer close(discardCh)
return fn(ctx, input, discardCh)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

Status: No status

Development

Successfully merging this pull request may close these issues.

1 participant