-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
112 lines (90 loc) · 3.59 KB
/
main.py
File metadata and controls
112 lines (90 loc) · 3.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
"""Email assistant powered by LangGraph and LLMs."""
import sys
import logging
from langgraph.graph import StateGraph, START, END
from config import RUN_CONFIG, setup_logging
from models import AgentState
from email_providers import ZohoEmailProvider
from nodes import (
create_ingest_node,
create_classify_node,
classification_router,
create_classification_handler,
route_after_action
)
# Module-level logger, named after this module so records can be traced back
# to it. setup_logging() is imported from the project's config module and is
# invoked here at import time, i.e. before build_workflow()/main() can run.
# NOTE(review): presumably setup_logging() installs handlers/levels on the
# root logger — confirm against config.setup_logging.
logger = logging.getLogger(__name__)
setup_logging()
def build_workflow():
    """Assemble and compile the email-processing state graph.

    A single ``ZohoEmailProvider`` is created and handed to each node
    factory, so every node works against the same provider instance.

    Graph layout:
        START -> "ingest" -> "classify"
        "classify" -> "handle_classification" or END
            (decided by ``classification_router``)
        "handle_classification" -> "classify" or END
            (decided by ``route_after_action``)

    Returns:
        The result of ``StateGraph.compile()`` for the graph described above.
    """
    provider = ZohoEmailProvider()

    # (node name, factory) pairs; each factory receives the shared provider.
    node_factories = (
        ("ingest", create_ingest_node),
        ("classify", create_classify_node),
        ("handle_classification", create_classification_handler),
    )
    # Build every node callable up front, before the graph itself exists.
    built_nodes = [(name, factory(provider)) for name, factory in node_factories]

    graph = StateGraph(AgentState)
    for name, node in built_nodes:
        graph.add_node(name, node)

    # Fixed, unconditional part of the pipeline.
    graph.add_edge(START, "ingest")
    graph.add_edge("ingest", "classify")

    # After classification: hand off to the handler, or stop.
    after_classify = {
        "handle_classification": "handle_classification",
        END: END,
    }
    graph.add_conditional_edges("classify", classification_router, after_classify)

    # After the handler ran: loop back to classify, or stop.
    after_action = {
        "classify": "classify",
        END: END,
    }
    graph.add_conditional_edges("handle_classification", route_after_action, after_action)

    return graph.compile()
def main():
    """Run the email assistant workflow once and log a summary.

    Logs the active ``RUN_CONFIG`` flags, builds the compiled workflow via
    ``build_workflow()``, invokes it with an empty initial state, and then
    reports the processed/replied counters and any collected errors.

    The summary is a multi-line banner when stdout is a terminal and a
    single compact line (plus one warning per error) otherwise.

    Raises:
        Exception: anything raised while invoking the workflow or while
            logging the summary is logged as an error and re-raised.
    """
    logger.info("Starting email assistant workflow")
    logger.info(f"Configuration: debug={RUN_CONFIG.debug}, dry_run={RUN_CONFIG.dry_run}, "
                f"send_reply={RUN_CONFIG.send_reply}, add_label={RUN_CONFIG.add_label}")

    # The graph is built outside the try block, so construction failures are
    # not reported as "Workflow failed".
    app = build_workflow()

    # Blank starting state handed to the graph.
    initial_state = {
        "emails": [],
        "processed_count": 0,
        "replied_count": 0,
        "current_index": 0,
        "errors": [],
        "current_email": {},
        "classification_result": None
    }

    try:
        final_state = app.invoke(initial_state)

        if not sys.stdout.isatty():
            # Compact output for logs/pipes.
            if final_state['errors']:
                error_msg = f", errors={len(final_state['errors'])}"
            else:
                error_msg = ""
            logger.info(f"Workflow complete: processed={final_state['processed_count']}, "
                        f"replied={final_state['replied_count']}{error_msg}")
            # `or ()` skips the loop entirely when there is nothing to report.
            for error in final_state['errors'] or ():
                logger.warning(f"Error: {error}")
        else:
            # Formatted output for an interactive terminal.
            banner = "=" * 60
            logger.info(banner)
            logger.info("WORKFLOW SUMMARY")
            logger.info(banner)
            logger.info(f"✓ Total emails processed: {final_state['processed_count']}")
            logger.info(f"✓ Article submissions replied: {final_state['replied_count']}")
            reported = final_state['errors']
            if reported:
                logger.warning(f"⚠ Errors encountered: {len(final_state['errors'])}")
                for error in reported:
                    logger.warning(f" - {error}")
            logger.info(banner)
    except Exception as e:
        # Record the failure, then let the caller / interpreter see it too.
        logger.error(f"Workflow failed: {str(e)}")
        raise
# Script entry point: run the workflow only when this file is executed
# directly, not when it is imported as a module.
if __name__ == "__main__":
    main()