Skip to content

Commit eff7458

Browse files
umeshmagvanrossum-ms
authored andcommitted
knowPro.py: Email experiments (#1693)
First cut of knowPro.py experimental example showing how to index emails. Ongoing work on sample: * Email Memory of EmailMessage objects * Knowledge extraction * Ingestion and Indexing * Sample Email shredders and knowlege extraction * Test script More to come. --------- Co-authored-by: gvanrossum-ms <gvanrossum@microsoft.com>
1 parent 666cbb3 commit eff7458

File tree

6 files changed

+861
-1
lines changed

6 files changed

+861
-1
lines changed

tools/test_email.py

Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
import os
5+
import shlex
6+
import asyncio
7+
import sys
8+
import traceback
9+
from typing import Any, Iterable
10+
from colorama import Fore
11+
from pathlib import Path
12+
13+
import typechat
14+
15+
from typeagent.aitools import utils
16+
from typeagent.knowpro import (
17+
kplib,
18+
searchlang,
19+
)
20+
from typeagent.knowpro.interfaces import (
21+
IConversation
22+
)
23+
from typeagent.emails.email_import import (
24+
import_email_from_file,
25+
import_emails_from_dir
26+
)
27+
from typeagent.emails.email_memory import EmailMemory
28+
from typeagent.emails.email_message import EmailMessage
29+
30+
from typeagent.knowpro.convsettings import ConversationSettings
31+
from typeagent.storage.utils import create_storage_provider
32+
33+
from utool import print_result
34+
35+
class EmailContext:
36+
def __init__(self, db_path: str, conversation: EmailMemory) -> None:
37+
self.db_path = db_path
38+
self.conversation = conversation
39+
40+
async def reset(self):
41+
await self.conversation.settings.conversation_settings.storage_provider.close()
42+
self.conversation = await load_or_create_email_index(self.db_path, create_new=True)
43+
44+
# Just simple test code
45+
# TODO : Once stable, move creation etc to utool.py
46+
async def main():
47+
48+
base_path = Path("/data/testChat/knowpro/email/")
49+
base_path.mkdir(parents=True, exist_ok=True)
50+
utils.load_dotenv()
51+
52+
print("Email Memory Demo")
53+
print("Type @help for a list of commands")
54+
55+
db_path = str(base_path.joinpath("pyEmails.db"))
56+
context = EmailContext(
57+
db_path=db_path,
58+
conversation=await load_or_create_email_index(db_path, create_new=False)
59+
)
60+
print(f"Using email memory at: {db_path}")
61+
await print_conversation_stats(context.conversation)
62+
63+
# Command handlers
64+
cmd_handlers = {
65+
"@add_messages": add_messages, # Add messages
66+
"@build_index": build_index, # Build index
67+
"@reset_index": reset_index, # Delete index and start over
68+
"@search": search_index, # Search index
69+
"@answer": generate_answer # Question answer
70+
}
71+
while True:
72+
line = input("✉>>").strip()
73+
if len(line) == 0:
74+
continue
75+
elif line == "exit":
76+
break
77+
args = shlex.split(line)
78+
if len(args) < 1:
79+
continue
80+
try:
81+
cmd = args[0].lower()
82+
if cmd == "@help":
83+
print_commands(cmd_handlers.keys())
84+
else:
85+
cmd_handler = cmd_handlers.get(cmd)
86+
if cmd_handler:
87+
args.pop(0)
88+
await cmd_handler(context, args)
89+
else:
90+
print_commands(cmd_handlers.keys())
91+
except Exception as e:
92+
print()
93+
print(Fore.RED, f"Error\n: {e}")
94+
traceback.print_exc()
95+
96+
print(Fore.RESET)
97+
98+
# ==
99+
# COMMANDS
100+
# ==
101+
102+
# Adds messages. Takes a path either to a file or to a directory
103+
async def add_messages(context: EmailContext, args: list[str]):
104+
if len(args) < 1:
105+
print_error("No path provided")
106+
return
107+
108+
# Get the path to the email file or directory of emails to ingest
109+
src_path = Path(args[0])
110+
emails: list[EmailMessage]
111+
if src_path.is_file():
112+
emails = [import_email_from_file(str(src_path))]
113+
else:
114+
emails = import_emails_from_dir(str(src_path))
115+
116+
print(Fore.CYAN, f"Importing {len(emails)} emails".capitalize())
117+
print()
118+
119+
conversation = context.conversation
120+
for email in emails:
121+
print_email(email)
122+
print()
123+
# knowledge = email.metadata.get_knowledge()
124+
# print_knowledge(knowledge)
125+
126+
print("Adding email...")
127+
await conversation.add_message(email)
128+
129+
await print_conversation_stats(conversation)
130+
131+
async def build_index(context: EmailContext, args: list[str]):
132+
conversation = context.conversation
133+
print(Fore.GREEN, "Building index")
134+
await print_conversation_stats(conversation)
135+
await conversation.build_index()
136+
print(Fore.GREEN + "Built index.")
137+
138+
async def search_index(context:EmailContext, args: list[str]):
139+
if len(args) == 0:
140+
return
141+
search_text = args[0].strip()
142+
if len(search_text) == 0:
143+
print_error("No search text")
144+
return
145+
146+
print(Fore.CYAN, f"Searching for:\n{search_text} ")
147+
148+
debug_context = searchlang.LanguageSearchDebugContext()
149+
results = await context.conversation.search_with_language(
150+
search_text=search_text,
151+
debug_context=debug_context
152+
)
153+
await print_search_results(context.conversation, debug_context, results)
154+
155+
async def generate_answer(context: EmailContext, args:list[str]):
156+
if len(args) == 0:
157+
return
158+
question = args[0].strip()
159+
if len(question) == 0:
160+
print_error("No question")
161+
return
162+
163+
print(Fore.CYAN, f"Getting answer for:\n{question} ")
164+
result = await context.conversation.get_answer_with_language(
165+
question=question
166+
)
167+
if isinstance(result, typechat.Failure):
168+
print_error(result.message)
169+
return
170+
171+
all_answers, _ = result.value
172+
utils.pretty_print(all_answers)
173+
174+
async def reset_index(context: EmailContext, args: list[str]):
175+
print(f"Deleting {context.db_path}")
176+
await context.reset()
177+
await print_conversation_stats(context.conversation)
178+
#
179+
# Utilities
180+
#
181+
async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMemory:
182+
if create_new:
183+
print(f"Deleting {db_path}")
184+
delete_sqlite_db(db_path)
185+
186+
settings = ConversationSettings()
187+
settings.storage_provider = await create_storage_provider(
188+
settings.message_text_index_settings,
189+
settings.related_term_index_settings,
190+
db_path,
191+
EmailMessage
192+
)
193+
return await EmailMemory.create(settings)
194+
195+
def delete_sqlite_db(db_path: str):
196+
if os.path.exists(db_path):
197+
os.remove(db_path) # Delete existing database for clean test
198+
# Also delete -shm and -wal files if they exist
199+
shm_path = db_path + "-shm"
200+
wal_path = db_path + "-wal"
201+
if os.path.exists(shm_path):
202+
os.remove(shm_path)
203+
if os.path.exists(wal_path):
204+
os.remove(wal_path)
205+
206+
207+
#
208+
# Printing
209+
#
210+
211+
def print_email(email: EmailMessage):
212+
print("From:", email.metadata.sender)
213+
print("To:", ", ".join(email.metadata.recipients))
214+
if email.metadata.cc:
215+
print("Cc:", ", ".join(email.metadata.cc))
216+
if email.metadata.bcc:
217+
print("Bcc:", ", ".join(email.metadata.bcc))
218+
if email.metadata.subject:
219+
print("Subject:", email.metadata.subject)
220+
print("Date:", email.timestamp)
221+
222+
print("Body:")
223+
for chunk in email.text_chunks:
224+
print(Fore.CYAN + chunk)
225+
226+
print(Fore.RESET)
227+
228+
def print_knowledge(knowledge: kplib.KnowledgeResponse):
229+
print_list(Fore.GREEN, knowledge.topics, "Topics")
230+
print()
231+
print_list(Fore.GREEN, knowledge.entities, "Entities")
232+
print()
233+
print_list(Fore.GREEN, knowledge.actions, "Actions")
234+
print()
235+
print(Fore.RESET)
236+
237+
def print_commands(names: Iterable[str]):
238+
print_list(Fore.GREEN, sorted(names), "Commands")
239+
240+
def print_list(color, list: Iterable[Any], title: str):
241+
if title:
242+
print(color + f"# {title}")
243+
print()
244+
for item in list:
245+
print(color + " -", item)
246+
247+
def print_error(msg: str):
248+
print(Fore.RED + msg)
249+
print(Fore.RESET)
250+
251+
async def print_conversation_stats(conversation: IConversation):
252+
print(f"Conversation index stats".upper())
253+
print(f"Message count: {await conversation.messages.size()}")
254+
print(f"Semantic Ref count: {await conversation.semantic_refs.size()}")
255+
256+
async def print_search_results(
257+
conversation: IConversation,
258+
debug_context: searchlang.LanguageSearchDebugContext,
259+
results: typechat.Result[list[searchlang.ConversationSearchResult]]
260+
):
261+
print(Fore.CYAN)
262+
utils.pretty_print(debug_context.search_query)
263+
utils.pretty_print(debug_context.search_query_expr)
264+
if isinstance(results, typechat.Failure):
265+
print_error(results.message)
266+
else:
267+
print(Fore.GREEN, "### SEARCH RESULTS")
268+
print()
269+
search_results = results.value
270+
for search_result in search_results:
271+
print(Fore.GREEN, search_result.raw_query_text)
272+
await print_result(search_result, conversation)
273+
print(Fore.RESET)
274+
275+
if __name__ == "__main__":
276+
try:
277+
asyncio.run(main())
278+
except (KeyboardInterrupt, BrokenPipeError):
279+
print()
280+
sys.exit(1)

typeagent/emails/emailVerbs.json

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
[
2+
{
3+
"term": "send",
4+
"relatedTerms": [
5+
"talk",
6+
"talked",
7+
"talking",
8+
"claimed",
9+
"claim",
10+
"discuss",
11+
"discussed",
12+
"discussing",
13+
"declare",
14+
"declared",
15+
"declaring",
16+
"express",
17+
"expressed",
18+
"expressing",
19+
"mention",
20+
"mentioned",
21+
"mentioning",
22+
"observe",
23+
"say",
24+
"said",
25+
"speak",
26+
"spoke",
27+
"speaking",
28+
"state",
29+
"stated",
30+
"stating",
31+
"thought",
32+
"thoughts",
33+
"think",
34+
"utter",
35+
"uttered",
36+
"uttering",
37+
"articulate",
38+
"articulated",
39+
"articulating",
40+
"announce",
41+
"announced"
42+
]
43+
},
44+
{ "term": "receive", "relatedTerms": ["got"] }
45+
]

0 commit comments

Comments
 (0)