1+ import json
12import logging
23import os
34import traceback
4- import json
5- from dotenv import load_dotenv
6- from typing import AsyncIterable , Any , Literal
7- from pydantic import BaseModel
5+
6+ from collections .abc import AsyncIterable
7+ from typing import Any , Literal
88
99from autogen import AssistantAgent , LLMConfig
1010from autogen .mcp import create_toolkit
11-
11+ from dotenv import load_dotenv
1212from mcp import ClientSession , StdioServerParameters
1313from mcp .client .stdio import stdio_client
14+ from pydantic import BaseModel
15+
1416
1517logger = logging .getLogger (__name__ )
1618
19+
class ResponseModel(BaseModel):
    """Structured response emitted by the YouTube MCP agent.

    Fields are populated by parsing the LLM's JSON reply; the agent's
    system message instructs the model to answer using exactly this
    schema.
    """

    # Main textual reply shown to the user.
    text_reply: str
    # Raw closed captions when the request involved them; None otherwise.
    closed_captions: str | None
    # Completion marker; the system prompt asks for "TERMINATE" on every
    # final response, so "" is only a fallback value.
    status: Literal["TERMINATE", ""]

    def format(self) -> str:
        """Render the response as a single display string.

        Returns:
            ``text_reply`` alone when no captions are attached, otherwise
            the reply followed by a "Closed Captions" section.
        """
        if self.closed_captions is None:
            return self.text_reply
        return f"{self.text_reply}\n\nClosed Captions:\n{self.closed_captions}"
2932
3033
def get_api_key() -> str | None:
    """Load environment variables and return the OpenAI API key.

    Calls ``load_dotenv()`` first so a local ``.env`` file can supply the
    key during development.

    Returns:
        The value of the ``OPENAI_API_KEY`` environment variable, or
        ``None`` when it is not set (``os.getenv`` default) — hence the
        ``str | None`` annotation rather than the original ``str``.
    """
    load_dotenv()
    return os.getenv("OPENAI_API_KEY")
39+
3640class YoutubeMCPAgent :
37- """Agent to access a Youtube MCP Server to download closed captions"""
41+ """Agent to access a Youtube MCP Server to download closed captions. """
3842
3943 SUPPORTED_CONTENT_TYPES = ["text" , "text/plain" ]
4044
@@ -53,20 +57,26 @@ def __init__(self):
5357 name = "YoutubeMCPAgent" ,
5458 llm_config = llm_config ,
5559 system_message = (
56- "You are a specialized assistant for processing YouTube videos. "
57- "You can use MCP tools to fetch captions and process YouTube content. "
58- "You can provide captions, summarize videos, or analyze content from YouTube. "
59- "If the user asks about anything not related to YouTube videos or doesn't provide a YouTube URL, "
60- "politely state that you can only help with tasks related to YouTube videos.\n \n "
61- "IMPORTANT: Always respond using the ResponseModel format with these fields:\n "
60+ "You are a specialized assistant for processing YouTube "
61+ "videos. You can use MCP tools to fetch captions and "
62+ "process YouTube content. You can provide captions, "
63+ "summarize videos, or analyze content from YouTube. If "
64+ "the user asks about anything not related to YouTube "
65+ "videos or doesn't provide a YouTube URL, politely state "
66+ "that you can only help with tasks related to YouTube "
67+ "videos.\n \n "
68+ "IMPORTANT: Always respond using the ResponseModel format "
69+ "with these fields:\n "
6270 "- text_reply: Your main response text\n "
63- "- closed_captions: YouTube captions if available, null if not relevant\n "
71+ "- closed_captions: YouTube captions if available, null if "
72+ "not relevant\n "
6473 "- status: Always use 'TERMINATE' for all responses \n \n "
6574 "Example response:\n "
6675 "{\n "
67- " \" text_reply\" : \" Here's the information you requested...\" ,\n "
68- " \" closed_captions\" : null,\n "
69- " \" status\" : \" TERMINATE\" \n "
76+ ' "text_reply": "Here\' s the information you '
77+ 'requested...",\n '
78+ ' "closed_captions": null,\n '
79+ ' "status": "TERMINATE"\n '
7080 "}"
7181 ),
7282 )
@@ -83,31 +93,36 @@ def get_agent_response(self, response: str) -> dict[str, Any]:
8393 # Try to parse the response as a ResponseModel JSON
8494 response_dict = json .loads (response )
8595 model = ResponseModel (** response_dict )
86-
96+
8797 # All final responses should be treated as complete
8898 return {
8999 "is_task_complete" : True ,
90100 "require_user_input" : False ,
91- "content" : model .format ()
101+ "content" : model .format (),
92102 }
93103 except Exception as e :
94104 # Log but continue with best-effort fallback
95105 logger .error (f"Error parsing response: { e } , response: { response } " )
96-
106+
97107 # Default to treating it as a completed response
98108 return {
99- "is_task_complete" : True ,
109+ "is_task_complete" : True ,
100110 "require_user_input" : False ,
101- "content" : response
111+ "content" : response ,
102112 }
103113
104- async def stream (self , query : str , sessionId : str ) -> AsyncIterable [dict [str , Any ]]:
114+ async def stream (
115+ self , query : str , session_id : str
116+ ) -> AsyncIterable [dict [str , Any ]]:
105117 """Stream updates from the MCP agent."""
106118 if not self .initialized :
107119 yield {
108120 "is_task_complete" : False ,
109121 "require_user_input" : True ,
110- "content" : "Agent initialization failed. Please check the dependencies and logs."
122+ "content" : (
123+ "Agent initialization failed. Please check the "
124+ "dependencies and logs."
125+ ),
111126 }
112127 return
113128
@@ -116,19 +131,21 @@ async def stream(self, query: str, sessionId: str) -> AsyncIterable[dict[str, An
116131 yield {
117132 "is_task_complete" : False ,
118133 "require_user_input" : False ,
119- "content" : "Processing request..."
134+ "content" : "Processing request..." ,
120135 }
121136
122137 logger .info (f"Processing query: { query [:50 ]} ..." )
123138
124- try :
139+ try :
125140 # Create stdio server parameters for mcp-youtube
126141 server_params = StdioServerParameters (
127142 command = "mcp-youtube" ,
128143 )
129144
130145 # Connect to the MCP server using stdio client
131- async with stdio_client (server_params ) as (read , write ), ClientSession (read , write ) as session :
146+ async with stdio_client (server_params ) as (read , write ), (
147+ ClientSession (read , write )
148+ ) as session :
132149 # Initialize the connection
133150 await session .initialize ()
134151
@@ -147,35 +164,42 @@ async def stream(self, query: str, sessionId: str) -> AsyncIterable[dict[str, An
147164 try :
148165 # Process the result
149166 await result .process ()
150-
167+
151168 # Get the summary which contains the output
152169 response = await result .summary
153170
154171 except Exception as extraction_error :
155- logger .error (f"Error extracting response: { extraction_error } " )
172+ logger .error (
173+ f"Error extracting response: { extraction_error } "
174+ )
156175 traceback .print_exc ()
157- response = f"Error processing request: { str (extraction_error )} "
176+ response = (
177+ f"Error processing request: { extraction_error !s} "
178+ )
158179
159180 # Final response
160181 yield self .get_agent_response (response )
161-
182+
162183 except Exception as e :
163- logger .error (f"Error during processing: { traceback .format_exc ()} " )
184+ logger .error (
185+ f"Error during processing: { traceback .format_exc ()} "
186+ )
164187 yield {
165188 "is_task_complete" : False ,
166189 "require_user_input" : True ,
167- "content" : f"Error processing request: { str ( e ) } "
190+ "content" : f"Error processing request: { e !s } " ,
168191 }
169192 except Exception as e :
170193 logger .error (f"Error in streaming agent: { traceback .format_exc ()} " )
171194 yield {
172195 "is_task_complete" : False ,
173196 "require_user_input" : True ,
174- "content" : f"Error processing request: { str ( e ) } "
197+ "content" : f"Error processing request: { e !s } " ,
175198 }
176199
177- def invoke (self , query : str , sessionId : str ) -> dict [str , Any ]:
200+ def invoke (self , query : str , session_id : str ) -> dict [str , Any ]:
178201 """Synchronous invocation of the MCP agent."""
179202 raise NotImplementedError (
180- "Synchronous invocation is not supported by this agent. Use the streaming endpoint (tasks/sendSubscribe) instead."
203+ "Synchronous invocation is not supported by this agent. "
204+ "Use the streaming endpoint (tasks/sendSubscribe) instead."
181205 )
0 commit comments