Workflow Orchestrator Pause and resume. #360
Unanswered
sathibabu-enmovil
asked this question in
Q&A
Replies: 0 comments
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Uh oh!
There was an error while loading. Please reload this page.
-
I am implementing an orchestrator workflow for my project, where multiple agents collaborate to complete tasks. In certain scenarios, some agents need to wait for external data—such as vendor input—before they can proceed further. In these cases, I need the ability to pause the workflow execution until the required data is available, and then resume the process seamlessly.
Could you please advise on best practices for implementing this pause-and-resume functionality, both in an asyncio-based workflow and when using Temporal workflows?
`import asyncio
import os
import logging
import json
import yaml
from mcp_agent.agents.agent import Agent
from mcp_agent.executor.workflow import Workflow, WorkflowResult, Signal
from mcp_agent.workflows.llm.augmented_llm import RequestParams
from augmented_llm_azure import AzureAugmentedLLM
from mcp_agent.workflows.orchestrator.orchestrator import Orchestrator
from mcp_agent.server.app_server import create_mcp_server_for_app
from mcp_agent.config import Settings
from mcp_agent.app import MCPApp
import warnings
from human_input import register_human_input_tools, console_human_input_callback
from agent_prompt import (PLANNER_PROMPT, REVIEW_AGENT_PROMPT,
RFQ_CREATOR_PROMPT, RFQ_ANALYZER_ANNOUNCER_PROMPT,
RFQ_NOTIFIER_PROMPT, BID_COLLECTOR_PROMPT, SYNTHESIZER_PROMPT)
warnings.filterwarnings("ignore")
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(name)
with open("mcp_agent.config.yaml") as f:
d = yaml.safe_load(f)
settings = Settings(**d)
mcp_app = MCPApp(name="basic_agent_server", description="Basic agent server example")
@mcp_app.workflow
class OrchestratorWorkflow(Workflow[str]):
"""
A simple workflow that demonstrates the basic structure of a Temporal workflow.
"""
async def main():
async with mcp_app.run() as agent_app:
logger.info(f"Creating MCP server for {agent_app.name}")
logger.info("Registered workflows:")
for workflow_id in agent_app.workflows:
logger.info(f" - {workflow_id}")
mcp_server = create_mcp_server_for_app(agent_app)
register_human_input_tools(mcp_server) # <-- Register your extra tool(s) here!
mcp_server.settings.host = "0.0.0.0"
mcp_server.settings.port = 8042
await mcp_server.run_sse_async()
# await mcp_server.run_streamable_http_async()
# await mcp_server.run_stdio_async()
if name == "main":
asyncio.run(main())`
Beta Was this translation helpful? Give feedback.
All reactions