-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathllm_client.py
More file actions
296 lines (240 loc) · 10.5 KB
/
llm_client.py
File metadata and controls
296 lines (240 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
from structured_output import ParsedEmailList,EmailRequest
from pathlib import Path
import litellm
from litellm import completion
from oci.signer import Signer
from oci.config import from_file
from typing import Optional
import json
import os
import re
class LLMClient:
    """Parse raw e-mails into structured data with an OCI-hosted LLM.

    The client loads an OCI configuration profile, builds a request signer
    from it and sends prompts through ``litellm.completion``. The remaining
    methods are text helpers: prompt loading, MIME base64 stripping, length
    truncation and extraction of JSON from the model's answer.
    """

    # When an e-mail has to be truncated, this many lines counted from the
    # first blank/boundary line are always kept together with the headers.
    _HEADER_CONTEXT_LINES = 5

    # A base64 transfer-encoding header followed by everything up to (not
    # including) the next line that starts with "--".
    _BASE64_PART_RE = re.compile(
        r'(Content-Transfer-Encoding:\s*base64\s*\n)([\s\S]*?)(?=\n--)'
    )

    def __init__(self, OciPath: Path):
        """Load the OCI profile and build the request signer.

        Args:
            OciPath: Location of the OCI configuration file. In addition to
                the signing keys (``tenancy``, ``user``, ``fingerprint``,
                ``key_file``), the DEFAULT profile must define ``model`` and
                ``oci_compartment_id``; ``region`` is read later by
                ``complete``.
        """
        config_path = str(OciPath)
        # Kept for backward compatibility with anything that reads these
        # variables from the process environment.
        os.environ["OCI_CONFIG_FILE"] = config_path
        os.environ["OCI_CLI_PROFILE"] = "DEFAULT"
        # Pass the location explicitly so the requested file is the one that
        # is loaded, independent of how the SDK resolves its default path.
        self.config = from_file(file_location=config_path, profile_name="DEFAULT")
        self.Signer = Signer(
            tenancy=self.config["tenancy"],
            user=self.config["user"],
            fingerprint=self.config["fingerprint"],
            private_key_file_location=self.config["key_file"],
            pass_phrase=self.config.get("pass_phrase", None),
        )
        # Single quotes inside the replacement field: reusing the enclosing
        # double quote is a SyntaxError on Python versions before 3.12.
        self.model = f"oci/{self.config['model']}"
        self.oci_compartment_id = self.config["oci_compartment_id"]
        print(f"Model: {self.model}")
        print(f"OCI Compartment ID: {self.oci_compartment_id}")

    def _clean_json_response(self, content: str) -> str:
        """Extract the first JSON object or array from a model answer.

        A leading/trailing markdown code fence is removed first. The text is
        then scanned from the first ``{`` or ``[`` to its matching closer.
        Brackets that appear inside JSON string literals are ignored, so a
        value such as ``"}"`` does not end the scan early.

        Args:
            content: Raw text returned by the model.

        Returns:
            The balanced JSON fragment, or the fence-stripped text when no
            opening bracket exists or the brackets never balance.
        """
        content = content.strip()
        # Remove markdown code fences.
        if content.startswith("```json"):
            content = content[7:]
        elif content.startswith("```"):
            content = content[3:]
        if content.endswith("```"):
            content = content[:-3]
        content = content.strip()

        candidates = [
            pos for pos in (content.find("{"), content.find("[")) if pos != -1
        ]
        if not candidates:
            return content
        start = min(candidates)
        opener = content[start]
        closer = "}" if opener == "{" else "]"

        depth = 0
        in_string = False
        escaped = False
        for index in range(start, len(content)):
            char = content[index]
            if in_string:
                # Inside a string only the closing quote matters; a backslash
                # protects whatever character follows it.
                if escaped:
                    escaped = False
                elif char == "\\":
                    escaped = True
                elif char == '"':
                    in_string = False
                continue
            if char == '"':
                in_string = True
            elif char == opener:
                depth += 1
            elif char == closer:
                depth -= 1
                if depth == 0:
                    return content[start:index + 1].strip()
        # Unbalanced input: hand back everything and let the JSON parser
        # report the problem.
        return content.strip()

    def fetch_prompt(self, prompt_path: Path, section: Optional[str] = None, **kwargs) -> str:
        """Read a prompt file, optionally narrow it to one section, and fill it in.

        Args:
            prompt_path: Path of the UTF-8 encoded markdown prompt file.
            section: Header title of the section to return; the whole file is
                used when omitted.
            **kwargs: Values substituted with ``str.format``. When any are
                given, literal braces in the template must be doubled.

        Returns:
            The prompt text after section extraction and substitution.

        Raises:
            ValueError: If ``section`` is given but not present in the file.
        """
        content = Path(prompt_path).read_text(encoding="utf-8")
        # Extract the requested section, if any.
        if section:
            content = self._extract_section(content, section)
        # Inject variables.
        if kwargs:
            content = content.format(**kwargs)
        return content

    def _extract_section(self, content: str, section_name: str) -> str:
        """Return the body of the markdown section titled ``section_name``.

        Headers of any level are recognised and titles are compared
        case-insensitively. The section ends at the next header of the same
        or a higher level; deeper sub-headers are part of the body. Lines
        inside fenced code blocks are never interpreted as headers.

        Args:
            content: Full markdown text.
            section_name: Header title to look for, without the ``#`` prefix.

        Returns:
            The section body without its header line, stripped of surrounding
            whitespace. An existing but empty section yields ``""``.

        Raises:
            ValueError: If no header with that title exists.
        """
        target = section_name.lower()
        collected = []
        found = False
        in_section = False
        in_fence = False
        section_level = None

        for line in content.split("\n"):
            stripped = line.strip()
            if stripped.startswith(("```", "~~~")):
                # Fence delimiters toggle code-block state and are kept as
                # ordinary content.
                in_fence = not in_fence
            elif not in_fence and stripped.startswith("#"):
                title = stripped.lstrip("#")
                header_level = len(stripped) - len(title)
                if title.strip().lower() == target:
                    found = True
                    in_section = True
                    section_level = header_level
                    continue  # the header line itself is not part of the body
                if in_section and header_level <= section_level:
                    break
            if in_section:
                collected.append(line)

        if not found:
            raise ValueError(f"Section '{section_name}' not found in markdown file")
        return "\n".join(collected).strip()

    def strip_base64_attachments(self, email_raw):
        """Replace base64-encoded MIME payloads with a short placeholder.

        Everything between a ``Content-Transfer-Encoding: base64`` header and
        the next line starting with ``--`` is replaced by a note stating how
        many lines and characters were dropped. A payload that is not
        followed by such a line is left untouched.

        Args:
            email_raw: Raw e-mail text.

        Returns:
            The e-mail text with base64 payloads replaced.
        """
        def replace_base64(match):
            header = match.group(1)
            # Report how much base64 data was dropped.
            base64_lines = match.group(2).strip().split('\n')
            removed_size = sum(len(line) for line in base64_lines)
            return f"{header}\n[BASE64 REMOVED - {len(base64_lines)} lines, ~{removed_size} bytes]\n"

        return self._BASE64_PART_RE.sub(replace_base64, email_raw)

    def strip_base64_simple(self, email_raw):
        """Drop base64 payloads line by line, without a regular expression.

        After a line containing ``Content-Transfer-Encoding: base64`` every
        following line is discarded until one starts with ``--``. A marker is
        inserted where the payload began, and the number of discarded lines
        is reported just before the boundary line.

        Args:
            email_raw: Raw e-mail text.

        Returns:
            The e-mail text with base64 payloads removed.
        """
        kept = []
        skipping = False
        removed_lines = 0
        for line in email_raw.split('\n'):
            # Start skipping right after the base64 encoding header.
            if 'Content-Transfer-Encoding: base64' in line:
                kept.append(line)
                kept.append('[BASE64 DATA REMOVED - see next boundary]')
                skipping = True
                removed_lines = 0
            elif skipping and line.startswith('--'):
                # The next boundary ends the payload.
                skipping = False
                kept.append(f'[{removed_lines} lines removed]')
                kept.append(line)
            elif skipping:
                removed_lines += 1
            else:
                kept.append(line)
        return '\n'.join(kept)

    def clean_email_for_llm(self, email_raw, max_chars=8000):
        """Strip base64 payloads and shorten the e-mail to roughly ``max_chars``.

        If the stripped text is still too long, the lines up to the first
        blank or ``--`` line plus a few lines of context are kept as the
        header, followed by as many whole body lines as fit in the remaining
        budget and a truncation marker. The marker itself is not counted
        against ``max_chars``.

        Args:
            email_raw: Raw e-mail text.
            max_chars: Character budget for the header and body together.

        Returns:
            The cleaned and, if necessary, truncated e-mail text.
        """
        # Step 1: remove base64 payloads.
        cleaned = self.strip_base64_simple(email_raw)
        if len(cleaned) <= max_chars:
            return cleaned

        # Step 2: still too large, so truncate.
        lines = cleaned.split('\n')
        # Find where the headers end (first blank or boundary line).
        header_end = next(
            (
                index
                for index, line in enumerate(lines)
                if line.strip() == '' or line.startswith('--')
            ),
            0,
        )
        split_at = header_end + self._HEADER_CONTEXT_LINES
        # Cap the header too, otherwise an oversized header block would
        # defeat the limit entirely.
        header = '\n'.join(lines[:split_at])[:max(max_chars, 0)]

        # Fill the remaining budget with whole body lines.
        remaining_chars = max_chars - len(header)
        body_lines = []
        used = 0
        for line in lines[split_at:]:
            if used + len(line) > remaining_chars:
                break
            body_lines.append(line)
            used += len(line) + 1  # account for the newline added below
        body = ''.join(f'{line}\n' for line in body_lines)
        return header + '\n' + body + '\n[...TRUNCATED FOR LENGTH...]'

    def complete(
        self,
        email: "EmailRequest",
        max_retries=3,
        prompt_path: Path = Path("prompt.md"),
        max_email_chars: int = 2000,
    ) -> Optional[dict]:
        """Parse an e-mail with the LLM and validate the structured result.

        The prompt is assembled from the ``MAIL PARSER`` and ``Pydantic``
        sections of the prompt file. The model's answer is reduced to its
        JSON part and validated with ``ParsedEmailList.model_validate``. When
        the model answered but the answer could not be parsed or validated,
        the rejected answer and the error are added to the conversation
        before the next attempt. When the request itself failed, it is simply
        retried unchanged.

        Args:
            email: Request object whose ``email_body`` holds the raw e-mail.
            max_retries: Maximum number of attempts.
            prompt_path: Markdown file containing the prompt sections.
            max_email_chars: Character budget passed to
                ``clean_email_for_llm``.

        Returns:
            A dict with the keys ``"emails"`` (the validated ``emails`` field)
            and ``"EMAIL_BODY"`` (the cleaned e-mail text that was sent), or
            ``None`` if every attempt failed.
        """
        mail = self.clean_email_for_llm(email.email_body, max_email_chars)
        print(mail)

        # Build the prompt: instructions plus the JSON schema to follow.
        prompt = self.fetch_prompt(prompt_path, "MAIL PARSER", email=mail)
        pydantic_portion = self.fetch_prompt(
            prompt_path,
            "Pydantic",
            format=json.dumps(ParsedEmailList.model_json_schema(), indent=2),
        )
        messages = [{"role": "user", "content": prompt + "\n\n" + pydantic_portion}]

        for attempt in range(1, max_retries + 1):
            print(f"Attempt {attempt}/{max_retries}")
            raw_content = None
            try:
                response = completion(
                    model=self.model,
                    messages=messages,
                    oci_signer=self.Signer,
                    oci_region=self.config["region"],
                    oci_compartment_id=self.oci_compartment_id,
                )
                raw_content = response["choices"][0]["message"]["content"]
                json_data = json.loads(self._clean_json_response(raw_content))
                result = ParsedEmailList.model_validate(json_data)
            except Exception as e:
                # Broad on purpose: transport, JSON and validation errors are
                # all handled by trying again.
                print(f"Error on attempt {attempt}: {e}")
                # Feedback only helps when there is an answer to correct and
                # another attempt to use it in.
                if attempt < max_retries and isinstance(raw_content, str):
                    messages.append({"role": "assistant", "content": raw_content})
                    messages.append({
                        "role": "user",
                        "content": f"Previous attempt failed with error: {str(e)}. Please provide valid JSON matching the schema."
                    })
                continue

            print(f"Successfully parsed email on attempt {attempt}")
            return {"emails": result.emails, "EMAIL_BODY": mail}

        print(f"Failed to parse email after {max_retries} attempts")
        return None