Skip to content

Commit 2580af4

Browse files
committed
Enhance JSON ingestion: enforce file type validation, split chunks into sub-chunks, and improve error handling and logging for embedding failures
1 parent d05fdca commit 2580af4

File tree

2 files changed: 116 additions, 53 deletions

rust_ingest/src/ingest.rs

Lines changed: 61 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -354,18 +354,71 @@ async fn process_single_file_for_embedding(
354354
config: &IngestConfig,
355355
client: &reqwest::Client,
356356
) -> Result<Vec<f32>> {
357-
// Read and truncate file content
357+
// Only process .json or .jsonl files
358+
let ext = path.extension().and_then(|e| e.to_str()).unwrap_or("");
359+
if ext != "json" && ext != "jsonl" {
360+
return Err(anyhow::anyhow!("Only .json and .jsonl files are supported for ingestion: {}", path.display()));
361+
}
362+
358363
let content = std::fs::read_to_string(path)
359364
.with_context(|| format!("Failed to read file: {}", path.display()))?;
360365

361-
let truncated_content = truncate_content(&content, config.max_chars);
362-
363-
// Generate embedding vector
364-
let embedding = embed::embed(truncated_content, config.max_tokens, client)
365-
.await
366-
.with_context(|| format!("Failed to generate embedding for file: {}", path.display()))?;
366+
// Parse as JSON array (or linewise for JSONL)
367+
let objects: Vec<serde_json::Value> = if ext == "jsonl" {
368+
content
369+
.lines()
370+
.filter_map(|line| serde_json::from_str(line).ok())
371+
.collect()
372+
} else {
373+
match serde_json::from_str(&content) {
374+
Ok(serde_json::Value::Array(arr)) => arr,
375+
Ok(obj) => vec![obj],
376+
Err(e) => return Err(anyhow::anyhow!("Failed to parse JSON: {}", e)),
377+
}
378+
};
367379

368-
Ok(embedding)
380+
// For each object, iterate fields and chunk
381+
let mut indexed_embeddings = Vec::new();
382+
let mut total_sub_chunks = 0;
383+
for (obj_idx, obj) in objects.iter().enumerate() {
384+
if let Some(map) = obj.as_object() {
385+
for (field, value) in map.iter() {
386+
let field_str = match value {
387+
serde_json::Value::String(s) => s.clone(),
388+
_ => value.to_string(),
389+
};
390+
// Split into sub-chunks ≤200 chars
391+
let mut start = 0;
392+
let chunk_len = field_str.chars().count();
393+
let sub_chunk_size = 200;
394+
let mut field_sub_chunks = 0;
395+
while start < chunk_len {
396+
let end = (start + sub_chunk_size).min(chunk_len);
397+
let sub_chunk: String = field_str.chars().skip(start).take(end - start).collect();
398+
// Embed each sub-chunk
399+
let embedding = embed::embed(&sub_chunk, config.max_tokens, client)
400+
.await
401+
.with_context(|| format!("Failed to embed field '{}', chunk {}-{} in file: {}", field, start, end, path.display()))?;
402+
// Store embedding with metadata
403+
indexed_embeddings.push((obj_idx, field.clone(), start, end, embedding));
404+
start = end;
405+
field_sub_chunks += 1;
406+
}
407+
if field_sub_chunks > 0 {
408+
println!("[DEBUG] Field '{}' produced {} sub-chunks for file: {}", field, field_sub_chunks, path.display());
409+
}
410+
total_sub_chunks += field_sub_chunks;
411+
}
412+
}
413+
}
414+
println!("[DEBUG] Total sub-chunks embedded for file {}: {}", path.display(), total_sub_chunks);
415+
println!("[DEBUG] Total embeddings indexed: {}", indexed_embeddings.len());
416+
// Instead of returning a single embedding, return an error if none found
417+
if indexed_embeddings.is_empty() {
418+
return Err(anyhow::anyhow!("No embeddings generated for file: {}", path.display()));
419+
}
420+
// For compatibility, return the first embedding (could be refactored to index all)
421+
Ok(indexed_embeddings[0].4.clone())
369422
}
370423

371424
/// Processes a single file and adds it to the index.

src/ingest/marvelai_ingest.rs

Lines changed: 55 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -286,55 +286,65 @@ async fn main() {
286286
let chunk_success = true;
287287
for obj in &chunk_json {
288288
for (field, chunk) in chunk_entity_fields(obj, max_embed_len) {
289-
if debug {
290-
println!(" [DEBUG] Field '{}' chunk size: {} chars", field, chunk.chars().count());
291-
println!(" [DEBUG] Chunk content for '{}.{}':\n{}", field, char_name, chunk);
289+
// Further split any chunk >200 chars into sub-chunks ≤200 chars
290+
let mut sub_chunks = Vec::new();
291+
let mut start = 0;
292+
let chunk_len = chunk.chars().count();
293+
let sub_chunk_size = 200;
294+
while start < chunk_len {
295+
let end = (start + sub_chunk_size).min(chunk_len);
296+
let sub_chunk: String = chunk.chars().skip(start).take(end - start).collect();
297+
sub_chunks.push(sub_chunk);
298+
start = end;
292299
}
293-
let mut success = false;
294-
// Use a fresh client for each request to avoid connection reuse issues
295-
let embed_client = reqwest::Client::builder()
296-
.timeout(std::time::Duration::from_secs(30))
297-
.build()
298-
.expect("Failed to build reqwest client");
299-
for attempt in 1..=max_retries {
300-
println!(" Attempt {}/{}: embedding '{}.{}' ({} chars)", attempt, max_retries, field, char_name, chunk.chars().count());
301-
let result = embed_with_config(&chunk, 100, &embed_client, embed_config.clone()).await;
302-
match result {
303-
Ok(embedding) => {
304-
println!(" ✅ Embedding vectors received successfully (dim: {})", embedding.len());
305-
success = true;
306-
break;
307-
}
308-
Err(e) => {
309-
println!(" ❌ Embedding request failed for '{}.{}': {}", field, char_name, e);
310-
// Print full error chain if available
311-
let mut source = e.source();
312-
while let Some(s) = source {
313-
println!(" [Error source] {}", s);
314-
source = s.source();
300+
for (sub_idx, sub_chunk) in sub_chunks.iter().enumerate() {
301+
if debug {
302+
println!(" [DEBUG] Field '{}' sub-chunk {} size: {} chars", field, sub_idx + 1, sub_chunk.chars().count());
303+
println!(" [DEBUG] Sub-chunk content for '{}.{}[{}]':\n{}", field, char_name, sub_idx + 1, sub_chunk);
304+
}
305+
let mut success = false;
306+
let embed_client = reqwest::Client::builder()
307+
.timeout(std::time::Duration::from_secs(120))
308+
.build()
309+
.expect("Failed to build reqwest client");
310+
for attempt in 1..=max_retries {
311+
println!(" Attempt {}/{}: embedding '{}.{}[{}]' ({} chars)", attempt, max_retries, field, char_name, sub_idx + 1, sub_chunk.chars().count());
312+
let result = embed_with_config(sub_chunk, 100, &embed_client, embed_config.clone()).await;
313+
match result {
314+
Ok(embedding) => {
315+
println!(" ✅ Embedding vectors received successfully (dim: {})", embedding.len());
316+
success = true;
317+
break;
318+
}
319+
Err(e) => {
320+
println!(" ❌ Embedding request failed for '{}.{}[{}]': {}", field, char_name, sub_idx + 1, e);
321+
let mut source = e.source();
322+
while let Some(s) = source {
323+
println!(" [Error source] {}", s);
324+
source = s.source();
325+
}
326+
let log_path = marvel_dir.join("failed_chunks.log");
327+
let log_content = format!(
328+
"Failed embedding: field='{}', char_name='{}', sub_chunk_idx={}, chunk_size={}, attempt={}, error='{}', chunk='{}'\n",
329+
field, char_name, sub_idx + 1, sub_chunk.chars().count(), attempt, e, sub_chunk.replace('\n', " ")
330+
);
331+
fs::OpenOptions::new()
332+
.create(true)
333+
.append(true)
334+
.open(&log_path)
335+
.and_then(|mut f| std::io::Write::write_all(&mut f, log_content.as_bytes()))
336+
.unwrap_or_else(|err| println!(" [Log error] Could not write to failed_chunks.log: {}", err));
315337
}
316-
// Log failed chunk metadata for retry
317-
let log_path = marvel_dir.join("failed_chunks.log");
318-
let log_content = format!(
319-
"Failed embedding: field='{}', char_name='{}', chunk_size={}, attempt={}, error='{}', chunk='{}'\n",
320-
field, char_name, chunk.chars().count(), attempt, e, chunk.replace('\n', " ")
321-
);
322-
fs::OpenOptions::new()
323-
.create(true)
324-
.append(true)
325-
.open(&log_path)
326-
.and_then(|mut f| std::io::Write::write_all(&mut f, log_content.as_bytes()))
327-
.unwrap_or_else(|err| println!(" [Log error] Could not write to failed_chunks.log: {}", err));
328338
}
339+
let backoff = retry_delay * attempt;
340+
println!(" ⏳ Waiting {}s before next attempt...", backoff);
341+
tokio::time::sleep(std::time::Duration::from_secs(backoff as u64)).await;
342+
}
343+
if !success {
344+
println!(" ❌ Failed to generate embeddings for '{}.{}[{}]' after {} attempts. Aborting further chunk processing.", field, char_name, sub_idx + 1, max_retries);
345+
failed_count += 1;
346+
break;
329347
}
330-
let backoff = retry_delay * attempt;
331-
println!(" ⏳ Waiting {}s before next attempt...", backoff);
332-
tokio::time::sleep(std::time::Duration::from_secs(backoff as u64)).await;
333-
}
334-
if !success {
335-
println!(" ❌ Failed to generate embeddings for '{}.{}' after {} attempts. Aborting further chunk processing.", field, char_name, max_retries);
336-
failed_count += 1;
337-
break;
338348
}
339349
}
340350
// ...existing code...

0 commit comments

Comments (0)