Skip to content

Commit d05fdca

Browse files
committed
Enhance embedding error handling and logging in embed_with_config and marvelai_ingest
1 parent dfd9fc3 commit d05fdca

File tree

2 files changed

+58
-17
lines changed

2 files changed

+58
-17
lines changed

rust_ingest/src/embed.rs

Lines changed: 30 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -136,21 +136,46 @@ pub async fn embed(text: &str, max_tokens: usize, client: &Client) -> Result<Vec
136136
pub async fn embed_with_config(
137137
text: &str,
138138
_max_tokens: usize, // Currently unused but kept for API compatibility
139-
client: &Client,
139+
_client: &Client,
140140
config: EmbedConfig,
141141
) -> Result<Vec<f32>> {
142142
validate_input(text)?;
143143

144-
let mut last_error = None;
144+
let mut last_error: Option<anyhow::Error> = None;
145145

146146
for attempt in 1..=config.max_retries {
147-
match attempt_embedding(text, client, &config).await {
147+
// Use a fresh client for each request to avoid connection reuse issues
148+
let embed_client = reqwest::Client::builder()
149+
.timeout(Duration::from_secs(config.timeout_secs))
150+
.build()
151+
.expect("Failed to build reqwest client");
152+
match attempt_embedding(text, &embed_client, &config).await {
148153
Ok(embedding) => {
149154
validate_embedding(&embedding)?;
150155
return Ok(embedding);
151156
}
152157
Err(e) => {
153-
last_error = Some(e);
158+
eprintln!("❌ Embedding request failed: {}", e);
159+
// Print full error chain if available
160+
let mut source = e.source();
161+
while let Some(s) = source {
162+
eprintln!(" [Error source] {}", s);
163+
source = s.source();
164+
}
165+
let error_string = format!("{}", e);
166+
last_error = Some(e.into());
167+
// Log failed chunk metadata for diagnostics and retry
168+
let log_path = "failed_chunks.log";
169+
let log_content = format!(
170+
"Failed embedding: attempt={}, error='{}', text='{}'\n",
171+
attempt, error_string, text.replace('\n', " ")
172+
);
173+
std::fs::OpenOptions::new()
174+
.create(true)
175+
.append(true)
176+
.open(&log_path)
177+
.and_then(|mut f| std::io::Write::write_all(&mut f, log_content.as_bytes()))
178+
.unwrap_or_else(|err| eprintln!(" [Log error] Could not write to failed_chunks.log: {}", err));
154179
if attempt < config.max_retries {
155180
// Exponential backoff: wait 2^attempt seconds
156181
let delay = Duration::from_secs(2_u64.pow(attempt as u32));
@@ -160,7 +185,7 @@ pub async fn embed_with_config(
160185
}
161186
}
162187

163-
Err(last_error.unwrap_or_else(|| anyhow::anyhow!("All embedding attempts failed")))
188+
Err(last_error.map(|e| e).unwrap_or_else(|| anyhow::anyhow!("All embedding attempts failed")))
164189
}
165190

166191
/// Validates input text before processing.

src/ingest/marvelai_ingest.rs

Lines changed: 28 additions & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -174,7 +174,7 @@ async fn main() {
174174
}
175175
let total = arr.len();
176176
let chunk_size = (total as f64 / 5.0).ceil() as usize;
177-
for i in 0..5 {
177+
for i in 0..1 {
178178
let start = i * chunk_size;
179179
let end = ((i + 1) * chunk_size).min(total);
180180
if start >= end {
@@ -283,17 +283,22 @@ async fn main() {
283283

284284
// Modular chunking for long fields before embedding
285285
let max_embed_len = 250;
286-
let mut chunk_success = true;
286+
let chunk_success = true;
287287
for obj in &chunk_json {
288288
for (field, chunk) in chunk_entity_fields(obj, max_embed_len) {
289289
if debug {
290290
println!(" [DEBUG] Field '{}' chunk size: {} chars", field, chunk.chars().count());
291+
println!(" [DEBUG] Chunk content for '{}.{}':\n{}", field, char_name, chunk);
291292
}
292-
let mut last_error = String::new();
293293
let mut success = false;
294+
// Use a fresh client for each request to avoid connection reuse issues
295+
let embed_client = reqwest::Client::builder()
296+
.timeout(std::time::Duration::from_secs(30))
297+
.build()
298+
.expect("Failed to build reqwest client");
294299
for attempt in 1..=max_retries {
295300
println!(" Attempt {}/{}: embedding '{}.{}' ({} chars)", attempt, max_retries, field, char_name, chunk.chars().count());
296-
let result = embed_with_config(&chunk, 100, &client, embed_config.clone()).await;
301+
let result = embed_with_config(&chunk, 100, &embed_client, embed_config.clone()).await;
297302
match result {
298303
Ok(embedding) => {
299304
println!(" ✅ Embedding vectors received successfully (dim: {})", embedding.len());
@@ -302,7 +307,24 @@ async fn main() {
302307
}
303308
Err(e) => {
304309
println!(" ❌ Embedding request failed for '{}.{}': {}", field, char_name, e);
305-
last_error = format!("Embedding request failed: {}", e);
310+
// Print full error chain if available
311+
let mut source = e.source();
312+
while let Some(s) = source {
313+
println!(" [Error source] {}", s);
314+
source = s.source();
315+
}
316+
// Log failed chunk metadata for retry
317+
let log_path = marvel_dir.join("failed_chunks.log");
318+
let log_content = format!(
319+
"Failed embedding: field='{}', char_name='{}', chunk_size={}, attempt={}, error='{}', chunk='{}'\n",
320+
field, char_name, chunk.chars().count(), attempt, e, chunk.replace('\n', " ")
321+
);
322+
fs::OpenOptions::new()
323+
.create(true)
324+
.append(true)
325+
.open(&log_path)
326+
.and_then(|mut f| std::io::Write::write_all(&mut f, log_content.as_bytes()))
327+
.unwrap_or_else(|err| println!(" [Log error] Could not write to failed_chunks.log: {}", err));
306328
}
307329
}
308330
let backoff = retry_delay * attempt;
@@ -312,16 +334,10 @@ async fn main() {
312334
if !success {
313335
println!(" ❌ Failed to generate embeddings for '{}.{}' after {} attempts. Aborting further chunk processing.", field, char_name, max_retries);
314336
failed_count += 1;
315-
// Log failed chunk for later retry
316-
let log_path = marvel_dir.join("failed_chunks.log");
317-
let log_content = format!("{}: {}\n", field, last_error);
318-
fs::write(&log_path, log_content).expect("Failed to write failed_chunks.log");
319337
break;
320338
}
321339
}
322-
if !chunk_success {
323-
break;
324-
}
340+
// ...existing code...
325341
}
326342
if !chunk_success {
327343
break;

0 commit comments

Comments
 (0)