Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion py/packages/genkit/src/genkit/ai/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
from genkit.core.action.types import ActionKind

from ._aio import Genkit
from ._plugin import Plugin
from ._plugin import Plugin, PluginV2
from ._registry import FlowWrapper, GenkitRegistry

__all__ = [
Expand All @@ -47,6 +47,7 @@
GenkitRegistry.__name__,
Genkit.__name__,
Plugin.__name__,
PluginV2.__name__,
ToolRunContext.__name__,
tool_response.__name__,
FlowWrapper.__name__,
Expand Down
61 changes: 58 additions & 3 deletions py/packages/genkit/src/genkit/ai/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""Base/shared implementation for Genkit user-facing API."""

import asyncio
import inspect
import os
import threading
from collections.abc import Coroutine
Expand All @@ -28,11 +29,13 @@
from genkit.aio.loop import create_loop, run_async
from genkit.blocks.formats import built_in_formats
from genkit.blocks.generate import define_generate_action
from genkit.core.action import Action
from genkit.core.environment import is_dev_environment
from genkit.core.reflection import make_reflection_server
from genkit.core.registry import ActionKind
from genkit.web.manager import find_free_port_sync

from ._plugin import Plugin
from ._plugin import Plugin, PluginV2, is_plugin_v2
from ._registry import GenkitRegistry
from ._server import ServerSpec, init_default_runtime

Expand Down Expand Up @@ -120,7 +123,9 @@ def _initialize_registry(self, model: str | None, plugins: list[Plugin] | None)
logger.warning('No plugins provided to Genkit')
else:
for plugin in plugins:
if isinstance(plugin, Plugin):
if is_plugin_v2(plugin):
self._initialize_v2_plugin(plugin)
elif isinstance(plugin, Plugin):
plugin.initialize(ai=self)

def resolver(kind, name, plugin=plugin):
Expand All @@ -135,7 +140,57 @@ def action_resolver(plugin=plugin):
self.registry.register_action_resolver(plugin.plugin_name(), resolver)
self.registry.register_list_actions_resolver(plugin.plugin_name(), action_resolver)
else:
raise ValueError(f'Invalid {plugin=} provided to Genkit: must be of type `genkit.ai.Plugin`')
raise ValueError(f'Invalid {plugin=} provided to Genkit: must be of type `genkit.ai.Plugin` or `genkit.ai.PluginV2`')

def _initialize_v2_plugin(self, plugin: PluginV2) -> None:
"""Register a v2 plugin by calling its methods and registering returned actions.

Steps:
1. Call plugin.init() to get resolved actions
2. Register each action with automatic namespacing
3. Set up lazy resolver for on-demand actions

Args:
plugin: V2 plugin instance to register.
"""
if inspect.iscoroutinefunction(plugin.init):
resolved_actions = asyncio.run(plugin.init())
else:
resolved_actions = plugin.init()

for action in resolved_actions:
self._register_action(action, plugin)

def resolver(kind: ActionKind, name: str) -> None:
"""Lazy resolver for v2 plugin.

Called when framework needs an action not returned from init().
"""
if inspect.iscoroutinefunction(plugin.resolve):
action = asyncio.run(plugin.resolve(kind, name))
else:
action = plugin.resolve(kind, name)

if action:
self._register_action(action, plugin)

self.registry.register_action_resolver(plugin.name, resolver)

def _register_action(self, action: Any, plugin: PluginV2) -> None:
"""Register a single action from a v2 plugin.

Responsibilities:
1. Add plugin namespace to action name (if not already present)
2. Register action in the registry

Args:
action: Action instance from the plugin.
plugin: The v2 plugin that created this action.
"""
# Register the pre-constructed action instance and let the registry apply
# namespacing for v2 plugins.
self.registry.register_action_instance(action, namespace=plugin.name)


def _initialize_server(self, reflection_server_spec: ServerSpec | None) -> None:
"""Initialize the server for the Genkit instance.
Expand Down
80 changes: 73 additions & 7 deletions py/packages/genkit/src/genkit/ai/_base_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

"""Asynchronous server gateway interface implementation for Genkit."""

import asyncio
import inspect
from collections.abc import Coroutine
from typing import Any, TypeVar

Expand All @@ -25,11 +27,13 @@

from genkit.aio.loop import run_loop
from genkit.blocks.formats import built_in_formats
from genkit.core.action import Action
from genkit.core.environment import is_dev_environment
from genkit.core.reflection import create_reflection_asgi_app
from genkit.core.registry import ActionKind
from genkit.web.manager import find_free_port_sync

from ._plugin import Plugin
from ._plugin import Plugin, PluginV2, is_plugin_v1, is_plugin_v2
from ._registry import GenkitRegistry
from ._runtime import RuntimeManager
from ._server import ServerSpec
Expand All @@ -44,14 +48,14 @@ class GenkitBase(GenkitRegistry):

def __init__(
self,
plugins: list[Plugin] | None = None,
plugins: list[Plugin | PluginV2] | None = None,
model: str | None = None,
reflection_server_spec: ServerSpec | None = None,
) -> None:
"""Initialize a new Genkit instance.

Args:
plugins: List of plugins to initialize.
plugins: List of plugins to initialize (v1 or v2).
model: Model name to use.
reflection_server_spec: Server spec for the reflection
server. If not provided in dev mode, a default will be used.
Expand All @@ -60,12 +64,15 @@ def __init__(
self._reflection_server_spec = reflection_server_spec
self._initialize_registry(model, plugins)

def _initialize_registry(self, model: str | None, plugins: list[Plugin] | None) -> None:
def _initialize_registry(self, model: str | None, plugins: list[Plugin | PluginV2] | None) -> None:
"""Initialize the registry for the Genkit instance.

Supports both v1 (Plugin) and v2 (PluginV2) plugins. Detection is done
at runtime via is_plugin_v2().

Args:
model: Model name to use.
plugins: List of plugins to initialize.
plugins: List of plugins to initialize (v1 or v2).

Raises:
ValueError: If an invalid plugin is provided.
Expand All @@ -81,15 +88,74 @@ def _initialize_registry(self, model: str | None, plugins: list[Plugin] | None)
logger.warning('No plugins provided to Genkit')
else:
for plugin in plugins:
if isinstance(plugin, Plugin):
if is_plugin_v2(plugin):
logger.debug(f'Registering v2 plugin: {plugin.name}')
self._register_v2_plugin(plugin)
elif is_plugin_v1(plugin):
logger.debug(f'Registering v1 plugin: {plugin.plugin_name()}')
plugin.initialize(ai=self)

def resolver(kind, name, plugin=plugin):
return plugin.resolve_action(self, kind, name)

self.registry.register_action_resolver(plugin.plugin_name(), resolver)
else:
raise ValueError(f'Invalid {plugin=} provided to Genkit: must be of type `genkit.ai.Plugin`')
raise ValueError(
f'Invalid {plugin=} provided to Genkit: '
f'must implement either Plugin or PluginV2 interface'
)

def _register_v2_plugin(self, plugin: PluginV2) -> None:
"""Register a v2 plugin by calling its methods and registering returned actions.

Steps:
1. Call plugin.init() to get resolved actions
2. Register each action with automatic namespacing
3. Set up lazy resolver for on-demand actions

Args:
plugin: V2 plugin instance to register.
"""
if inspect.iscoroutinefunction(plugin.init):
resolved_actions = asyncio.run(plugin.init())
else:
Comment on lines +119 to +121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

critical

asyncio.run() will raise a RuntimeError if called from within a running event loop. Since this code is in an async-focused module, it's likely that Genkit() will be instantiated within an async function, which would cause the application to crash. This same issue exists for plugin.resolve on line 134.

The initialization logic for V2 plugins needs to be async-aware. The initialization process should be made asynchronous. A common pattern is to have a lightweight __init__ and a separate async def _async_init(self) method that performs the actual async setup. Other methods like generate would then need to ensure _async_init has been called. This avoids blocking the event loop and prevents runtime errors.

resolved_actions = plugin.init()

for action in resolved_actions:
self._register_action_v2(action, plugin)

def resolver(kind: ActionKind, name: str) -> None:
"""Lazy resolver for v2 plugin.

Called when framework needs an action not returned from init().
"""
# Check if resolve method is async
if inspect.iscoroutinefunction(plugin.resolve):
action = asyncio.run(plugin.resolve(kind, name))
else:
action = plugin.resolve(kind, name)

if action:
self._register_action_v2(action, plugin)

self.registry.register_action_resolver(plugin.name, resolver)

def _register_action_v2(self, action: Action, plugin: PluginV2) -> None:
"""Register a single action from a v2 plugin.

Responsibilities:
1. Add plugin namespace to action name (if not already present)
2. Register action in the registry

Args:
action: Action instance from the plugin.
plugin: The v2 plugin that created this action.
"""
# Register the pre-constructed action instance and let the registry apply
# namespacing for v2 plugins.
self.registry.register_action_instance(action, namespace=plugin.name)

logger.debug(f'Registered v2 action: {action.name}')

def run_main(self, coro: Coroutine[Any, Any, T]) -> T:
"""Run the user's main coroutine.
Expand Down
Loading
Loading