Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion mxtoai/_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,18 @@ def get_logger(source: str) -> Any:
def span(
msg_template: str, name: str | None = None, tags: Sequence[str] | None = None, **msg_template_kwargs: Any
) -> Any:
"""Context manager for creating spans in logging."""
"""
Context manager for creating spans in logging.

Args:
msg_template (str): The message template for the span.
name (str | None): Optional name for the span.
tags (Sequence[str] | None): Optional tags for the span.
**msg_template_kwargs: Additional keyword arguments for the message template.

Yields:
Any: The span context manager or a dummy context manager.
"""
# Check if LOGFIRE_TOKEN environment variable is defined
if os.getenv("LOGFIRE_TOKEN"):
if tags:
Expand Down
119 changes: 98 additions & 21 deletions mxtoai/agents/email_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,12 @@ def __init__(
logger.info("Email agent initialized successfully")

def _init_agent(self):
"""Initialize the ToolCallingAgent with Azure OpenAI."""
# Initialize the model with routing capabilities
self.routed_model = RoutedLiteLLMModel() # Store as instance variable to update handle later
"""
Initialize the ToolCallingAgent with Azure OpenAI.
"""
# Initialize the routed model with the default model group
self.routed_model = RoutedLiteLLMModel()

# Initialize the agent
self.agent = ToolCallingAgent(
model=self.routed_model,
tools=self.available_tools,
Expand All @@ -138,7 +139,12 @@ def _init_agent(self):
logger.debug("Agent initialized with routed model configuration")

def _initialize_search_tools(self) -> SearchWithFallbackTool:
"""Initializes and configures the search tools, returning the SearchWithFallbackTool."""
"""
Initializes and configures the search tools, returning the SearchWithFallbackTool.

Returns:
SearchWithFallbackTool: The configured search tool with Bing and DuckDuckGo as primary engines and Google as fallback.
"""
bing_search_tool = WebSearchTool(engine="bing", max_results=5)
logger.debug("Initialized WebSearchTool with Bing engine.")

Expand Down Expand Up @@ -167,6 +173,25 @@ def _initialize_search_tools(self) -> SearchWithFallbackTool:
logger.info(f"Initialized SearchWithFallbackTool. Primary engines: {primary_names}, Fallback: {fallback_name}")
return search_tool

def _get_required_actions(self, mode: str) -> List[str]:
"""
Get list of required actions based on mode.

Args:
mode: The mode of operation (e.g., "summary", "reply", "research", "full")

Returns:
List[str]: List of actions to be performed by the agent
"""
actions = []
if mode in ["summary", "full"]:
actions.append("Generate summary")
if mode in ["reply", "full"]:
actions.append("Generate reply")
if mode in ["research", "full"]:
actions.append("Conduct research")
return actions

def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]:
"""
Initialize Google search tool with either SerpAPI or Serper provider.
Expand Down Expand Up @@ -195,7 +220,15 @@ def _initialize_google_search_tool(self) -> Optional[GoogleSearchTool]:
return None

def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional[DeepResearchTool]:
"""Initializes the DeepResearchTool if API key is available."""
"""
Initializes the DeepResearchTool if API key is available.

Args:
enable_deep_research: Flag to enable deep research functionality

Returns:
Optional[DeepResearchTool]: Initialized DeepResearchTool instance or None if API key is not found
"""
research_tool: Optional[DeepResearchTool] = None
if os.getenv("JINA_API_KEY"):
research_tool = DeepResearchTool()
Expand All @@ -210,7 +243,18 @@ def _initialize_deep_research_tool(self, enable_deep_research: bool) -> Optional
return research_tool

def _create_task(self, email_request: EmailRequest, email_instructions: ProcessingInstructions) -> str:
"""Create a task description for the agent based on email handle instructions."""
"""
Create a task description for the agent based on email handle instructions.

Args:
email_request: EmailRequest instance containing email data
email_instructions: EmailHandleInstructions object containing processing configuration

Returns:
str: The task description for the agent
"""

# process attachments if specified
attachments = self._format_attachments(email_request.attachments) \
if email_instructions.process_attachments and email_request.attachments else []

Expand All @@ -224,14 +268,31 @@ def _create_task(self, email_request: EmailRequest, email_instructions: Processi
)

def _format_attachments(self, attachments: List[EmailAttachment]) -> List[str]:
"""Format attachment details for inclusion in the task."""
"""
Format attachment details for inclusion in the task.

Args:
attachments: List of EmailAttachment objects

Returns:
List[str]: Formatted attachment details
"""
return [
f"- {att.filename} (Type: {att.contentType}, Size: {att.size} bytes)\n"
f' EXACT FILE PATH: "{att.path}"'
for att in attachments
]
def _create_email_context(self, email_request: EmailRequest, attachment_details=None) -> str:
"""Generate context information from the email request."""
"""
Generate context information from the email request.

Args:
email_request: EmailRequest instance containing email data
attachment_details: List of formatted attachment details

Returns:
str: The context information for the agent
"""
recipients = ", ".join(email_request.recipients) if email_request.recipients else "N/A"
attachments_info = f"Available Attachments:\n{chr(10).join(attachment_details)}" if attachment_details else "No attachments provided."

Expand All @@ -248,7 +309,15 @@ def _create_email_context(self, email_request: EmailRequest, attachment_details=
"""

def _create_attachment_task(self, attachment_details: List[str]) -> str:
"""Return instructions for processing attachments, if any."""
"""
Return instructions for processing attachments, if any.

Args:
attachment_details: List of formatted attachment details

Returns:
str: Instructions for processing attachments
"""
return f"Process these attachments:\n{chr(10).join(attachment_details)}" if attachment_details else ""

def _create_task_template(
Expand All @@ -260,7 +329,22 @@ def _create_task_template(
deep_research_mandatory: bool = False,
output_template: str = "",
) -> str:
"""Combine all task components into the final task description."""
"""
Combine all task components into the final task description.

Args:
handle: The email handle being processed.
email_context: The context information extracted from the email.
handle_specific_template: Any specific template for the handle.
attachment_task: Instructions for processing attachments.
deep_research_mandatory: Flag indicating if deep research is mandatory.
output_template: The output template to use.

Returns:
str: The complete task description for the agent.
"""

# Merge the task components into a single string by listing the sections
sections = [
f"Process this email according to the '{handle}' instruction type.\n",
email_context,
Expand Down Expand Up @@ -330,7 +414,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
)
tool_name = None # Reset tool_name if extraction failed

# Revised Output Extraction
action_out = getattr(step, "action_output", None)
obs_out = getattr(step, "observations", None)

Expand Down Expand Up @@ -422,7 +505,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
logger.debug(f"[Memory Step {i+1}] Matched tool: deep_research")
try:
if isinstance(tool_output, dict):
# Store the primary findings content
research_findings_content = tool_output.get("findings", "")
# Store metadata separately
research_metadata = {
Expand Down Expand Up @@ -533,7 +615,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic

# --- Format the selected content ---
if email_body_content:
# Remove signature remnants before formatting
signature_markers = [
"Best regards,\nMXtoAI Assistant",
"Best regards,",
Expand All @@ -552,7 +633,6 @@ def _process_agent_result(self, final_answer_obj: Any, agent_steps: List) -> Dic
).strip()
logger.debug("Removed potential signature remnants from email body content.")

# Format using ReportFormatter
result["email_content"]["text"] = self.report_formatter.format_report(
email_body_content, format_type="text", include_signature=True
)
Expand Down Expand Up @@ -637,13 +717,10 @@ def process_email(

"""
try:
# Update the model's current handle
# create task
self.routed_model.current_handle = email_instructions

# Create task with specific instructions
task = self._create_task(email_request, email_instructions)

# Run the agent
try:
logger.info("Starting agent execution...")
final_answer_obj = self.agent.run(task)
Expand All @@ -670,7 +747,7 @@ def process_email(
if not processed_result.get("email_content") or not processed_result["email_content"].get("text"):
msg = "No reply text was generated by _process_agent_result"
logger.error(msg)
# Populate errors within the existing structure if possible

if "metadata" not in processed_result:
processed_result["metadata"] = {}
if "errors" not in processed_result["metadata"]:
Expand All @@ -680,7 +757,7 @@ def process_email(
processed_result["metadata"]["email_sent"] = {}
processed_result["metadata"]["email_sent"]["status"] = "error"
processed_result["metadata"]["email_sent"]["error"] = msg
# Return the partially processed result with error flags

return processed_result

logger.info(f"Email processed successfully with handle: {email_instructions.handle}")
Expand Down
89 changes: 81 additions & 8 deletions mxtoai/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,16 @@


# Function to cleanup attachment files and directory
def cleanup_attachments(directory_path):
"""Delete attachment directory and all its contents"""
def cleanup_attachments(directory_path: str) -> bool:
"""
Delete attachment directory and all its contents

Args:
directory_path (str): Path to the directory to be deleted

Returns:
bool: True if deletion was successful, False otherwise
"""
try:
if os.path.exists(directory_path):
shutil.rmtree(directory_path)
Expand All @@ -55,7 +63,17 @@ def cleanup_attachments(directory_path):
def create_success_response(
summary: str, email_response: dict[str, Any], attachment_info: list[dict[str, Any]]
) -> Response:
"""Create a success response with summary and email details"""
"""
Create a success response with summary and email details

Args:
summary (str): Summary of the email processing
email_response (dict): Response from the email sending service
attachment_info (list): List of processed attachments

Returns:
Response: FastAPI Response object with JSON content
"""
return Response(
content=json.dumps(
{
Expand All @@ -72,7 +90,17 @@ def create_success_response(


def create_error_response(summary: str, attachment_info: list[dict[str, Any]], error: str) -> Response:
"""Create an error response with summary and error details"""
"""
Create an error response with summary and error details

Args:
summary (str): Summary of the email processing
attachment_info (list): List of processed attachments
error (str): Error message

Returns:
Response: FastAPI Response object with JSON content
"""
return Response(
content=json.dumps(
{
Expand All @@ -92,7 +120,17 @@ def create_error_response(summary: str, attachment_info: list[dict[str, Any]], e
async def handle_file_attachments(
attachments: list[EmailAttachment], email_id: str, email_data: EmailRequest
) -> tuple[str, list[dict[str, Any]]]:
"""Process uploaded files and save them as attachments"""
"""
Process uploaded files and save them as attachments

Args:
attachments (list[EmailAttachment]): List of EmailAttachment objects
email_id (str): Unique identifier for the email
email_data (EmailRequest): EmailRequest object containing email details

Returns:
tuple[str, list[dict[str, Any]]]: Tuple containing the directory path and list of processed attachments
"""
email_attachments_dir = ""
attachment_info = []

Expand Down Expand Up @@ -202,7 +240,16 @@ async def handle_file_attachments(

# Helper function to send email reply using SES
async def send_agent_email_reply(email_data: EmailRequest, processing_result: dict[str, Any]) -> dict[str, Any]:
"""Send email reply using SES and return the response details"""
"""
Send email reply using SES and return the response details

Args:
email_data (EmailRequest): EmailRequest object containing email details
processing_result (dict): Result of the email processing

Returns:
dict: Response details including status and message ID
"""
if not processing_result or "email_content" not in processing_result:
logger.error("Invalid processing result format")
return {"status": "error", "error": "Invalid processing result format", "timestamp": datetime.now().isoformat()}
Expand Down Expand Up @@ -283,7 +330,15 @@ async def send_agent_email_reply(email_data: EmailRequest, processing_result: di

# Helper function to create sanitized response
def sanitize_processing_result(processing_result: dict[str, Any]) -> dict[str, Any]:
"""Create a clean response suitable for API return and database storage"""
"""
Create a clean response suitable for API return and database storage

Args:
processing_result (dict): Result of the email processing

Returns:
dict: Sanitized response with metadata, research, and attachment info
"""
if not isinstance(processing_result, dict):
return {"error": "Invalid processing result format", "timestamp": datetime.now().isoformat()}

Expand Down Expand Up @@ -329,7 +384,25 @@ async def process_email(
files: Annotated[list[UploadFile] | None, File()] = None,
api_key: str = Depends(api_auth_scheme),
):
"""Process an incoming email with attachments, analyze content, and send reply"""
"""
Process an incoming email with attachments, analyze content, and send reply

Args:
from_email (str): Sender's email address
to (str): Recipient's email address
subject (str): Subject of the email
textContent (str): Plain text content of the email
htmlContent (str): HTML content of the email
messageId (str): Unique identifier for the email message
date (str): Date when the email was sent
emailId (str): Unique identifier for the email in the system
rawHeaders (str): Raw headers of the email in JSON format
files (list[UploadFile] | None): List of uploaded files as attachments
api_key (str): API key for authentication

Returns:
Response: FastAPI Response object with JSON content
"""
# Validate API key
if response := await validate_api_key(api_key):
return response
Expand Down
1 change: 1 addition & 0 deletions mxtoai/dependencies.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from mxtoai.email_handles import DEFAULT_EMAIL_HANDLES
from mxtoai.instruction_resolver import ProcessingInstructionsResolver

# global resolver for processing instructions
processing_instructions_resolver = ProcessingInstructionsResolver(DEFAULT_EMAIL_HANDLES)
Loading
Loading