Commit eb53868

⚡ Bolt: Fix N+1 database insertions in BatchProcessor (#248)
Replaced individual per-file SQLite `INSERT` statements with a single batched `executemany` block at the end of the `_process_sequential` and `_process_parallel` functions. Added periodic chunked saves to prevent data loss on crash.

Co-authored-by: google-labs-jules[bot] <161369871+google-labs-jules[bot]@users.noreply.github.com>
Co-authored-by: thebearwithabite <216692431+thebearwithabite@users.noreply.github.com>
1 parent 602181b commit eb53868

2 files changed: 58 additions & 4 deletions


.jules/bolt.md

Lines changed: 8 additions & 0 deletions
```diff
@@ -33,3 +33,11 @@
 ## 2025-05-27 - [Bulk SQLite Inserts and Connection Reuse for Tagging]
 **Learning:** Sequential `.execute` calls for `INSERT OR REPLACE` inside nested loops over large arrays (like tags), coupled with opening independent DB connections per method, create a severe N+1 problem. Benchmarks showed that replacing this with a single shared connection and `executemany` arrays resulted in an ~2x speedup on typical batch tagging workloads.
 **Action:** Always batch related SQL records using `.executemany()` and pass an optional `db_connection` downstream to nested operations instead of establishing a new database connection every time.
+
+## 2025-05-15 - Batched DB Inserts
+**Learning:** Sequential processing loops that insert database records one at a time cause N+1 query bottlenecks and extremely poor disk I/O performance on large batches.
+**Action:** Replace per-record `commit()` calls inside sequential processing loops with a single `executemany` and one batched commit once the entire result set is gathered.
+
+## 2025-05-15 - Batched DB Inserts vs Crash Recovery
+**Learning:** Fully deferring database saves to the end of a long-running batch job with `executemany` solves the N+1 bottleneck, but introduces a risk of data loss if the process crashes midway.
+**Action:** Use periodic chunked batching (e.g., running `executemany` every 50 records) inside loops to balance disk I/O performance with incremental crash resilience.
```
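The 2025-05-27 entry describes a pattern this commit's diff does not show directly: reusing one connection across nested operations. As a rough illustration only (the `save_tags` function, `tags` table, and `tag_rows` argument are hypothetical names, not code from this repository):

```python
import sqlite3
from typing import Iterable, Optional

def save_tags(db_path: str, tag_rows: Iterable[tuple],
              db_connection: Optional[sqlite3.Connection] = None) -> None:
    """Insert many (file_id, tag) rows in one batch, reusing the caller's
    connection when one is supplied instead of opening a fresh one."""
    conn = db_connection or sqlite3.connect(db_path)
    try:
        # One executemany call replaces N individual execute() round trips.
        conn.executemany(
            "INSERT OR REPLACE INTO tags (file_id, tag) VALUES (?, ?)",
            list(tag_rows),
        )
        conn.commit()
    finally:
        # Close only connections this function opened itself.
        if db_connection is None:
            conn.close()
```

The two 2025-05-15 entries are implemented directly in `batch_processor.py` below.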

batch_processor.py

Lines changed: 50 additions & 4 deletions
```diff
@@ -330,15 +330,49 @@ def _process_sequential(self, job: BatchJob) -> List[FileResult]:
 
             self.current_progress.questions_asked += result.questions_asked
 
-            # Save result to database
-            self._save_file_result(job.job_id, result)
+            # Result saving is now deferred to batch
 
+            # Batch save periodically to prevent data loss on crash
+            if i > 0 and i % 50 == 0:
+                self._save_file_results_batch(job.job_id, results[-50:])
+
             # Show progress periodically
             if i % 10 == 0 or i == len(job.files) - 1:
                 self._show_progress()
 
+        # Save remaining results that weren't caught in the periodic batch saves
+        remainder = len(results) % 50
+        if results and remainder > 0:
+            self._save_file_results_batch(job.job_id, results[-remainder:])
+        elif results and len(results) < 50:
+            self._save_file_results_batch(job.job_id, results)
+
         return results
 
+    def _save_file_results_batch(self, job_id: str, results: List[FileResult]):
+        """Save multiple file processing results to database efficiently"""
+        if not results:
+            return
+
+        with sqlite3.connect(self.db_path) as conn:
+            params = [
+                (
+                    job_id, str(r.file_path), r.success, r.action_taken,
+                    r.error_message, r.classification, r.confidence,
+                    r.processing_time, r.questions_asked, datetime.now().isoformat()
+                )
+                for r in results
+            ]
+            conn.executemany("""
+                INSERT INTO file_results
+                (job_id, file_path, success, action_taken, error_message,
+                 classification, confidence, processing_time, questions_asked, processed_at)
+                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            """, params)
+            conn.commit()
+
     def _process_parallel(self, job: BatchJob, max_workers: int) -> List[FileResult]:
         """Process files in parallel (for automatic modes only)"""
         results = []
```
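As a worked example of the remainder logic: with 132 files, the sequential loop flushes at i = 50 and i = 100, covering the first 100 results (assuming each iteration's result is appended to `results` after the periodic check runs for its index, which the hunk does not show), and the final flush saves the last 132 % 50 = 32. When the total is an exact multiple of 50, `remainder` is 0 and the periodic flushes have already covered everything.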
```diff
@@ -374,9 +408,13 @@ def _process_parallel(self, job: BatchJob, max_workers: int) -> List[FileResult]
                 if result.action_taken == "skipped":
                     self.current_progress.skipped += 1
 
-                # Save result
-                self._save_file_result(job.job_id, result)
+                # Result saving is now deferred to batch
 
+                # Batch save periodically to prevent data loss on crash
+                if self.current_progress.processed % 50 == 0:
+                    # Save the last 50 results
+                    self._save_file_results_batch(job.job_id, results[-50:])
+
                 # Update progress display
                 if self.current_progress.processed % 10 == 0:
                     self._show_progress()
```
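In the parallel path the flush keys off the shared `processed` counter rather than a loop index, since results arrive in completion order. A minimal sketch of the surrounding structure, which the hunk omits (`run_parallel`, `process_one`, and `flush` are hypothetical stand-ins, not this file's API):

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_parallel(files, process_one, flush, max_workers=4, chunk=50):
    """Collect results as futures complete, flushing every `chunk` results."""
    results, processed = [], 0
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_one, f) for f in files]
        for future in as_completed(futures):
            results.append(future.result())
            processed += 1
            if processed % chunk == 0:
                flush(results[-chunk:])  # exactly the newest unsaved chunk
    remainder = len(results) % chunk
    if remainder:
        flush(results[-remainder:])  # whatever the periodic flushes missed
    return results
```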
```diff
@@ -396,6 +434,14 @@ def _process_parallel(self, job: BatchJob, max_workers: int) -> List[FileResult]
                     self.current_progress.processed += 1
                     self.current_progress.failed += 1
 
+        # Save remaining results that weren't caught in the periodic batch saves
+        remainder = len(results) % 50
+        if results and remainder > 0:
+            self._save_file_results_batch(job.job_id, results[-remainder:])
+        elif results and len(results) < 50:
+            self._save_file_results_batch(job.job_id, results)
+
         return results
 
     def _process_single_file(self, file_path: Path, job: BatchJob) -> FileResult:
```
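The ~2x speedup claimed in `.jules/bolt.md` is easy to sanity-check. A rough, hypothetical micro-benchmark (not part of this commit; the table is a simplified stand-in for `file_results`) contrasting the old per-row commit pattern with the batched one:

```python
import os
import sqlite3
import tempfile
import time

ROWS = [("job-1", f"/tmp/file_{i}.txt", 1) for i in range(2000)]

def new_db() -> str:
    # Create a file-backed DB so each commit pays a real journal sync.
    fd, path = tempfile.mkstemp(suffix=".db")
    os.close(fd)
    conn = sqlite3.connect(path)
    conn.execute("CREATE TABLE file_results (job_id TEXT, file_path TEXT, success INTEGER)")
    conn.commit()
    conn.close()
    return path

def per_row(path: str) -> None:
    # Old shape: one INSERT and one commit per file (the N+1 pattern).
    conn = sqlite3.connect(path)
    for row in ROWS:
        conn.execute("INSERT INTO file_results VALUES (?, ?, ?)", row)
        conn.commit()
    conn.close()

def batched(path: str) -> None:
    # New shape: one executemany and one commit for the whole batch.
    conn = sqlite3.connect(path)
    conn.executemany("INSERT INTO file_results VALUES (?, ?, ?)", ROWS)
    conn.commit()
    conn.close()

for fn in (per_row, batched):
    path = new_db()
    start = time.perf_counter()
    fn(path)
    print(f"{fn.__name__}: {time.perf_counter() - start:.3f}s")
    os.remove(path)
```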
