Skip to content

Commit f74ee5d

Browse files
committed
feat(gateway): implement Inbox & Stepper pattern and Recipe Bot adapter
- Add asyncio queue inbox to GatewaySession for concurrency - Intercept and enqueue messages while session is executing - Resolve GatewaySession stepper background worker thread processing - Implement RecipeBotAdapter to bridge declarative recipes to conversational agents - Register support for --recipe flag in bot start commands - Refactor 'praisonai serve recipe' to boot WebSocketGateway instead
1 parent fe30293 commit f74ee5d

File tree

6 files changed

+168
-50
lines changed

6 files changed

+168
-50
lines changed

src/praisonai-agents/praisonaiagents/gateway/adapters/__init__.py

Whitespace-only changes.
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
"""
2+
Adapter to wrap a RecipeRuntime behind the standard Agent duck-typing for Gateway and BotOS.
3+
"""
4+
5+
import logging
6+
from typing import Optional, Any
7+
from praisonaiagents.streaming import StreamEventEmitter
8+
9+
logger = logging.getLogger(__name__)
10+
11+
class RecipeBotAdapter:
12+
"""Wraps RecipeRuntime to act like an Agent for gateway/bot compatibility."""
13+
14+
def __init__(self, recipe_name: str, **kwargs):
15+
self.recipe_name = recipe_name
16+
self.config = kwargs
17+
self._runtime = None
18+
self.stream_emitter = StreamEventEmitter()
19+
20+
@property
21+
def name(self) -> str:
22+
return f"recipe-{self.recipe_name}"
23+
24+
def _get_runtime(self):
25+
"""Lazy load and instantiate RecipeRuntime."""
26+
if self._runtime is None:
27+
try:
28+
from praisonai.recipe.core import RecipeRuntime
29+
self._runtime = RecipeRuntime(recipe_name=self.recipe_name, **self.config)
30+
except ImportError as e:
31+
raise ImportError(
32+
f"Failed to load PraisonAI recipe components: {e}. "
33+
"Make sure you have praisonai installed."
34+
) from e
35+
return self._runtime
36+
37+
def chat(self, message: str) -> str:
38+
"""Process the message via the recipe runtime."""
39+
try:
40+
runtime = self._get_runtime()
41+
42+
# Forward the user_input to the recipe's variables
43+
variables = runtime.context.variables if hasattr(runtime, 'context') else {}
44+
variables["user_input"] = message
45+
46+
# Currently we rely on string return matching standard Agent
47+
# Note: We don't have direct streaming from recipe to emitter yet; this binds them generically.
48+
result = runtime.run()
49+
50+
if hasattr(result, "final_output"):
51+
return str(result.final_output)
52+
return str(result)
53+
54+
except Exception as e:
55+
logger.error(f"Recipe execution failed: {e}")
56+
return f"Error executing recipe: {e}"

src/praisonai/praisonai/cli/commands/serve.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -478,6 +478,7 @@ def serve_scheduler(
478478
def serve_recipe(
479479
host: str = typer.Option("127.0.0.1", "--host", "-h", help="Host to bind to"),
480480
port: int = typer.Option(8765, "--port", "-p", help="Port to bind to"),
481+
recipe: Optional[str] = typer.Option(None, "--recipe", "-r", help="Recipe name to execute"),
481482
config: Optional[str] = typer.Option(None, "--config", "-c", help="Config file path"),
482483
reload: bool = typer.Option(False, "--reload", help="Enable auto-reload"),
483484
):
@@ -490,6 +491,8 @@ def serve_recipe(
490491
try:
491492
from ..features.serve import handle_serve_command
492493
args = ["recipe", "--host", host, "--port", str(port)]
494+
if recipe:
495+
args.extend(["--recipe", recipe])
493496
if config:
494497
args.extend(["--config", config])
495498
if reload:

src/praisonai/praisonai/cli/features/bots_cli.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,9 @@ class BotCapabilities:
5959
exec_enabled: bool = False
6060
auto_approve: bool = False
6161

62+
# Optional primitive override
63+
recipe: Optional[str] = None
64+
6265
# Model settings
6366
model: Optional[str] = None
6467
thinking: Optional[str] = None # off, minimal, low, medium, high
@@ -750,6 +753,11 @@ def _load_agent(
750753
"""
751754
from praisonaiagents import Agent
752755

756+
# Intercept Recipe Adapter
757+
if capabilities and capabilities.recipe:
758+
from praisonaiagents.gateway.adapters.recipe_adapter import RecipeBotAdapter
759+
return RecipeBotAdapter(recipe_name=capabilities.recipe)
760+
753761
# Build tools list from capabilities
754762
tools = self._build_tools(capabilities) if capabilities else []
755763

@@ -977,6 +985,7 @@ def _add_capability_args(parser) -> None:
977985
"""Add capability arguments to a parser."""
978986
# Agent configuration
979987
parser.add_argument("--agent", help="Path to agent YAML configuration file")
988+
parser.add_argument("--recipe", help="Recipe name to execute as the bot")
980989
parser.add_argument("--model", "-m", help="LLM model to use")
981990

982991
# Browser
@@ -1050,6 +1059,7 @@ def _build_capabilities_from_args(args) -> BotCapabilities:
10501059
stt_model=getattr(args, "stt_model", None),
10511060
session_id=getattr(args, "session_id", None),
10521061
user_id=getattr(args, "user_id", None),
1062+
recipe=getattr(args, "recipe", None),
10531063
)
10541064

10551065

src/praisonai/praisonai/cli/features/serve.py

Lines changed: 18 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -310,34 +310,33 @@ async def root():
310310
return app
311311

312312
def cmd_recipe(self, args: List[str]) -> int:
313-
"""Launch recipe runner server."""
313+
"""Launch recipe runner server via WebSocketGateway."""
314314
spec = {
315315
"host": {"default": self.DEFAULT_HOST},
316316
"port": {"default": self.DEFAULT_PORT, "type": "int"},
317-
"config": {"default": None},
317+
"recipe": {"default": None},
318318
"reload": {"flag": True, "default": False},
319-
"api_key": {"default": None},
320-
"preload": {"flag": True, "default": False},
321319
}
322320
parsed = self._parse_args(args, spec)
323321

324-
try:
325-
self._print_success(f"Starting recipe server on {parsed['host']}:{parsed['port']}")
326-
print(" Discovery: /__praisonai__/discovery")
327-
328-
from praisonai.recipe.serve import serve, load_config
322+
recipe_name = parsed.get("recipe")
323+
if not recipe_name:
324+
self._print_error("A recipe name is required (e.g. --recipe my-recipe)")
325+
return self.EXIT_VALIDATION_ERROR
329326

330-
# Load config
331-
config = load_config(parsed["config"]) if parsed["config"] else {}
332-
if parsed["api_key"]:
333-
config["api_key"] = parsed["api_key"]
327+
try:
328+
self._print_success(f"Starting recipe gateway on {parsed['host']}:{parsed['port']}")
329+
import asyncio
330+
from praisonai.gateway import WebSocketGateway
331+
from praisonaiagents.gateway.adapters.recipe_adapter import RecipeBotAdapter
334332

335-
serve(
336-
host=parsed["host"],
337-
port=parsed["port"],
338-
reload=parsed["reload"],
339-
config=config,
340-
)
333+
async def _run():
334+
gateway = WebSocketGateway(host=parsed['host'], port=parsed['port'])
335+
agent = RecipeBotAdapter(recipe_name=recipe_name)
336+
gateway.register_agent(agent)
337+
await gateway.start()
338+
339+
asyncio.run(_run())
341340

342341
except ImportError as e:
343342
self._print_error(f"Missing dependency: {e}")

src/praisonai/praisonai/gateway/server.py

Lines changed: 81 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ class GatewaySession:
4444
_messages: List[GatewayMessage] = field(default_factory=list)
4545
_max_messages: int = 1000
4646

47+
# Stepper & Concurrency logic
48+
_inbox: asyncio.Queue = field(default_factory=asyncio.Queue)
49+
_is_executing: bool = False
50+
4751
@property
4852
def session_id(self) -> str:
4953
return self._session_id
@@ -89,6 +93,20 @@ def get_messages(self, limit: Optional[int] = None) -> List[GatewayMessage]:
8993
def close(self) -> None:
9094
self._is_active = False
9195

96+
async def queue_message(self, message: str) -> None:
97+
"""Queue a user message for execution after the current operation."""
98+
await self._inbox.put(message)
99+
100+
def get_next_message(self) -> Optional[str]:
101+
"""Fetch the next queued message if available without blocking."""
102+
if self._inbox.empty():
103+
return None
104+
return self._inbox.get_nowait()
105+
106+
def mark_executing(self, status: bool) -> None:
107+
"""Mark the session as currently executing an agent workflow."""
108+
self._is_executing = status
109+
92110

93111
class WebSocketGateway:
94112
"""WebSocket gateway server for multi-agent coordination.
@@ -557,40 +575,72 @@ async def _process_agent_message(
557575
return "Agent not available"
558576

559577
client_id = session.client_id
578+
content = message.content if isinstance(message.content, str) else str(message.content)
560579

561-
try:
562-
content = message.content if isinstance(message.content, str) else str(message.content)
563-
564-
# Wire streaming relay if agent has a stream_emitter
565-
relay_callback = None
566-
emitter = getattr(agent, 'stream_emitter', None)
567-
if emitter is not None and client_id:
568-
relay_callback = self._make_stream_relay(client_id, session)
569-
emitter.add_callback(relay_callback)
570-
571-
try:
572-
loop = asyncio.get_event_loop()
573-
response = await loop.run_in_executor(None, agent.chat, content)
574-
finally:
575-
# Always clean up the relay callback
576-
if relay_callback and emitter is not None:
577-
try:
578-
emitter.remove_callback(relay_callback)
579-
except (ValueError, AttributeError):
580-
pass
581-
582-
response_message = GatewayMessage(
583-
content=response,
584-
sender_id=session.agent_id,
585-
session_id=session.session_id,
586-
reply_to=message.message_id,
580+
# Inbox & Stepper logic
581+
if session._is_executing:
582+
# Send an ephemeral status event
583+
await self._send_to_client(
584+
client_id,
585+
{
586+
"type": "status",
587+
"source": session.agent_id,
588+
"message": "Thinking... (I've added your new message to the queue to process next)."
589+
}
587590
)
588-
session.add_message(response_message)
591+
await session.queue_message(content)
592+
return "Message queued."
589593

590-
return response
591-
except Exception as e:
592-
logger.error(f"Agent error: {e}")
593-
return f"Error: {str(e)}"
594+
session.mark_executing(True)
595+
await session.queue_message(content)
596+
597+
# Start background task to process the queue
598+
asyncio.create_task(self._run_session_queue(session, agent, client_id))
599+
return "Started processing."
600+
601+
async def _run_session_queue(self, session: GatewaySession, agent: Any, client_id: str) -> None:
602+
"""Background task loop that constantly pulls from `_inbox` and executes the agent task."""
603+
try:
604+
while True:
605+
content = session.get_next_message()
606+
if not content:
607+
break # Queue is empty, exit loop
608+
609+
# Wire streaming relay if agent has a stream_emitter
610+
relay_callback = None
611+
emitter = getattr(agent, 'stream_emitter', None)
612+
if emitter is not None and client_id:
613+
relay_callback = self._make_stream_relay(client_id, session)
614+
emitter.add_callback(relay_callback)
615+
616+
try:
617+
loop = asyncio.get_event_loop()
618+
response = await loop.run_in_executor(None, agent.chat, content)
619+
except Exception as e:
620+
logger.error(f"Agent error in queue processor: {e}")
621+
response = f"Error: {str(e)}"
622+
finally:
623+
# Always clean up the relay callback
624+
if relay_callback and emitter is not None:
625+
try:
626+
emitter.remove_callback(relay_callback)
627+
except (ValueError, AttributeError):
628+
pass
629+
630+
response_message = GatewayMessage(
631+
content=response,
632+
sender_id=session.agent_id,
633+
session_id=session.session_id,
634+
)
635+
session.add_message(response_message)
636+
637+
await self._send_to_client(client_id, {
638+
"type": "response",
639+
"content": response,
640+
"session_id": session.session_id,
641+
})
642+
finally:
643+
session.mark_executing(False)
594644

595645
def _make_stream_relay(
596646
self, client_id: str, session: "GatewaySession"

0 commit comments

Comments
 (0)