Skip to content

Commit 864a720

Browse files
authored
ConversationBase, Email Memory example updates (#19)
1 parent 2ea32d0 commit 864a720

File tree

6 files changed

+221
-219
lines changed

6 files changed

+221
-219
lines changed

tools/test_email.py

Lines changed: 111 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from colorama import Fore
1111
from pathlib import Path
1212
import argparse
13+
import shelve
1314

1415
try:
1516
import readline # noqa: F401
@@ -19,10 +20,7 @@
1920
import typechat
2021

2122
from typeagent.aitools import utils
22-
from typeagent.knowpro import (
23-
kplib,
24-
searchlang,
25-
)
23+
from typeagent.knowpro import kplib, searchlang, search_query_schema, convknowledge
2624
from typeagent.knowpro.interfaces import IConversation
2725
from typeagent.emails.email_import import import_email_from_file, import_emails_from_dir
2826
from typeagent.emails.email_memory import EmailMemory
@@ -39,22 +37,41 @@ def __init__(
3937
self, base_path: Path, db_name: str, conversation: EmailMemory
4038
) -> None:
4139
self.base_path = base_path
40+
self.db_name = db_name
4241
self.db_path = base_path.joinpath(db_name)
4342
self.conversation = conversation
43+
self.query_translator: (
44+
typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery] | None
45+
) = None
46+
self.index_log = load_index_log(str(self.db_path), create_new=False)
47+
48+
def get_translator(
    self,
) -> "typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery]":
    """Return the search-query translator, creating it on first use.

    The translator is built from the model returned by
    ``convknowledge.create_typechat_model()`` and stored on
    ``self.query_translator`` so that later calls reuse the same object.
    """
    translator = self.query_translator
    if translator is None:
        # Deferred construction: nothing is created until a command asks
        # for the translator.
        model = convknowledge.create_typechat_model()
        translator = utils.create_translator(model, search_query_schema.SearchQuery)
        self.query_translator = translator
    return translator
4455

4556
async def load_conversation(self, db_name: str, create_new: bool = False):
    """Switch this context to the email index stored under ``db_name``.

    Closes the storage provider of the current conversation, then points
    ``db_name`` / ``db_path`` at the new database and loads (or re-creates)
    both the conversation and its index log.

    Args:
        db_name: Database file name, resolved relative to ``self.base_path``.
        create_new: Passed through to ``load_or_create_email_index`` and
            ``load_index_log``; when true both start from scratch.
    """
    await self.conversation.settings.storage_provider.close()
    # Release the current log before opening the next one: on a restart the
    # "new" log is the very same file, and the old handle would otherwise
    # stay open for the rest of the session.
    self.index_log.close()

    self.db_name = db_name
    self.db_path = self.base_path.joinpath(db_name)
    db_path = str(self.db_path)
    self.conversation = await load_or_create_email_index(db_path, create_new)
    self.index_log = load_index_log(db_path, create_new)
5164

5265
# Delete the current conversation and re-create it
5366
async def restart_conversation(self):
    """Delete the current conversation and re-create it under the same name.

    Delegates to ``load_conversation`` with the current ``db_name`` and
    ``create_new=True``.
    """
    await self.load_conversation(self.db_name, create_new=True)
68+
69+
def is_indexed(self, email_id: str | None) -> bool:
70+
return bool(email_id and self.index_log.get(email_id))
71+
72+
def log_indexed(self, email_id: str | None) -> None:
73+
if email_id is not None:
74+
self.index_log[email_id] = True
5875

5976

6077
CommandHandler = Callable[[EmailContext, list[str]], Awaitable[None]]
@@ -69,8 +86,6 @@ def decorator(func: Callable):
6986
return decorator
7087

7188

72-
# Just simple test code
73-
# TODO : Once stable, move creation etc to query.py
7489
async def main():
7590

7691
if sys.argv[1:2]:
@@ -91,10 +106,11 @@ async def main():
91106
print("Email Memory Demo")
92107
print("Type @help for a list of commands")
93108

94-
db_path = str(base_path.joinpath("pyEmails.db"))
109+
default_db = "gmail.db" # "pyEmails.db"
110+
db_path = str(base_path.joinpath(default_db))
95111
context = EmailContext(
96112
base_path,
97-
"pyEmails.db",
113+
default_db,
98114
conversation=await load_or_create_email_index(db_path, create_new=False),
99115
)
100116
print(f"Using email memory at: {db_path}")
@@ -107,7 +123,6 @@ async def main():
107123
"@add_messages": add_messages, # Add messages
108124
"@parse_messages": parse_messages,
109125
"@load_index": load_index,
110-
"@build_index": build_index, # Build index
111126
"@reset_index": reset_index, # Delete index and start over
112127
"@search": search_index, # Search index
113128
"@answer": generate_answer, # Question answer
@@ -162,6 +177,10 @@ def _add_messages_def() -> argparse.ArgumentParser:
162177
default="",
163178
help="Path to an .eml file or to a directory with .eml files",
164179
)
180+
cmd.add_argument("--ignore_error", type=bool, default=True, help="Ignore errors")
181+
cmd.add_argument(
182+
"--knowledge", type=bool, default=True, help="Automatically extract knowledge"
183+
)
165184
return cmd
166185

167186

@@ -174,34 +193,46 @@ async def add_messages(context: EmailContext, args: list[str]):
174193

175194
# Get the path to the email file or directory of emails to ingest
176195
src_path = Path(named_args.path)
177-
emails: list[EmailMessage]
196+
emails: Iterable[EmailMessage]
178197
if src_path.is_file():
179198
emails = [import_email_from_file(str(src_path))]
180199
else:
181200
emails = import_emails_from_dir(str(src_path))
182201

183-
print(Fore.CYAN, f"Importing {len(emails)} emails".capitalize())
184-
print()
185-
186-
conversation = context.conversation
187-
for email in emails:
188-
print_email(email)
189-
print()
190-
# knowledge = email.metadata.get_knowledge()
191-
# print_knowledge(knowledge)
202+
print(Fore.CYAN, f"Importing from {src_path}" + Fore.RESET)
192203

193-
print("Adding email...")
194-
await conversation.add_message(email)
195-
196-
await print_conversation_stats(conversation)
204+
semantic_settings = context.conversation.settings.semantic_ref_index_settings
205+
auto_knowledge = semantic_settings.auto_extract_knowledge
206+
print(Fore.CYAN, f"auto_extract_knowledge={auto_knowledge}" + Fore.RESET)
207+
try:
208+
conversation = context.conversation
209+
# Add one at a time for debugging etc.
210+
for i, email in enumerate(emails):
211+
email_id = email.metadata.id
212+
email_src = email.src_url if email.src_url is not None else ""
213+
print_progress(i + 1, None, email.src_url)
214+
print()
215+
if context.is_indexed(email_id):
216+
print(Fore.GREEN + email_src + "[Already indexed]" + Fore.RESET)
217+
continue
197218

219+
try:
220+
await conversation.add_messages_with_indexing([email])
221+
context.log_indexed(email_id)
222+
except Exception as e:
223+
if named_args.ignore_error:
224+
print_error(f"{email.src_url}\n{e}")
225+
print(
226+
Fore.GREEN
227+
+ f"ignore_error = {named_args.ignore_error}"
228+
+ Fore.RESET
229+
)
230+
else:
231+
raise
232+
finally:
233+
semantic_settings.auto_extract_knowledge = auto_knowledge
198234

199-
async def build_index(context: EmailContext, args: list[str]):
200-
conversation = context.conversation
201-
print(Fore.GREEN, "Building index")
202235
await print_conversation_stats(conversation)
203-
await conversation.build_index()
204-
print(Fore.GREEN + "Built index.")
205236

206237

207238
async def search_index(context: EmailContext, args: list[str]):
@@ -215,8 +246,10 @@ async def search_index(context: EmailContext, args: list[str]):
215246
print(Fore.CYAN, f"Searching for:\n{search_text} ")
216247

217248
debug_context = searchlang.LanguageSearchDebugContext()
218-
results = await context.conversation.search_with_language(
219-
search_text=search_text, debug_context=debug_context
249+
results = await context.conversation.query_debug(
250+
search_text=search_text,
251+
query_translator=context.get_translator(),
252+
debug_context=debug_context,
220253
)
221254
await print_search_results(context.conversation, debug_context, results)
222255

@@ -230,13 +263,9 @@ async def generate_answer(context: EmailContext, args: list[str]):
230263
return
231264

232265
print(Fore.CYAN, f"Getting answer for:\n{question} ")
233-
result = await context.conversation.get_answer_with_language(question=question)
234-
if isinstance(result, typechat.Failure):
235-
print_error(result.message)
236-
return
237266

238-
all_answers, _ = result.value
239-
utils.pretty_print(all_answers)
267+
answer = await context.conversation.query(question)
268+
print(Fore.GREEN + answer)
240269

241270

242271
async def reset_index(context: EmailContext, args: list[str]):
@@ -326,7 +355,6 @@ def help(handlers: dict[str, CommandHandler], args: list[str]):
326355
#
327356
async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMemory:
328357
if create_new:
329-
print(f"Deleting {db_path}")
330358
delete_sqlite_db(db_path)
331359

332360
settings = ConversationSettings()
@@ -336,7 +364,16 @@ async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMem
336364
db_path,
337365
EmailMessage,
338366
)
339-
return await EmailMemory.create(settings)
367+
email_memory = await EmailMemory.create(settings)
368+
return email_memory
369+
370+
371+
def load_index_log(db_path: str, create_new: bool) -> shelve.Shelf[Any]:
    """Open the shelve-backed log of indexed emails that accompanies a database.

    The log lives next to the database, at ``db_path + ".index_log"``.

    Args:
        db_path: Path of the email database the log belongs to.
        create_new: When true the returned log is empty, discarding any
            entries stored previously; otherwise existing entries are kept
            (and the log is created if it does not exist yet).

    Returns:
        The open shelf; the caller is responsible for closing it.
    """
    log_path = db_path + ".index_log"
    # "n" always starts from a new, empty database, which replaces opening
    # the old one and deleting its entries one by one; "c" is shelve's
    # default (open for read/write, creating the file when missing).
    return shelve.open(log_path, flag="n" if create_new else "c")
340377

341378

342379
def delete_sqlite_db(db_path: str):
@@ -398,29 +435,6 @@ def print_knowledge(knowledge: kplib.KnowledgeResponse):
398435
print(Fore.RESET)
399436

400437

401-
def print_list(
402-
color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
403-
):
404-
print(color)
405-
if title:
406-
print(f"# {title}\n")
407-
if type == "plain":
408-
for item in list:
409-
print(item)
410-
elif type == "ul":
411-
for item in list:
412-
print(f"- {item}")
413-
elif type == "ol":
414-
for i, item in enumerate(list):
415-
print(f"{i + 1}. {item}")
416-
print(Fore.RESET)
417-
418-
419-
def print_error(msg: str):
420-
print(Fore.RED + msg)
421-
print(Fore.RESET)
422-
423-
424438
async def print_conversation_stats(conversation: IConversation):
425439
print(f"Conversation index stats".upper())
426440
print(f"Message count: {await conversation.messages.size()}")
@@ -447,6 +461,37 @@ async def print_search_results(
447461
print(Fore.RESET)
448462

449463

464+
def print_list(
    color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
):
    """Print the items of ``list`` one per line, optionally under a title.

    Args:
        color: Value printed on its own line before anything else
            (presumably a colorama ``Fore`` code -- confirm against callers).
        list: Items to print.
        title: Heading printed as ``# <title>`` followed by a blank line;
            skipped when empty.
        type: ``"plain"`` prints each item as is, ``"ul"`` prefixes ``- ``,
            ``"ol"`` prefixes a 1-based number. Any other value prints no
            items.
    """
    # NOTE: the parameters `list` and `type` shadow the builtins; they are
    # part of the signature, so the body avoids the builtins of those names.
    print(color)
    if title:
        print(f"# {title}\n")

    if type == "plain":
        for item in list:
            print(item)
    elif type == "ul":
        for item in list:
            print(f"- {item}")
    elif type == "ol":
        for number, item in enumerate(list, start=1):
            print(f"{number}. {item}")

    print(Fore.RESET)
480+
481+
482+
def print_error(msg: str):
    """Print ``msg`` wrapped in ``Fore.RED`` / ``Fore.RESET`` on a single line."""
    print(f"{Fore.RED}{msg}{Fore.RESET}")
484+
485+
486+
def print_progress(cur: int, total: int | None = None, suffix: str | None = "") -> None:
487+
if suffix is None:
488+
suffix = ""
489+
if total is not None:
490+
print(f"[{cur} / {total}] {suffix}\r", end="", flush=True)
491+
else:
492+
print(f"[{cur}] {suffix}\r", end="", flush=True)
493+
494+
450495
if __name__ == "__main__":
451496
try:
452497
asyncio.run(main())

typeagent/emails/email_import.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,10 @@
1414

1515
def import_emails_from_dir(
    dir_path: str, max_chunk_length: int = 4096
) -> Iterable[EmailMessage]:
    """Lazily import every file found directly inside ``dir_path``.

    Emails are yielded one at a time as the directory is walked, so the
    caller can start processing before the whole directory has been read.
    Sub-directories are skipped.

    Args:
        dir_path: Directory containing the email files.
        max_chunk_length: Forwarded unchanged to ``import_email_from_file``.

    Yields:
        The ``EmailMessage`` produced for each file.
    """
    for entry in Path(dir_path).iterdir():
        if not entry.is_file():
            continue
        yield import_email_from_file(str(entry.resolve()), max_chunk_length)
2521

2622

2723
# Imports an email file (.eml) as a list of EmailMessage objects
@@ -32,7 +28,9 @@ def import_email_from_file(
3228
with open(file_path, "r") as f:
3329
email_string = f.read()
3430

35-
return import_email_string(email_string, max_chunk_length)
31+
email = import_email_string(email_string, max_chunk_length)
32+
email.src_url = file_path
33+
return email
3634

3735

3836
# Imports a single email MIME string and returns an EmailMessage object
@@ -65,6 +63,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
6563
cc=_import_address_headers(msg.get_all("Cc", [])),
6664
bcc=_import_address_headers(msg.get_all("Bcc", [])),
6765
subject=msg.get("Subject"),
66+
id=msg.get("Message-ID", None),
6867
)
6968
timestamp: str | None = None
7069
timestamp_date = msg.get("Date", None)

0 commit comments

Comments
 (0)