diff --git a/mxtoai/_logging.py b/mxtoai/_logging.py
index 1c5f0a3..8c1a5aa 100644
--- a/mxtoai/_logging.py
+++ b/mxtoai/_logging.py
@@ -83,7 +83,18 @@ def get_logger(source: str) -> Any:
def span(
msg_template: str, name: str | None = None, tags: Sequence[str] | None = None, **msg_template_kwargs: Any
) -> Any:
- """Context manager for creating spans in logging."""
+ """
+ Context manager for creating spans in logging.
+
+ Args:
+ msg_template (str): The message template for the span.
+ name (str | None): Optional name for the span.
+ tags (Sequence[str] | None): Optional tags for the span.
+ **msg_template_kwargs: Additional keyword arguments for the message template.
+
+ Yields:
+ Any: The span context manager or a dummy context manager.
+ """
# Check if LOGFIRE_TOKEN environment variable is defined
if os.getenv("LOGFIRE_TOKEN"):
if tags:
diff --git a/mxtoai/agents/email_agent.py b/mxtoai/agents/email_agent.py
index 0730ca4..4e9d6ac 100644
--- a/mxtoai/agents/email_agent.py
+++ b/mxtoai/agents/email_agent.py
@@ -119,11 +119,12 @@ def __init__(
logger.info("Email agent initialized successfully")
def _init_agent(self):
- """Initialize the ToolCallingAgent with Azure OpenAI."""
- # Initialize the model with routing capabilities
- self.routed_model = RoutedLiteLLMModel() # Store as instance variable to update handle later
+ """
+ Initialize the ToolCallingAgent with Azure OpenAI.
+ """
+ # Initialize the routed model with the default model group
+ self.routed_model = RoutedLiteLLMModel()
- # Initialize the agent
self.agent = ToolCallingAgent(
model=self.routed_model,
tools=self.available_tools,
@@ -138,7 +139,12 @@ def _init_agent(self):
logger.debug("Agent initialized with routed model configuration")
def _initialize_search_tools(self) -> SearchWithFallbackTool:
- """Initializes and configures the search tools, returning the SearchWithFallbackTool."""
+ """
+ Initializes and configures the search tools, returning the SearchWithFallbackTool.
+
+ Returns:
+ SearchWithFallbackTool: The configured search tool with Bing and DuckDuckGo as primary engines and Google as fallback.
+ """
bing_search_tool = WebSearchTool(engine="bing", max_results=5)
logger.debug("Initialized WebSearchTool with Bing engine.")
@@ -167,6 +173,25 @@ def _initialize_search_tools(self) -> SearchWithFallbackTool:
logger.info(f"Initialized SearchWithFallbackTool. Primary engines: {primary_names}, Fallback: {fallback_name}")
return search_tool
+ def _get_required_actions(self, mode: str) -> List[str]:
+ """
+ Get list of required actions based on mode.
+
+ Args:
+ mode: The mode of operation (e.g., "summary", "reply", "research", "full")
+
+ Returns:
+ List[str]: List of actions to be performed by the agent
+ """
+ actions = []
+ if mode in ["summary", "full"]:
+ actions.append("Generate summary")
+ if mode in ["reply", "full"]:
+ actions.append("Generate reply")
+ if mode in ["research", "full"]:
+ actions.append("Conduct research")
+ return actions
+
def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]:
"""
Initialize Google search tool with either SerpAPI or Serper provider.
@@ -195,7 +220,15 @@ def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]:
return None
def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional[DeepResearchTool]:
- """Initializes the DeepResearchTool if API key is available."""
+ """
+ Initializes the DeepResearchTool if API key is available.
+
+ Args:
+ enable_deep_research: Flag to enable deep research functionality
+
+ Returns:
+ Optional[DeepResearchTool]: Initialized DeepResearchTool instance or None if API key is not found
+ """
research_tool: Optional[DeepResearchTool] = None
if os.getenv("JINA_API_KEY"):
research_tool = DeepResearchTool()
@@ -210,7 +243,18 @@ def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional
return research_tool
def _create_task(self, email_request: EmailRequest, email_instructions: ProcessingInstructions) -> str:
- """Create a task description for the agent based on email handle instructions."""
+ """
+ Create a task description for the agent based on email handle instructions.
+
+ Args:
+ email_request: EmailRequest instance containing email data
+ email_instructions: EmailHandleInstructions object containing processing configuration
+
+ Returns:
+ str: The task description for the agent
+ """
+
+ # process attachments if specified
attachments = self._format_attachments(email_request.attachments) \
if email_instructions.process_attachments and email_request.attachments else []
@@ -224,14 +268,31 @@ def _create_task(self, email_request: EmailRequest, email_instructions: Processi
)
def _format_attachments(self, attachments: List[EmailAttachment]) -> List[str]:
- """Format attachment details for inclusion in the task."""
+ """
+ Format attachment details for inclusion in the task.
+
+ Args:
+ attachments: List of EmailAttachment objects
+
+ Returns:
+ List[str]: Formatted attachment details
+ """
return [
f"- {att.filename} (Type: {att.contentType}, Size: {att.size} bytes)\n"
f' EXACT FILE PATH: "{att.path}"'
for att in attachments
]
def _create_email_context(self, email_request: EmailRequest, attachment_details=None) -> str:
- """Generate context information from the email request."""
+ """
+ Generate context information from the email request.
+
+ Args:
+ email_request: EmailRequest instance containing email data
+ attachment_details: List of formatted attachment details
+
+ Returns:
+ str: The context information for the agent
+ """
recipients = ", ".join(email_request.recipients) if email_request.recipients else "N/A"
attachments_info = f"Available Attachments:\n{chr(10).join(attachment_details)}" if attachment_details else "No attachments provided."
@@ -248,7 +309,15 @@ def _create_email_context(self, email_request: EmailRequest, attachment_details=
"""
def _create_attachment_task(self, attachment_details: List[str]) -> str:
- """Return instructions for processing attachments, if any."""
+ """
+ Return instructions for processing attachments, if any.
+
+ Args:
+ attachment_details: List of formatted attachment details
+
+ Returns:
+ str: Instructions for processing attachments
+ """
return f"Process these attachments:\n{chr(10).join(attachment_details)}" if attachment_details else ""
def _create_task_template(
@@ -260,7 +329,22 @@ def _create_task_template(
deep_research_mandatory: bool = False,
output_template: str = "",
) -> str:
- """Combine all task components into the final task description."""
+ """
+ Combine all task components into the final task description.
+
+ Args:
+ handle: The email handle being processed.
+ email_context: The context information extracted from the email.
+ handle_specific_template: Any specific template for the handle.
+ attachment_task: Instructions for processing attachments.
+ deep_research_mandatory: Flag indicating if deep research is mandatory.
+ output_template: The output template to use.
+
+ Returns:
+ str: The complete task description for the agent.
+ """
+
+ # Merge the task components into a single string by listing the sections
sections = [
f"Process this email according to the '{handle}' instruction type.\n",
email_context,
@@ -330,7 +414,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
)
tool_name = None # Reset tool_name if extraction failed
- # Revised Output Extraction
action_out = getattr(step, "action_output", None)
obs_out = getattr(step, "observations", None)
@@ -422,7 +505,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
logger.debug(f"[Memory Step {i+1}] Matched tool: deep_research")
try:
if isinstance(tool_output, dict):
- # Store the primary findings content
research_findings_content = tool_output.get("findings", "")
# Store metadata separately
research_metadata = {
@@ -533,7 +615,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
# --- Format the selected content ---
if email_body_content:
- # Remove signature remnants before formatting
signature_markers = [
"Best regards,\nMXtoAI Assistant",
"Best regards,",
@@ -552,7 +633,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
).strip()
logger.debug("Removed potential signature remnants from email body content.")
- # Format using ReportFormatter
result["email_content"]["text"] = self.report_formatter.format_report(
email_body_content, format_type="text", include_signature=True
)
@@ -637,13 +717,10 @@ def process_email(
"""
try:
- # Update the model's current handle
+ # create task
self.routed_model.current_handle = email_instructions
-
- # Create task with specific instructions
task = self._create_task(email_request, email_instructions)
- # Run the agent
try:
logger.info("Starting agent execution...")
final_answer_obj = self.agent.run(task)
@@ -670,7 +747,7 @@ def process_email(
if not processed_result.get("email_content") or not processed_result["email_content"].get("text"):
msg = "No reply text was generated by _process_agent_result"
logger.error(msg)
- # Populate errors within the existing structure if possible
+
if "metadata" not in processed_result:
processed_result["metadata"] = {}
if "errors" not in processed_result["metadata"]:
@@ -680,7 +757,7 @@ def process_email(
processed_result["metadata"]["email_sent"] = {}
processed_result["metadata"]["email_sent"]["status"] = "error"
processed_result["metadata"]["email_sent"]["error"] = msg
- # Return the partially processed result with error flags
+
return processed_result
logger.info(f"Email processed successfully with handle: {email_instructions.handle}")
diff --git a/mxtoai/api.py b/mxtoai/api.py
index e058bd5..f0503f3 100644
--- a/mxtoai/api.py
+++ b/mxtoai/api.py
@@ -40,8 +40,16 @@
# Function to cleanup attachment files and directory
-def cleanup_attachments(directory_path):
- """Delete attachment directory and all its contents"""
+def cleanup_attachments(directory_path: str) -> bool:
+ """
+ Delete attachment directory and all its contents
+
+ Args:
+ directory_path (str): Path to the directory to be deleted
+
+ Returns:
+ bool: True if deletion was successful, False otherwise
+ """
try:
if os.path.exists(directory_path):
shutil.rmtree(directory_path)
@@ -55,7 +63,17 @@ def cleanup_attachments(directory_path):
def create_success_response(
summary: str, email_response: dict[str, Any], attachment_info: list[dict[str, Any]]
) -> Response:
- """Create a success response with summary and email details"""
+ """
+ Create a success response with summary and email details
+
+ Args:
+ summary (str): Summary of the email processing
+ email_response (dict): Response from the email sending service
+ attachment_info (list): List of processed attachments
+
+ Returns:
+ Response: FastAPI Response object with JSON content
+ """
return Response(
content=json.dumps(
{
@@ -72,7 +90,17 @@ def create_success_response(
def create_error_response(summary: str, attachment_info: list[dict[str, Any]], error: str) -> Response:
- """Create an error response with summary and error details"""
+ """
+ Create an error response with summary and error details
+
+ Args:
+ summary (str): Summary of the email processing
+ attachment_info (list): List of processed attachments
+ error (str): Error message
+
+ Returns:
+ Response: FastAPI Response object with JSON content
+ """
return Response(
content=json.dumps(
{
@@ -92,7 +120,17 @@ def create_error_response(summary: str, attachment_info: list[dict[str, Any]], e
async def handle_file_attachments(
attachments: list[EmailAttachment], email_id: str, email_data: EmailRequest
) -> tuple[str, list[dict[str, Any]]]:
- """Process uploaded files and save them as attachments"""
+ """
+ Process uploaded files and save them as attachments
+
+ Args:
+ attachments (list[EmailAttachment]): List of EmailAttachment objects
+ email_id (str): Unique identifier for the email
+ email_data (EmailRequest): EmailRequest object containing email details
+
+ Returns:
+ tuple[str, list[dict[str, Any]]]: Tuple containing the directory path and list of processed attachments
+ """
email_attachments_dir = ""
attachment_info = []
@@ -202,7 +240,16 @@ async def handle_file_attachments(
# Helper function to send email reply using SES
async def send_agent_email_reply(email_data: EmailRequest, processing_result: dict[str, Any]) -> dict[str, Any]:
- """Send email reply using SES and return the response details"""
+ """
+ Send email reply using SES and return the response details
+
+ Args:
+ email_data (EmailRequest): EmailRequest object containing email details
+ processing_result (dict): Result of the email processing
+
+ Returns:
+ dict: Response details including status and message ID
+ """
if not processing_result or "email_content" not in processing_result:
logger.error("Invalid processing result format")
return {"status": "error", "error": "Invalid processing result format", "timestamp": datetime.now().isoformat()}
@@ -283,7 +330,15 @@ async def send_agent_email_reply(email_data: EmailRequest, processing_result: di
# Helper function to create sanitized response
def sanitize_processing_result(processing_result: dict[str, Any]) -> dict[str, Any]:
- """Create a clean response suitable for API return and database storage"""
+ """
+ Create a clean response suitable for API return and database storage
+
+ Args:
+ processing_result (dict): Result of the email processing
+
+ Returns:
+ dict: Sanitized response with metadata, research, and attachment info
+ """
if not isinstance(processing_result, dict):
return {"error": "Invalid processing result format", "timestamp": datetime.now().isoformat()}
@@ -329,7 +384,25 @@ async def process_email(
files: Annotated[list[UploadFile] | None, File()] = None,
api_key: str = Depends(api_auth_scheme),
):
- """Process an incoming email with attachments, analyze content, and send reply"""
+ """
+ Process an incoming email with attachments, analyze content, and send reply
+
+ Args:
+ from_email (str): Sender's email address
+ to (str): Recipient's email address
+ subject (str): Subject of the email
+ textContent (str): Plain text content of the email
+ htmlContent (str): HTML content of the email
+ messageId (str): Unique identifier for the email message
+ date (str): Date when the email was sent
+ emailId (str): Unique identifier for the email in the system
+ rawHeaders (str): Raw headers of the email in JSON format
+ files (list[UploadFile] | None): List of uploaded files as attachments
+ api_key (str): API key for authentication
+
+ Returns:
+ Response: FastAPI Response object with JSON content
+ """
# Validate API key
if response := await validate_api_key(api_key):
return response
diff --git a/mxtoai/dependencies.py b/mxtoai/dependencies.py
index 0bb975f..af3ed4d 100644
--- a/mxtoai/dependencies.py
+++ b/mxtoai/dependencies.py
@@ -1,4 +1,5 @@
from mxtoai.email_handles import DEFAULT_EMAIL_HANDLES
from mxtoai.instruction_resolver import ProcessingInstructionsResolver
+# global resolver for processing instructions
processing_instructions_resolver = ProcessingInstructionsResolver(DEFAULT_EMAIL_HANDLES)
diff --git a/mxtoai/email_handles.py b/mxtoai/email_handles.py
index ae46e08..f8a775e 100644
--- a/mxtoai/email_handles.py
+++ b/mxtoai/email_handles.py
@@ -1,6 +1,7 @@
from mxtoai.models import ProcessingInstructions
from mxtoai.prompts import output_prompts, template_prompts
+# default email handles for processing instructions
DEFAULT_EMAIL_HANDLES = [
ProcessingInstructions(
handle="summarize",
diff --git a/mxtoai/email_sender.py b/mxtoai/email_sender.py
index db01954..d0cad7a 100644
--- a/mxtoai/email_sender.py
+++ b/mxtoai/email_sender.py
@@ -90,6 +90,18 @@ async def send_email(
) -> dict[str, Any]:
"""
Send an email using AWS SES.
+
+ Args:
+ to_address: Recipient email address
+ subject: Subject of the email
+ body_text: Plain text body of the email
+ body_html: HTML body of the email (optional)
+ cc_addresses: List of CC addresses (optional)
+ reply_to_addresses: List of reply-to addresses (optional)
+ sender_email: Sender email address (optional, defaults to default_sender_email)
+
+ Returns:
+ The response from AWS SES
"""
try:
# Use provided sender_email or fall back to default
@@ -309,6 +321,12 @@ async def send_reply(
async def verify_sender_email(email_address: str) -> bool:
"""
Verify a sender email address with AWS SES.
+
+ Args:
+ email_address: The email address to verify.
+
+ Returns:
+ bool: True if verification was successful, False otherwise.
"""
try:
# AWS SES client configuration
@@ -345,6 +363,14 @@ async def verify_sender_email(email_address: str) -> bool:
async def test_send_email(to_address, subject="Test from mxtoai", body_text="This is a test email"):
"""
Test email sending functionality.
+
+ Args:
+ to_address: Recipient email address
+ subject: Subject of the test email
+ body_text: Body text of the test email
+
+ Returns:
+ bool: True if the test email was sent successfully, False otherwise.
"""
try:
sender = EmailSender()
@@ -359,6 +385,9 @@ async def test_send_email(to_address, subject="Test from mxtoai", body_text="Thi
async def run_tests():
"""
Run a series of tests for email functionality.
+
+ Returns:
+ bool: True if all tests passed, False otherwise.
"""
test_email = os.getenv("TEST_EMAIL")
if not test_email:
@@ -386,6 +415,9 @@ async def run_tests():
def log_received_email(email_data: EmailRequest) -> None:
"""
Log details about a received email.
+
+ Args:
+ email_data: The email data to log.
"""
logger.info(f"Received email from {email_data.from_email} to {email_data.to}")
logger.info(f"Subject: {email_data.subject}")
@@ -397,6 +429,12 @@ def log_received_email(email_data: EmailRequest) -> None:
def generate_email_id(email_data: EmailRequest) -> str:
"""
Generate a unique ID for an email based on its metadata.
+
+ Args:
+ email_data: The email data to generate an ID for.
+
+ Returns:
+ str: A unique ID for the email.
"""
timestamp = int(time.time())
hash_input = f"{email_data.from_email}-{email_data.to}-{timestamp}"
@@ -406,6 +444,13 @@ def generate_email_id(email_data: EmailRequest) -> str:
def save_attachments(email_data: EmailRequest, email_id: str) -> tuple[str, list[dict[str, Any]]]:
"""
Save email attachments to disk and return their metadata.
+
+ Args:
+ email_data: The email data containing attachments.
+ email_id: The unique ID for the email.
+
+ Returns:
+ tuple[str, list[dict[str, Any]]]: The directory where attachments are saved and a list of attachment metadata.
"""
if not email_data.attachments:
return ATTACHMENTS_DIR, []
@@ -444,6 +489,13 @@ def save_attachments(email_data: EmailRequest, email_id: str) -> tuple[str, list
def prepare_email_for_ai(email_data: EmailRequest, attachment_info: list[dict[str, Any]]) -> dict[str, Any]:
"""
Prepare email data for AI processing.
+
+ Args:
+ email_data: The email data to prepare.
+ attachment_info: Metadata about the attachments.
+
+ Returns:
+ dict[str, Any]: The prepared email data.
"""
# Create a copy to avoid modifying the original
email_dict = deepcopy(email_data.dict())
@@ -464,6 +516,13 @@ def prepare_email_for_ai(email_data: EmailRequest, attachment_info: list[dict[st
async def generate_email_summary(email_dict: dict[str, Any], attachment_info: list[dict[str, Any]]) -> str:
"""
Generate a summary of the email and its attachments using AI.
+
+ Args:
+ email_dict: The email data to summarize.
+ attachment_info: Metadata about the attachments.
+
+ Returns:
+ str: The generated summary.
"""
# TODO: Implement AI-based summarization
return f"Email from {email_dict['from_email']} with {len(attachment_info)} attachments"
@@ -472,6 +531,13 @@ async def generate_email_summary(email_dict: dict[str, Any], attachment_info: li
def create_reply_content(summary: str, attachment_info: list[dict[str, Any]]) -> tuple[str, str]:
"""
Create the content for the email reply in both text and HTML formats.
+
+ Args:
+ summary: The summary of the email.
+ attachment_info: Metadata about the attachments.
+
+ Returns:
+ tuple[str, str]: The plain text and HTML content for the reply.
"""
# Create plain text version
text_content = [
@@ -507,6 +573,14 @@ def create_reply_content(summary: str, attachment_info: list[dict[str, Any]]) ->
async def send_email_reply(email_dict: dict[str, Any], reply_text: str, reply_html: str) -> dict[str, Any]:
"""
Send a reply to the original email.
+
+ Args:
+ email_dict: The original email data.
+ reply_text: The plain text reply body.
+ reply_html: The HTML reply body.
+
+ Returns:
+ dict[str, Any]: The response from the email sending service.
"""
try:
sender = EmailSender()
diff --git a/mxtoai/prompts/base_prompts.py b/mxtoai/prompts/base_prompts.py
index c1d85bb..46d04f3 100644
--- a/mxtoai/prompts/base_prompts.py
+++ b/mxtoai/prompts/base_prompts.py
@@ -1,4 +1,6 @@
-"""Base prompts and common guidelines for email processing."""
+"""
+Base prompts and common guidelines for email processing.
+"""
MARKDOWN_STYLE_GUIDE = """
MARKDOWN FORMATTING REQUIREMENTS:
diff --git a/mxtoai/prompts/template_prompts.py b/mxtoai/prompts/template_prompts.py
index 52c7bee..042f8bb 100644
--- a/mxtoai/prompts/template_prompts.py
+++ b/mxtoai/prompts/template_prompts.py
@@ -1,4 +1,6 @@
-"""Template prompts for different email processing handlers."""
+"""
+Template prompts for different email processing handlers.
+"""
# Summarize email handler template
SUMMARIZE_TEMPLATE = """
diff --git a/mxtoai/routed_litellm_model.py b/mxtoai/routed_litellm_model.py
index 0620b6a..9c1a871 100644
--- a/mxtoai/routed_litellm_model.py
+++ b/mxtoai/routed_litellm_model.py
@@ -144,6 +144,19 @@ def __call__(
tools_to_call_from: Optional[list[Tool]] = None,
**kwargs, # kwargs from the caller of this RoutedLiteLLMModel instance
) -> ChatMessage:
+ """
+ Generate a response based on the provided messages and other parameters.
+
+ Args:
+ messages (list[dict[str, Any]]): List of messages to process.
+ stop_sequences (Optional[list[str]]): List of stop sequences.
+ grammar (Optional[str]): Grammar to use for the response.
+ tools_to_call_from (Optional[list[Tool]]): List of tools to call from.
+ **kwargs: Additional arguments passed to the generate method.
+
+ Returns:
+ ChatMessage: The generated chat message.
+ """
try:
target_model_group = self._get_target_model()
diff --git a/mxtoai/scripts/citation_tools.py b/mxtoai/scripts/citation_tools.py
index aaf8e88..34d18bd 100644
--- a/mxtoai/scripts/citation_tools.py
+++ b/mxtoai/scripts/citation_tools.py
@@ -18,7 +18,9 @@
def reset_citation_counter():
- """Reset the global URL store."""
+ """
+ Reset the global URL store
+ """
global _all_visited_urls
_all_visited_urls = []
@@ -59,21 +61,16 @@ def forward(self, query: str, filter_year: Optional[int] = None) -> str:
Original search results
"""
- # Get original results
original_results = super().forward(query, filter_year)
# Extract URLs from search results
urls = re.findall(r"\[.*?\]\((https?://.*?)\)", original_results)
-
- # Extract titles alongside URLs where possible
title_url_matches = re.findall(r"\[(.*?)\]\((https?://.*?)\)", original_results)
- # Add URLs to the global collection
for match in title_url_matches:
title, url = match
add_url_to_references(url=url, title=title)
- # Add any URLs that didn't have a title match
for url in urls:
if url not in [u.get("url") for u in _all_visited_urls]:
add_url_to_references(url=url)
@@ -100,18 +97,14 @@ def forward(self, url: str) -> str:
# Get original content
original_content = super().forward(url)
- # Extract title if present
        title_match = (
            re.search(r"<title>(.*?)</title>", original_content)
            or re.search(r"<h1>(.*?)</h1>", original_content)
            or re.search(r"# (.*?)$", original_content, re.MULTILINE)
        )
-
title = title_match.group(1) if title_match else None
- # Add URL to the global collection
add_url_to_references(url=url, title=title)
-
return original_content
diff --git a/mxtoai/scripts/cookies.py b/mxtoai/scripts/cookies.py
index fc5cc22..2263d8c 100644
--- a/mxtoai/scripts/cookies.py
+++ b/mxtoai/scripts/cookies.py
@@ -706,7 +706,6 @@
},
]
-# Create a RequestsCookieJar instance
COOKIES = RequestsCookieJar()
# Add cookies to the jar
diff --git a/mxtoai/scripts/email_processor.py b/mxtoai/scripts/email_processor.py
index 6e97f51..10e960a 100644
--- a/mxtoai/scripts/email_processor.py
+++ b/mxtoai/scripts/email_processor.py
@@ -6,7 +6,9 @@
class EmailProcessor:
- """Process email content and attachments."""
+ """
+ Process email content and attachments.
+ """
def __init__(self, temp_dir: str = "email_attachments"):
"""
@@ -41,13 +43,8 @@ def process_email_file(self, email_file: str) -> dict[str, Any]:
"date": msg.get("date", ""),
}
- # Extract body content (plain text preferred)
body = self._extract_body(msg)
-
- # Extract and save attachments
attachments = self._extract_attachments(msg, email_file)
-
- # Identify research instructions from the body
research_instructions = self._extract_research_instructions(body)
return {
@@ -105,9 +102,7 @@ def _html_to_text(self, html: str) -> str:
# Simple implementation - can be improved with BeautifulSoup
import re
- # Remove HTML tags
text = re.sub(r"<[^>]+>", " ", html)
- # Fix whitespace
return re.sub(r"\s+", " ", text).strip()
def _extract_attachments(self, msg, email_file: str) -> list[str]:
@@ -127,7 +122,7 @@ def _extract_attachments(self, msg, email_file: str) -> list[str]:
os.makedirs(attachment_dir, exist_ok=True)
if msg.is_multipart():
- for _i, part in enumerate(msg.iter_parts()):
+ for _, part in enumerate(msg.iter_parts()):
filename = part.get_filename()
if filename:
# Clean the filename
diff --git a/mxtoai/scripts/gaia_scorer.py b/mxtoai/scripts/gaia_scorer.py
index bb89d58..8914f6e 100644
--- a/mxtoai/scripts/gaia_scorer.py
+++ b/mxtoai/scripts/gaia_scorer.py
@@ -4,6 +4,14 @@
def normalize_number_str(number_str: str) -> float:
+ """
+ Normalize a number string by removing common units and commas.
+
+ Args:
+ number_str: str, the number string to normalize
+ Returns:
+ float, the normalized number
+ """
# we replace these common units and commas to allow
# conversion to float
for char in ["$", "%", ","]:
@@ -18,6 +26,16 @@ def split_string(
s: str,
char_list: list[str] | None = None,
) -> list[str]:
+ """
+ Split a string into a list of elements based on specified delimiters.
+
+ Args:
+ s: str, the string to split
+ char_list: list of str, delimiters to use for splitting (default: [",", ";"])
+
+ Returns:
+ list of str, the split elements
+ """
if char_list is None:
char_list = [",", ";"]
pattern = f"[{''.join(char_list)}]"
@@ -25,6 +43,15 @@ def split_string(
def is_float(element: any) -> bool:
+ """
+ Check if the element can be converted to a float.
+
+ Args:
+ element: any, the element to check
+
+ Returns:
+ bool, True if the element can be converted to a float, False otherwise
+ """
try:
float(element)
return True
@@ -36,6 +63,16 @@ def question_scorer(
model_answer: str,
ground_truth: str,
) -> bool:
+ """
+ Compare the model answer with the ground truth.
+
+ Args:
+ model_answer: str, the answer generated by the model
+ ground_truth: str, the correct answer
+
+ Returns:
+ bool, True if the model answer is correct, False otherwise
+ """
# if gt is a number
if is_float(ground_truth):
normalized_answer = normalize_number_str(str(model_answer))
@@ -43,17 +80,13 @@ def question_scorer(
# if gt is a list
if any(char in ground_truth for char in [",", ";"]):
- # question with the fish: normalization removes punct
-
gt_elems = split_string(ground_truth)
ma_elems = split_string(model_answer)
- # check length is the same
if len(gt_elems) != len(ma_elems):
warnings.warn("Answer lists have different lengths, returning False.", UserWarning, stacklevel=2)
return False
- # compare each element as float or str
comparisons = []
for ma_elem, gt_elem in zip(ma_elems, gt_elems, strict=False):
if is_float(gt_elem):
@@ -70,7 +103,17 @@ def question_scorer(
return normalize_str(model_answer) == normalize_str(ground_truth)
-def check_prediction_contains_answer_letters_in_order(prediction, true_answer):
+def check_prediction_contains_answer_letters_in_order(prediction: str, true_answer: str) -> bool:
+ """
+ Check if the prediction contains the letters of the true answer in order.
+
+ Args:
+ prediction: str, the predicted answer
+ true_answer: str, the correct answer
+
+ Returns:
+ bool, True if the prediction contains the letters of the true answer in order, False otherwise
+ """
prediction = prediction.lower()
true_answer = true_answer.lower()
if len(prediction) > len(true_answer) * 3:
@@ -84,7 +127,18 @@ def check_prediction_contains_answer_letters_in_order(prediction, true_answer):
return True
-def check_close_call(prediction, true_answer, is_correct):
+def check_close_call(prediction: str, true_answer: str, is_correct: bool) -> bool:
+ """
+ Check if the prediction is a close call to the true answer.
+
+ Args:
+ prediction: str, the predicted answer
+ true_answer: str, the correct answer
+ is_correct: bool, whether the prediction is correct
+
+ Returns:
+ bool, True if the prediction is a close call to the true answer, False otherwise
+ """
if is_correct:
return True
if is_float(true_answer):
@@ -95,19 +149,18 @@ def check_close_call(prediction, true_answer, is_correct):
)
-def normalize_str(input_str, remove_punct=True) -> str:
+def normalize_str(input_str: str, remove_punct: bool = True) -> str:
"""
Normalize a string by:
- Removing all white spaces
- Optionally removing punctuation (if remove_punct is True)
- Converting to lowercase
- Parameters:
- - input_str: str, the string to normalize
- - remove_punct: bool, whether to remove punctuation (default: True)
+ Args:
+ input_str: str, the string to normalize
+ remove_punct: bool, whether to remove punctuation (default: True)
Returns:
- - str, the normalized string
-
+ str, the normalized string
"""
# Remove all white spaces. Required e.g for seagull vs. sea gull
no_spaces = re.sub(r"\s", "", input_str)
diff --git a/mxtoai/scripts/report_formatter.py b/mxtoai/scripts/report_formatter.py
index 3ea0da6..85db204 100644
--- a/mxtoai/scripts/report_formatter.py
+++ b/mxtoai/scripts/report_formatter.py
@@ -11,7 +11,9 @@
class ReportFormatter:
- """Format research reports and emails for delivery."""
+ """
+ Format research reports and emails for delivery.
+ """
def __init__(self, template_dir: Optional[str] = None):
"""
@@ -23,15 +25,12 @@ def __init__(self, template_dir: Optional[str] = None):
"""
# Set up template directory
if template_dir is None:
- # Default to a templates directory next to this file
self.template_dir = os.path.join(os.path.dirname(__file__), "templates")
else:
self.template_dir = template_dir
- # Initialize Jinja environment
+ # Initialize Jinja environment and load themes
self._init_template_env()
-
- # Load themes
self._load_themes()
# Default signature
@@ -111,7 +110,14 @@ def format_report(
return content
def _process_citations(self, content: str) -> str:
- """Process citations and references in the content."""
+ """
+ Process citations and references in the content.
+
+ Args:
+ content: Report content
+ Returns:
+ Processed content with citations and references formatted
+ """
try:
# Find all references sections
reference_sections = list(
@@ -160,12 +166,18 @@ def replace_citation(match):
return content.strip() + "\n\n" + "\n".join(formatted_refs)
except Exception as e:
- # Log error but don't break formatting
logger.exception(f"Error processing citations: {e!s}")
return content
def _remove_existing_signatures(self, content: str) -> str:
- """Remove any existing signature blocks from the content."""
+ """
+ Remove any existing signature blocks from the content.
+
+ Args:
+ content: Report content
+ Returns:
+ Content with existing signatures removed
+ """
signature_patterns = [
r"\n\s*Warm regards,?\s*\n\s*MXtoAI Assistant\s*\n",
r"\n\s*Best regards,?\s*\n\s*MXtoAI Assistant\s*\n",
diff --git a/mxtoai/scripts/text_inspector_tool.py b/mxtoai/scripts/text_inspector_tool.py
index 4ed7708..4ecc3ae 100644
--- a/mxtoai/scripts/text_inspector_tool.py
+++ b/mxtoai/scripts/text_inspector_tool.py
@@ -7,6 +7,9 @@
class TextInspectorTool(Tool):
+ """
+ Tool to inspect files as text and ask questions about them.
+ """
name = "inspect_file_as_text"
description = """
You cannot load files yourself: instead call this tool to read a file as markdown text and ask questions about it.
@@ -27,11 +30,27 @@ class TextInspectorTool(Tool):
md_converter = MarkdownConverter()
def __init__(self, model: Model, text_limit: int):
+ """
+ Initialize the TextInspectorTool.
+ Args:
+ model: The model to use for processing the text.
+ text_limit: The maximum number of characters to process from the file.
+ """
super().__init__()
self.model = model
self.text_limit = text_limit
def forward_initial_exam_mode(self, file_path, question):
+ """
+ Process the file and return a short caption based on the content.
+
+ Args:
+ file_path: Path to the file to be processed.
+ question: Optional question to guide the caption generation.
+
+ Returns:
+ str: The generated caption or the text content of the file.
+ """
try:
if file_path[-4:] in [".png", ".jpg"]:
msg = "Cannot use inspect_file_as_text tool with images: use visualizer instead!"
@@ -82,6 +101,16 @@ def forward_initial_exam_mode(self, file_path, question):
return f"Error processing file: {e!s}"
def forward(self, file_path, question: Optional[str] = None) -> str:
+ """
+ Process the file and return a response based on the content and question.
+
+ Args:
+ file_path: Path to the file to be processed.
+ question: Optional question to guide the response generation.
+
+ Returns:
+ str: The generated response or the text content of the file.
+ """
try:
if file_path[-4:] in [".png", ".jpg"]:
msg = "Cannot use inspect_file_as_text tool with images: use visualizer instead!"
diff --git a/mxtoai/scripts/visual_qa.py b/mxtoai/scripts/visual_qa.py
index cd194af..64c0322 100644
--- a/mxtoai/scripts/visual_qa.py
+++ b/mxtoai/scripts/visual_qa.py
@@ -21,7 +21,16 @@
logger = get_logger("azure_visualizer")
-def process_images_and_text(image_path, query, client):
+def process_images_and_text(image_path: str, query: str, client: InferenceClient):
+ """
+ Process images and text using the IDEFICS model.
+
+ Args:
+ image_path: Path to the image file.
+ query: The question to ask about the image.
+ client: Inference client for the model.
+
+ """
from transformers import AutoProcessor
messages = [
@@ -66,7 +75,15 @@ def encode_local_image(image_path):
# Function to encode the image
-def encode_image(image_path):
+def encode_image(image_path: str) -> str:
+ """
+ Encode an image to base64 format.
+
+ Args:
+ image_path: The path to the image file.
+ Returns:
+ str: The base64 encoded string of the image.
+ """
if image_path.startswith("http"):
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0"
request_kwargs = {
@@ -99,7 +116,16 @@ def encode_image(image_path):
headers = {"Content-Type": "application/json", "Authorization": f"Bearer {os.getenv('OPENAI_API_KEY')}"}
-def resize_image(image_path):
+def resize_image(image_path: str) -> str:
+ """
+    Resize the image to half its original width and height.
+
+ Args:
+ image_path: The path to the image file.
+
+ Returns:
+ str: The path to the resized image.
+ """
img = Image.open(image_path)
width, height = img.size
img = img.resize((int(width / 2), int(height / 2)))
@@ -123,6 +149,16 @@ class VisualQATool(Tool):
client = InferenceClient("HuggingFaceM4/idefics2-8b-chatty")
def forward(self, image_path: str, question: Optional[str] = None) -> str:
+ """
+ Process the image and return a short caption based on the content.
+
+ Args:
+        image_path: The path to the image on which to answer the question. This should be a local path to a downloaded image.
+ question: The question to answer.
+
+ Returns:
+        str: The generated caption or the answer about the image content.
+ """
output = ""
add_note = False
if not question:
diff --git a/mxtoai/tasks.py b/mxtoai/tasks.py
index f174618..08dd1b8 100644
--- a/mxtoai/tasks.py
+++ b/mxtoai/tasks.py
@@ -49,7 +49,12 @@
dramatiq.set_broker(rabbitmq_broker)
def cleanup_attachments(email_attachments_dir: str) -> None:
- """Clean up attachments after processing."""
+ """
+ Clean up attachments after processing.
+
+ Args:
+ email_attachments_dir: Directory containing email attachments
+ """
try:
dir_path = Path(email_attachments_dir)
if dir_path.exists():
@@ -65,7 +70,17 @@ def cleanup_attachments(email_attachments_dir: str) -> None:
logger.exception(f"Error cleaning up attachments: {e!s}")
-def should_retry(retries_so_far, exception):
+def should_retry(retries_so_far: int, exception: Exception) -> bool:
+ """
+ Determine whether to retry the task based on the exception and retry count.
+
+ Args:
+ retries_so_far: Number of retries attempted
+ exception: Exception raised during task execution
+
+ Returns:
+ bool: True if the task should be retried, False otherwise
+ """
logger.warning(f"Retrying task after exception: {exception!s}, retries so far: {retries_so_far}")
return retries_so_far < 3
diff --git a/mxtoai/tools/attachment_processing_tool.py b/mxtoai/tools/attachment_processing_tool.py
index 5ff254f..240a7eb 100644
--- a/mxtoai/tools/attachment_processing_tool.py
+++ b/mxtoai/tools/attachment_processing_tool.py
@@ -65,6 +65,12 @@ class AttachmentProcessingTool(Tool):
output_type = "object"
def __init__(self, model: Optional[Model] = None):
+ """
+ Initialize the attachment processing tool.
+
+ Args:
+ model: Optional model for generating summaries or processing content.
+ """
super().__init__()
self.md_converter = MarkdownConverter()
self.model = model
@@ -77,7 +83,15 @@ def __init__(self, model: Optional[Model] = None):
self.attachments_dir.mkdir(parents=True, exist_ok=True)
def _validate_attachment_path(self, file_path: str) -> Path:
- """Validate and resolve the attachment file path."""
+ """
+ Validate and resolve the attachment file path.
+
+ Args:
+ file_path: Path to the attachment file.
+
+ Returns:
+ Path: The resolved file path.
+ """
try:
if not file_path:
msg = "Empty file path provided"
@@ -106,7 +120,15 @@ def _validate_attachment_path(self, file_path: str) -> Path:
raise
def _process_document(self, file_path: Path) -> str:
- """Process document using MarkdownConverter."""
+ """
+ Process document using MarkdownConverter.
+
+ Args:
+ file_path: Path to the document file.
+
+ Returns:
+ str: The text content extracted from the document.
+ """
try:
result = self.md_converter.convert(str(file_path))
if not result or not hasattr(result, "text_content"):
@@ -118,7 +140,16 @@ def _process_document(self, file_path: Path) -> str:
raise
def forward(self, attachments: list[dict[str, Any]], mode: str = "basic") -> dict[str, Any]:
- """Process email attachments synchronously."""
+ """
+ Process email attachments synchronously.
+
+ Args:
+ attachments: List of attachment dictionaries containing file information.
+ mode: Processing mode: 'basic' for metadata only, 'full' for complete content analysis.
+
+ Returns:
+ dict: Processed attachments with content and summaries.
+ """
processed_attachments = []
logger.info(f"Processing {len(attachments)} attachments in {mode} mode")
@@ -206,7 +237,15 @@ def forward(self, attachments: list[dict[str, Any]], mode: str = "basic") -> dic
return {"attachments": processed_attachments, "summary": self._create_attachment_summary(processed_attachments)}
def _create_attachment_summary(self, attachments: list[dict[str, Any]]) -> str:
- """Create a summary of processed attachments."""
+ """
+ Create a summary of processed attachments.
+
+ Args:
+ attachments: List of processed attachment dictionaries.
+
+ Returns:
+ str: Summary of processed attachments.
+ """
if not attachments:
return "No attachments processed."
diff --git a/mxtoai/tools/fallback_search_tool.py b/mxtoai/tools/fallback_search_tool.py
index 4856d08..02b2a21 100644
--- a/mxtoai/tools/fallback_search_tool.py
+++ b/mxtoai/tools/fallback_search_tool.py
@@ -24,6 +24,13 @@ def __init__(
primary_tool: Optional[Tool] = None,
secondary_tool: Optional[Tool] = None,
):
+ """
+ Initialize the FallbackWebSearchTool.
+
+ Args:
+ primary_tool: The primary search tool to use (e.g., GoogleSearchTool).
+ secondary_tool: The secondary search tool to use if the primary fails (e.g., DuckDuckGoSearchTool).
+ """
if not primary_tool and not secondary_tool:
msg = "FallbackWebSearchTool requires at least one search tool."
raise ValueError(msg)
@@ -36,6 +43,12 @@ def __init__(
def forward(self, query: str) -> str:
"""
Execute the search, attempting primary tool first, then secondary.
+
+ Args:
+ query: The search query to perform.
+
+ Returns:
+ str: The search results from the successful tool.
"""
if self.primary_tool:
try:
diff --git a/mxtoai/tools/mock_jina_service.py b/mxtoai/tools/mock_jina_service.py
index 56cc6ea..da1d67c 100644
--- a/mxtoai/tools/mock_jina_service.py
+++ b/mxtoai/tools/mock_jina_service.py
@@ -9,15 +9,27 @@
class MockJinaService:
- """Mock service to simulate Jina AI's DeepSearch API behavior for load testing."""
+ """
+ Mock service to simulate Jina AI's DeepSearch API behavior for load testing.
+ """
def __init__(self):
- """Initialize the mock service with configuration."""
+ """
+ Initialize the mock service with configuration.
+ """
self.min_delay = 60 # 1 minute minimum
self.max_delay = 600 # 10 minutes maximum
def _generate_mock_urls(self, num_urls: int = 10) -> dict[str, list]:
- """Generate mock visited and read URLs."""
+ """
+ Generate mock visited and read URLs.
+
+ Args:
+ num_urls: Number of URLs to generate
+
+ Returns:
+ dict: Dictionary containing visited and read URLs
+ """
domains = ["arxiv.org", "wikipedia.org", "github.com", "research-papers.org", "academic-journals.com"]
all_urls = [
@@ -30,7 +42,15 @@ def _generate_mock_urls(self, num_urls: int = 10) -> dict[str, list]:
return {"visitedURLs": all_urls, "readURLs": read_urls}
def _generate_mock_annotations(self, urls: dict[str, list]) -> list:
- """Generate mock annotations for the URLs."""
+ """
+ Generate mock annotations for the URLs.
+
+ Args:
+ urls: Dictionary containing visited and read URLs
+
+ Returns:
+ list: List of annotations for the URLs
+ """
annotations = []
for i, url in enumerate(urls["readURLs"], 1):
annotations.append(
@@ -47,7 +67,16 @@ def _generate_mock_annotations(self, urls: dict[str, list]) -> list:
return annotations
def _generate_mock_content(self, query: str, annotations: list) -> str:
- """Generate mock research content with citations."""
+ """
+ Generate mock research content with citations.
+
+ Args:
+ query: Research query
+ annotations: List of annotations for the URLs
+
+ Returns:
+ str: Generated content with citations
+ """
sections = ["Introduction", "Background", "Methodology", "Results", "Discussion", "Conclusion"]
content_parts = []
@@ -75,7 +104,15 @@ def _generate_mock_content(self, query: str, annotations: list) -> str:
return "\n".join(content_parts)
def _generate_mock_response(self, query: str) -> dict[str, Any]:
- """Generate a complete mock response."""
+ """
+ Generate a complete mock response.
+
+ Args:
+ query: Research query
+
+ Returns:
+ dict: Mock response containing choices, URLs, and usage information
+ """
# Generate mock URLs
urls = self._generate_mock_urls()
@@ -97,7 +134,14 @@ def _generate_mock_response(self, query: str) -> dict[str, Any]:
}
def _stream_mock_response(self, response: dict[str, Any]) -> Generator[dict[str, Any]]:
- """Stream a mock response with realistic delays."""
+ """
+ Stream a mock response with realistic delays.
+
+ Args:
+ response: Mock response containing choices, URLs, and usage information
+ Yields:
+ dict: Streamed response with role, content, and annotations
+ """
content = response["choices"][0]["message"]["content"]
annotations = response["choices"][0]["message"]["annotations"]
@@ -132,7 +176,17 @@ def _stream_mock_response(self, response: dict[str, Any]) -> Generator[dict[str,
def process_request(
self, query: str, stream: bool = False, reasoning_effort: str = "medium"
) -> dict[str, Any] | Generator[dict[str, Any]]:
- """Process a mock request with realistic delays."""
+ """
+ Process a mock request with realistic delays.
+
+ Args:
+ query: Research query
+ stream: Whether to stream the response
+ reasoning_effort: Level of reasoning effort ("low", "medium", "high")
+
+ Returns:
+ dict or Generator: Mock response or streamed response
+ """
# Calculate delay based on reasoning effort
effort_multipliers = {"low": 0.7, "medium": 1.0, "high": 1.3}
diff --git a/mxtoai/tools/schedule_tool.py b/mxtoai/tools/schedule_tool.py
index 069633c..74f1dd6 100644
--- a/mxtoai/tools/schedule_tool.py
+++ b/mxtoai/tools/schedule_tool.py
@@ -14,7 +14,9 @@
class EventDetails(BaseModel):
- """Data model for event details extracted by the LLM."""
+ """
+ Data model for event details extracted by the LLM.
+ """
title: str = Field(..., description="The title or summary of the event.")
start_time: datetime = Field(..., description="The start date and time of the event. Must include timezone info.")
@@ -39,7 +41,9 @@ def check_timezone_awareness(cls, v):
# Inherit from smolagents.Tool
class ScheduleTool(Tool):
- """Tool to generate iCalendar (.ics) data and 'Add to Calendar' links."""
+ """
+ Tool to generate iCalendar (.ics) data and 'Add to Calendar' links.
+ """
# Add required attributes for Smol Gents
name = "schedule_generator"
@@ -81,7 +85,15 @@ class ScheduleTool(Tool):
)
def generate_ics_content(self, details: EventDetails) -> str:
- """Generates the content for an .ics calendar file."""
+ """
+ Generates the content for an .ics calendar file.
+
+ Args:
+ details: Event details to include in the .ics file.
+
+ Returns:
+ str: The .ics file content as a string.
+ """
c = Calendar()
e = Event()
@@ -111,7 +123,15 @@ def generate_ics_content(self, details: EventDetails) -> str:
return str(c) + "\\n"
def generate_calendar_links(self, details: EventDetails) -> dict[str, str]:
- """Generates 'Add to Calendar' links for popular services."""
+ """
+ Generates 'Add to Calendar' links for popular services.
+
+ Args:
+ details: Event details to include in the links.
+
+ Returns:
+ dict: Dictionary containing links for Google Calendar and Outlook.
+ """
links = {}
# Ensure start_time is timezone-aware (validator should handle this, but double-check)
@@ -171,6 +191,17 @@ def forward(
Expects datetime strings in ISO 8601 format (or similar parsable format).
LLM should be prompted to provide dates in this format including timezone offset.
e.g., "2024-07-29T14:30:00+01:00" or "2024-07-29T13:30:00Z"
+
+ Args:
+ title: The title or summary of the event.
+ start_time: The start date and time (ISO 8601 format with timezone).
+ end_time: The optional end date and time (ISO 8601 format with timezone).
+ description: A detailed description of the event (optional).
+ location: The location (physical address or virtual meeting link) (optional).
+ attendees: List of attendee email addresses (optional).
+
+ Returns:
+ dict: A dictionary containing the status, ICS content, calendar links, and a message.
"""
logger.info(f"Running {self.name} tool with title: '{title}'") # Added logging
try:
diff --git a/mxtoai/whitelist.py b/mxtoai/whitelist.py
index 570563b..b229b9f 100644
--- a/mxtoai/whitelist.py
+++ b/mxtoai/whitelist.py
@@ -12,7 +12,9 @@
def init_supabase():
- """Initialize Supabase client"""
+ """
+    Initialize Supabase client.
+ """
global supabase
if supabase is None:
try:
@@ -65,5 +67,10 @@ async def is_email_whitelisted(email: str) -> tuple[bool, bool]:
def get_whitelist_signup_url() -> str:
- """Get the URL where users can sign up to be whitelisted"""
+ """
+    Get the URL where users can sign up to be whitelisted.
+
+ Returns:
+ str: The URL for whitelist signup
+ """
return os.getenv("WHITELIST_SIGNUP_URL", "https://mxtoai.com/whitelist-signup")