Skip to content

Commit 3c847b6

Browse files
committed
Addressing feedback
1 parent 65315a8 commit 3c847b6

File tree

2 files changed

+34
-73
lines changed

2 files changed

+34
-73
lines changed

temporalio/contrib/openai_agents/_mcp.py

Lines changed: 32 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -24,16 +24,6 @@
2424
logger = logging.getLogger(__name__)
2525

2626

27-
class TemporalMCPServer(abc.ABC):
28-
"""A representation of an MCP server to be registered with the OpenAIAgentsPlugin."""
29-
30-
@property
31-
@abc.abstractmethod
32-
def name(self) -> str:
33-
"""Get the server name."""
34-
raise NotImplementedError()
35-
36-
3727
class _StatelessTemporalMCPServerReference(MCPServer):
3828
def __init__(self, server: str, config: Optional[ActivityConfig] = None):
3929
self._name = server + "-stateless"
@@ -93,16 +83,15 @@ async def get_prompt(
9383
)
9484

9585

96-
class StatelessTemporalMCPServer(TemporalMCPServer):
86+
class StatelessTemporalMCPServer:
9787
"""A stateless MCP server implementation for Temporal workflows.
9888
9989
This class wraps an MCP server to make it stateless by executing each MCP operation
10090
as a separate Temporal activity. Each operation (list_tools, call_tool, etc.) will
10191
connect to the underlying server, execute the operation, and then clean up the connection.
10292
103-
This approach is suitable for simple use cases where connection overhead is acceptable
104-
and you don't need to maintain state between operations. It is encouraged when possible as it provides
105-
a better set of durability guarantees that the stateful version.
93+
This approach will not maintain state across calls. If the desired MCPServer needs persistent state in order to
94+
function, this cannot be used.
10695
"""
10796

10897
def __init__(self, server: MCPServer):
@@ -121,54 +110,41 @@ def name(self) -> str:
121110
return self._name
122111

123112
def _get_activities(self) -> Sequence[Callable]:
124-
"""Get the Temporal activities for this MCP server.
125-
126-
Creates and returns the Temporal activity functions that handle MCP operations.
127-
Each activity manages its own connection lifecycle (connect -> operate -> cleanup).
128-
129-
Returns:
130-
A sequence of Temporal activity functions.
131-
132-
Raises:
133-
ValueError: If no MCP server instance was provided during initialization.
134-
"""
135-
server = self._server
136-
137113
@activity.defn(name=self.name + "-list-tools")
138114
async def list_tools() -> list[MCPTool]:
139115
try:
140-
await server.connect()
141-
return await server.list_tools()
116+
await self._server.connect()
117+
return await self._server.list_tools()
142118
finally:
143-
await server.cleanup()
119+
await self._server.cleanup()
144120

145121
@activity.defn(name=self.name + "-call-tool")
146122
async def call_tool(
147123
tool_name: str, arguments: Optional[dict[str, Any]]
148124
) -> CallToolResult:
149125
try:
150-
await server.connect()
151-
return await server.call_tool(tool_name, arguments)
126+
await self._server.connect()
127+
return await self._server.call_tool(tool_name, arguments)
152128
finally:
153-
await server.cleanup()
129+
await self._server.cleanup()
154130

155131
@activity.defn(name=self.name + "-list-prompts")
156132
async def list_prompts() -> ListPromptsResult:
157133
try:
158-
await server.connect()
159-
return await server.list_prompts()
134+
await self._server.connect()
135+
return await self._server.list_prompts()
160136
finally:
161-
await server.cleanup()
137+
await self._server.cleanup()
162138

163139
@activity.defn(name=self.name + "-get-prompt")
164140
async def get_prompt(
165141
name: str, arguments: Optional[dict[str, Any]]
166142
) -> GetPromptResult:
167143
try:
168-
await server.connect()
169-
return await server.get_prompt(name, arguments)
144+
await self._server.connect()
145+
return await self._server.get_prompt(name, arguments)
170146
finally:
171-
await server.cleanup()
147+
await self._server.cleanup()
172148

173149
return list_tools, call_tool, list_prompts, get_prompt
174150

@@ -188,14 +164,16 @@ async def wrapper(*args, **kwargs):
188164
== TIMEOUT_TYPE_SCHEDULE_TO_START
189165
):
190166
raise ApplicationError(
191-
"MCP Stateful Server Worker failed to schedule activity."
167+
"MCP Stateful Server Worker failed to schedule activity.",
168+
type="DedicatedWorkerFailure",
192169
) from e
193170
if (
194171
cause.timeout_failure_info.timeout_type
195172
== TIMEOUT_TYPE_HEARTBEAT
196173
):
197174
raise ApplicationError(
198-
"MCP Stateful Server Worker failed to heartbeat."
175+
"MCP Stateful Server Worker failed to heartbeat.",
176+
type="DedicatedWorkerFailure",
199177
) from e
200178
raise e
201179

@@ -288,19 +266,20 @@ async def get_prompt(
288266
)
289267

290268

291-
class StatefulTemporalMCPServer(TemporalMCPServer):
269+
class StatefulTemporalMCPServer:
292270
"""A stateful MCP server implementation for Temporal workflows.
293271
294272
This class wraps an MCP server to maintain a persistent connection throughout
295273
the workflow execution. It creates a dedicated worker that stays connected to
296274
the MCP server and processes operations on a dedicated task queue.
297275
298-
This approach is more efficient for workflows that make multiple MCP calls,
299-
as it avoids connection overhead, but requires more resources to maintain
300-
the persistent connection and worker.
276+
This approach will allow the MCPServer to maintain state across calls if needed, but the caller
277+
will have to handle cases where the dedicated worker fails, as Temporal is unable to seamlessly
278+
recreate any lost state in that case. It is discouraged to use this approach unless necessary.
301279
302-
The caller will have to handle cases where the dedicated worker fails, as Temporal is
303-
unable to seamlessly recreate any lost state in that case.
280+
Handling dedicated worker failure will entail catching ApplicationError with type "DedicatedWorkerFailure".
281+
Depending on the usage pattern, the caller will then have to either restart from the point at which the Stateful
282+
server was needed or handle continuing from that loss of state in some other way.
304283
"""
305284

306285
def __init__(
@@ -311,8 +290,6 @@ def __init__(
311290
312291
Args:
313292
server: Either an MCPServer instance or a string name for the server.
314-
connect_config: Optional activity configuration for the connection activity.
315-
Defaults to 1-hour start-to-close timeout.
316293
"""
317294
self._server = server
318295
self._name = self._server.name + "-stateful"
@@ -325,39 +302,25 @@ def name(self) -> str:
325302
return self._name
326303

327304
def _get_activities(self) -> Sequence[Callable]:
328-
"""Get the Temporal activities for this stateful MCP server.
329-
330-
Creates and returns the Temporal activity functions that handle MCP operations
331-
and connection management. This includes a long-running connect activity that
332-
maintains the MCP connection and runs a dedicated worker.
333-
334-
Returns:
335-
A sequence containing the connect activity function.
336-
337-
Raises:
338-
ValueError: If no MCP server instance was provided during initialization.
339-
"""
340-
server = self._server
341-
342305
@activity.defn(name=self.name + "-list-tools")
343306
async def list_tools() -> list[MCPTool]:
344-
return await server.list_tools()
307+
return await self._server.list_tools()
345308

346309
@activity.defn(name=self.name + "-call-tool")
347310
async def call_tool(
348311
tool_name: str, arguments: Optional[dict[str, Any]]
349312
) -> CallToolResult:
350-
return await server.call_tool(tool_name, arguments)
313+
return await self._server.call_tool(tool_name, arguments)
351314

352315
@activity.defn(name=self.name + "-list-prompts")
353316
async def list_prompts() -> ListPromptsResult:
354-
return await server.list_prompts()
317+
return await self._server.list_prompts()
355318

356319
@activity.defn(name=self.name + "-get-prompt")
357320
async def get_prompt(
358321
name: str, arguments: Optional[dict[str, Any]]
359322
) -> GetPromptResult:
360-
return await server.get_prompt(name, arguments)
323+
return await self._server.get_prompt(name, arguments)
361324

362325
async def heartbeat_every(delay: float, *details: Any) -> None:
363326
"""Heartbeat every so often while not cancelled"""
@@ -369,7 +332,7 @@ async def heartbeat_every(delay: float, *details: Any) -> None:
369332
async def connect() -> None:
370333
heartbeat_task = asyncio.create_task(heartbeat_every(30))
371334
try:
372-
await server.connect()
335+
await self._server.connect()
373336

374337
worker = Worker(
375338
activity.client(),
@@ -380,7 +343,7 @@ async def connect() -> None:
380343

381344
await worker.run()
382345
finally:
383-
await server.cleanup()
346+
await self._server.cleanup()
384347
heartbeat_task.cancel()
385348
try:
386349
await heartbeat_task

temporalio/contrib/openai_agents/_openai_runner.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,8 @@ async def run(
6969
_StatefulTemporalMCPServerReference,
7070
),
7171
):
72-
warnings.warn(
73-
"Unknown mcp_server type {} may not work durably.".format(
74-
type(s)
75-
)
72+
raise ValueError(
73+
f"Unknown mcp_server type {type(s)} may not work durably."
7674
)
7775

7876
# workaround for https://github.com/pydantic/pydantic/issues/9541

0 commit comments

Comments
 (0)