Skip to content

Commit c26a40d

Browse files
committed
Email import for gmail.. improve
1 parent 88daf87 commit c26a40d

File tree

3 files changed

+94
-57
lines changed

3 files changed

+94
-57
lines changed

tools/test_email.py

Lines changed: 86 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from colorama import Fore
1111
from pathlib import Path
1212
import argparse
13+
import shelve
1314

1415
try:
1516
import readline # noqa: F401
@@ -36,11 +37,13 @@ def __init__(
3637
self, base_path: Path, db_name: str, conversation: EmailMemory
3738
) -> None:
3839
self.base_path = base_path
40+
self.db_name = db_name
3941
self.db_path = base_path.joinpath(db_name)
4042
self.conversation = conversation
4143
self.query_translator: (
4244
typechat.TypeChatJsonTranslator[search_query_schema.SearchQuery] | None
4345
) = None
46+
self.index_log = load_index_log(str(self.db_path), create_new=False)
4447

4548
def get_translator(self):
4649
if self.query_translator is None:
@@ -52,17 +55,23 @@ def get_translator(self):
5255

5356
async def load_conversation(self, db_name: str, create_new: bool = False):
5457
await self.conversation.settings.storage_provider.close()
58+
self.db_name = db_name
5559
self.db_path = self.base_path.joinpath(db_name)
5660
self.conversation = await load_or_create_email_index(
5761
str(self.db_path), create_new
5862
)
63+
self.index_log = load_index_log(str(self.db_path), create_new)
5964

6065
# Delete the current conversation and re-create it
6166
async def restart_conversation(self):
62-
await self.conversation.settings.storage_provider.close()
63-
self.conversation = await load_or_create_email_index(
64-
str(self.db_path), create_new=True
65-
)
67+
await self.load_conversation(self.db_name, create_new=True)
68+
69+
def is_indexed(self, email_id: str | None) -> bool:
70+
return bool(email_id and self.index_log.get(email_id))
71+
72+
def log_indexed(self, email_id: str | None) -> None:
73+
if email_id is not None:
74+
self.index_log[email_id] = True
6675

6776

6877
CommandHandler = Callable[[EmailContext, list[str]], Awaitable[None]]
@@ -77,8 +86,6 @@ def decorator(func: Callable):
7786
return decorator
7887

7988

80-
# Just simple test code
81-
# TODO : Once stable, move creation etc to query.py
8289
async def main():
8390

8491
if sys.argv[1:2]:
@@ -168,6 +175,10 @@ def _add_messages_def() -> argparse.ArgumentParser:
168175
default="",
169176
help="Path to an .eml file or to a directory with .eml files",
170177
)
178+
cmd.add_argument("--ignore_error", type=bool, default=True, help="Ignore errors")
179+
cmd.add_argument(
180+
"--knowledge", type=bool, default=True, help="Automatically extract knowledge"
181+
)
171182
return cmd
172183

173184

@@ -180,38 +191,47 @@ async def add_messages(context: EmailContext, args: list[str]):
180191

181192
# Get the path to the email file or directory of emails to ingest
182193
src_path = Path(named_args.path)
183-
emails: list[EmailMessage]
194+
emails: Iterable[EmailMessage]
184195
if src_path.is_file():
185196
emails = [import_email_from_file(str(src_path))]
186197
else:
187198
emails = import_emails_from_dir(str(src_path))
188199

189-
print(Fore.CYAN, f"Importing {len(emails)} emails".capitalize())
190-
print(Fore.RESET)
200+
print(Fore.CYAN, f"Importing from {src_path}" + Fore.RESET)
191201

192-
conversation = context.conversation
193-
for email in emails:
194-
# print_email(email)
195-
# print()
196-
# knowledge = email.metadata.get_knowledge()
197-
# print_knowledge(knowledge)
202+
semantic_settings = context.conversation.settings.semantic_ref_index_settings
203+
auto_knowledge = semantic_settings.auto_extract_knowledge
204+
try:
205+
conversation = context.conversation
206+
# Add one at a time for debugging etc.
207+
for i, email in enumerate(emails):
208+
email_id = email.metadata.id
209+
email_src = email.src_url if email.src_url is not None else ""
210+
print_progress(i + 1, None, email.src_url)
211+
print()
212+
if context.is_indexed(email_id):
213+
print(Fore.GREEN + email_src + "[Already indexed]" + Fore.RESET)
214+
continue
198215

199-
print(f"From: {email.metadata.sender}\nTo:{email.metadata.recipients}")
200-
# await conversation.add_message(email)
201-
await conversation.add_messages_with_indexing([email])
202-
print("Success")
216+
try:
217+
await conversation.add_messages_with_indexing([email])
218+
context.log_indexed(email_id)
219+
except Exception as e:
220+
if named_args.ignore_error:
221+
print_error(f"{email.src_url}\n{e}")
222+
print(
223+
Fore.GREEN
224+
+ f"ignore_error = {named_args.ignore_error}"
225+
+ Fore.RESET
226+
)
227+
else:
228+
raise
229+
finally:
230+
semantic_settings.auto_extract_knowledge = auto_knowledge
203231

204232
await print_conversation_stats(conversation)
205233

206234

207-
# async def build_index(context: EmailContext, args: list[str]):
208-
# conversation = context.conversation
209-
# print(Fore.GREEN, "Building index")
210-
# await print_conversation_stats(conversation)
211-
# await conversation.build_index()
212-
# print(Fore.GREEN + "Built index.")
213-
214-
215235
async def search_index(context: EmailContext, args: list[str]):
216236
if len(args) == 0:
217237
return
@@ -345,6 +365,14 @@ async def load_or_create_email_index(db_path: str, create_new: bool) -> EmailMem
345365
return email_memory
346366

347367

368+
def load_index_log(db_path: str, create_new: bool) -> shelve.Shelf[Any]:
369+
log_path = db_path + ".index_log"
370+
index_log = shelve.open(log_path)
371+
if create_new:
372+
index_log.clear()
373+
return index_log
374+
375+
348376
def delete_sqlite_db(db_path: str):
349377
if os.path.exists(db_path):
350378
os.remove(db_path) # Delete existing database for clean test
@@ -404,29 +432,6 @@ def print_knowledge(knowledge: kplib.KnowledgeResponse):
404432
print(Fore.RESET)
405433

406434

407-
def print_list(
408-
color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
409-
):
410-
print(color)
411-
if title:
412-
print(f"# {title}\n")
413-
if type == "plain":
414-
for item in list:
415-
print(item)
416-
elif type == "ul":
417-
for item in list:
418-
print(f"- {item}")
419-
elif type == "ol":
420-
for i, item in enumerate(list):
421-
print(f"{i + 1}. {item}")
422-
print(Fore.RESET)
423-
424-
425-
def print_error(msg: str):
426-
print(Fore.RED + msg)
427-
print(Fore.RESET)
428-
429-
430435
async def print_conversation_stats(conversation: IConversation):
431436
print(f"Conversation index stats".upper())
432437
print(f"Message count: {await conversation.messages.size()}")
@@ -453,6 +458,37 @@ async def print_search_results(
453458
print(Fore.RESET)
454459

455460

461+
def print_list(
462+
color, list: Iterable[Any], title: str, type: Literal["plain", "ol", "ul"] = "plain"
463+
):
464+
print(color)
465+
if title:
466+
print(f"# {title}\n")
467+
if type == "plain":
468+
for item in list:
469+
print(item)
470+
elif type == "ul":
471+
for item in list:
472+
print(f"- {item}")
473+
elif type == "ol":
474+
for i, item in enumerate(list):
475+
print(f"{i + 1}. {item}")
476+
print(Fore.RESET)
477+
478+
479+
def print_error(msg: str):
480+
print(Fore.RED + msg + Fore.RESET)
481+
482+
483+
def print_progress(cur: int, total: int | None = None, suffix: str | None = "") -> None:
484+
if suffix is None:
485+
suffix = ""
486+
if total is not None:
487+
print(f"[{cur} / {total}] {suffix}\r", end="", flush=True)
488+
else:
489+
print(f"[{cur}] {suffix}\r", end="", flush=True)
490+
491+
456492
if __name__ == "__main__":
457493
try:
458494
asyncio.run(main())

typeagent/emails/email_import.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,10 @@
1414

1515
def import_emails_from_dir(
1616
dir_path: str, max_chunk_length: int = 4096
17-
) -> list[EmailMessage]:
18-
messages: list[EmailMessage] = []
17+
) -> Iterable[EmailMessage]:
1918
for file_path in Path(dir_path).iterdir():
2019
if file_path.is_file():
21-
messages.append(
22-
import_email_from_file(str(file_path.resolve()), max_chunk_length)
23-
)
24-
return messages
20+
yield import_email_from_file(str(file_path.resolve()), max_chunk_length)
2521

2622

2723
# Imports an email file (.eml) as a list of EmailMessage objects
@@ -32,7 +28,9 @@ def import_email_from_file(
3228
with open(file_path, "r") as f:
3329
email_string = f.read()
3430

35-
return import_email_string(email_string, max_chunk_length)
31+
email = import_email_string(email_string, max_chunk_length)
32+
email.src_url = file_path
33+
return email
3634

3735

3836
# Imports a single email MIME string and returns an EmailMessage object
@@ -65,6 +63,7 @@ def import_email_message(msg: Message, max_chunk_length: int) -> EmailMessage:
6563
cc=_import_address_headers(msg.get_all("Cc", [])),
6664
bcc=_import_address_headers(msg.get_all("Bcc", [])),
6765
subject=msg.get("Subject"),
66+
id=msg.get("Message-ID", None),
6867
)
6968
timestamp: str | None = None
7069
timestamp_date = msg.get("Date", None)

typeagent/emails/email_message.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ class EmailMessageMeta(IKnowledgeSource, IMessageMetadata):
2828
cc: list[str] = Field(default_factory=list)
2929
bcc: list[str] = Field(default_factory=list)
3030
subject: str | None = None
31+
id: str | None = None
3132

3233
@property
3334
def source(self) -> str | None: # type: ignore[reportIncompatibleVariableOverride]
@@ -155,6 +156,7 @@ def __init__(self, **data: Any) -> None:
155156
"Tags associated with the message", default_factory=list
156157
)
157158
timestamp: str | None = None # Use metadata.sent_on for the actual sent time
159+
src_url: str | None = None # Source file or uri for this email
158160

159161
def get_knowledge(self) -> kplib.KnowledgeResponse:
160162
return self.metadata.get_knowledge()

0 commit comments

Comments
 (0)