Skip to content

Commit 7202591

Browse files
authored
feat(kagent-adk): Use new mcp session for a2a requests (#1220)
Updates `kagent-adk` to allow for a new agent to be used for each runner instance during an A2A request. This allows each a2a request to use a new mcp session instead of reusing one single mcp session during the entire lifetime of the agent. --------- Signed-off-by: JM Huibonhoa <[email protected]>
1 parent b5da06d commit 7202591

File tree

3 files changed

+49
-21
lines changed

3 files changed

+49
-21
lines changed

python/packages/kagent-adk/src/kagent/adk/_a2a.py

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -53,15 +53,26 @@ def thread_dump(request: Request) -> PlainTextResponse:
5353
class KAgentApp:
5454
def __init__(
5555
self,
56-
root_agent: BaseAgent,
56+
root_agent_factory: Callable[[], BaseAgent],
5757
agent_card: AgentCard,
5858
kagent_url: str,
5959
app_name: str,
6060
lifespan: Optional[Callable[[Any], Any]] = None,
6161
plugins: List[BasePlugin] = None,
6262
stream: bool = False,
6363
):
64-
self.root_agent = root_agent
64+
"""Initialize the KAgent application.
65+
66+
Args:
67+
root_agent_factory: Root agent factory function that returns a new agent instance
68+
agent_card: Agent card configuration for A2A protocol
69+
kagent_url: URL of the KAgent backend server
70+
app_name: Application name for identification
71+
lifespan: Optional lifespan function
72+
plugins: Optional list of plugins
73+
stream: Whether to stream the response
74+
"""
75+
self.root_agent_factory = root_agent_factory
6576
self.kagent_url = kagent_url
6677
self.app_name = app_name
6778
self.agent_card = agent_card
@@ -81,9 +92,10 @@ def build(self, local=False) -> FastAPI:
8192
)
8293
session_service = KAgentSessionService(http_client)
8394

84-
adk_app = App(name=self.app_name, root_agent=self.root_agent, plugins=self.plugins)
85-
8695
def create_runner() -> Runner:
96+
root_agent = self.root_agent_factory()
97+
adk_app = App(name=self.app_name, root_agent=root_agent, plugins=self.plugins)
98+
8799
return Runner(
88100
app=adk_app,
89101
session_service=session_service,
@@ -138,12 +150,8 @@ async def test(self, task: str):
138150
session_id=SESSION_ID,
139151
user_id=USER_ID,
140152
)
141-
if isinstance(self.root_agent, Callable):
142-
agent_factory = self.root_agent
143-
root_agent = agent_factory()
144-
else:
145-
root_agent = self.root_agent
146153

154+
root_agent = self.root_agent_factory()
147155
runner = Runner(
148156
agent=root_agent,
149157
app_name=self.app_name,

python/packages/kagent-adk/src/kagent/adk/_agent_executor.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,11 @@ async def execute(
185185
logger.error("Failed to publish failure event: %s", enqueue_error, exc_info=True)
186186
finally:
187187
clear_kagent_span_attributes(context_token)
188+
# close the runner which cleans up the mcptoolsets
189+
# since the runner is created for each a2a request
190+
# and the mcptoolsets are not shared between requests
191+
# this is necessary to gracefully handle mcp toolset connections
192+
await runner.close()
188193

189194
async def _handle_request(
190195
self,

python/packages/kagent-adk/src/kagent/adk/cli.py

Lines changed: 27 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,16 @@ def static(
6464
sts_integration = create_sts_integration()
6565
if sts_integration:
6666
plugins = [sts_integration]
67-
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
68-
maybe_add_skills(root_agent)
67+
68+
def root_agent_factory() -> BaseAgent:
69+
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
70+
71+
maybe_add_skills(root_agent)
72+
73+
return root_agent
6974

7075
kagent_app = KAgentApp(
71-
root_agent,
76+
root_agent_factory,
7277
agent_card,
7378
app_cfg.url,
7479
app_cfg.app_name,
@@ -136,16 +141,22 @@ def run(
136141
):
137142
app_cfg = KAgentConfig()
138143

139-
agent_loader = AgentLoader(agents_dir=working_dir)
140-
root_agent = agent_loader.load_agent(name)
141-
142144
plugins = None
143145
sts_integration = create_sts_integration()
144146
if sts_integration:
145147
plugins = [sts_integration]
146-
add_to_agent(sts_integration, root_agent)
147148

148-
maybe_add_skills(root_agent)
149+
agent_loader = AgentLoader(agents_dir=working_dir)
150+
151+
def root_agent_factory() -> BaseAgent:
152+
root_agent = agent_loader.load_agent(name)
153+
154+
if sts_integration:
155+
add_to_agent(sts_integration, root_agent)
156+
157+
maybe_add_skills(root_agent)
158+
159+
return root_agent
149160

150161
# Load agent config to get stream setting
151162
agent_config = None
@@ -171,7 +182,7 @@ def run(
171182
logger.exception(f"Failed to load agent module '{name}' for lifespan")
172183

173184
kagent_app = KAgentApp(
174-
root_agent,
185+
root_agent_factory,
175186
agent_card,
176187
app_cfg.url,
177188
app_cfg.app_name,
@@ -202,9 +213,13 @@ async def test_agent(agent_config: AgentConfig, agent_card: AgentCard, task: str
202213
sts_integration = create_sts_integration()
203214
if sts_integration:
204215
plugins = [sts_integration]
205-
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
206-
maybe_add_skills(root_agent)
207-
app = KAgentApp(root_agent, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
216+
217+
def root_agent_factory() -> BaseAgent:
218+
root_agent = agent_config.to_agent(app_cfg.name, sts_integration)
219+
maybe_add_skills(root_agent)
220+
return root_agent
221+
222+
app = KAgentApp(root_agent_factory, agent_card, app_cfg.url, app_cfg.app_name, plugins=plugins)
208223
await app.test(task)
209224

210225

0 commit comments

Comments
 (0)