@@ -12,7 +12,13 @@
 from app.core.config import settings
 from app.core.db import engine
 from app.models import ExtractionStatus, Ingestion
-from app.services.ocr import MistralOCRProvider, OCRProviderError
+from app.services.ocr import (
+    MistralOCRProvider,
+    NonRetryableError,
+    OCRProviderError,
+    RateLimitError,
+    RetryableError,
+)
 from app.services.storage import download_from_storage
 from app.worker import celery_app
 
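The import hunk above assumes an exception hierarchy in app/services/ocr.py that carries the HTTP status code and, for 429s, the Retry-After hint. A minimal sketch of what those classes could look like, inferred from how the task uses them below (the attribute names match the handler code; everything else is an assumption, not the actual module):

class OCRProviderError(Exception):
    """Base class for OCR provider failures (sketch)."""

    def __init__(self, message: str, status_code: int | None = None) -> None:
        super().__init__(message)
        self.status_code = status_code


class RetryableError(OCRProviderError):
    """Transient failures worth retrying: 500, 502, 503, 504, 408."""


class NonRetryableError(OCRProviderError):
    """Permanent failures: 400, 401, 403, 404. Retrying cannot help."""


class RateLimitError(RetryableError):
    """429 responses. Subclasses RetryableError so autoretry_for still
    catches it when the handler re-raises without a Retry-After value."""

    def __init__(self, message: str, retry_after: int | None = None) -> None:
        super().__init__(message, status_code=429)
        self.retry_after = retry_after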
@@ -29,13 +35,21 @@ def get_db_context() -> Generator[Session, None, None]:
 @celery_app.task(
     bind=True,
     name="app.tasks.extraction.process_ocr",
-    max_retries=3,
-    time_limit=600,  # 10 minutes
+    autoretry_for=(RetryableError,),  # Auto-retry on transient errors
+    retry_backoff=True,  # Exponential backoff: 1s, 2s, 4s, 8s...
+    retry_backoff_max=600,  # Cap backoff at 10 minutes
+    retry_jitter=True,  # Add randomness to prevent thundering herd
+    retry_kwargs={"max_retries": 3},
+    time_limit=600,  # 10 minutes max per attempt
 )  # type: ignore[misc]
 def process_ocr_task(self: Any, ingestion_id: str) -> dict[str, Any]:
     """
     Process a PDF ingestion through OCR using Mistral AI.
 
+    Automatically retries on RetryableError (500, 502, 503, 504, 408) with
+    exponential backoff. Does NOT retry on NonRetryableError (400, 401, 403, 404).
+    Special handling for RateLimitError (429) to respect the Retry-After header.
+
     Args:
         ingestion_id: UUID of the ingestion record
 
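For the decorator settings above: with retry_backoff=True Celery uses a backoff factor of 1 that doubles per attempt up to retry_backoff_max, and retry_jitter=True replaces the computed delay with a uniform random pick below it (full jitter). A simplified restatement of celery.utils.time.get_exponential_backoff_interval:

import random

def backoff_countdown(retries: int, factor: int = 1, maximum: int = 600,
                      full_jitter: bool = True) -> int:
    # Exponential growth, capped at retry_backoff_max (600s in this task)
    countdown = min(maximum, factor * (2 ** retries))
    if full_jitter:
        # Full jitter: uniform pick in [0, countdown] to avoid thundering herd
        countdown = random.randrange(countdown + 1)
    return max(0, countdown)

# Without jitter, the three allowed retries wait 1s, 2s, 4s:
# [backoff_countdown(r, full_jitter=False) for r in range(3)] == [1, 2, 4]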
@@ -44,10 +58,13 @@ def process_ocr_task(self: Any, ingestion_id: str) -> dict[str, Any]:
 
     Raises:
         ValueError: If ingestion not found or invalid ID format
-        OCRProviderError: If OCR processing fails
-        Retry: If task should be retried (transient errors)
+        NonRetryableError: Permanent errors (auth, bad request)
+        RetryableError: Transient errors (after max retries exhausted)
+        RateLimitError: Rate limit errors (after max retries exhausted)
     """
-    logger.info(f"Starting OCR processing for ingestion: {ingestion_id}")
+    logger.info(
+        f"Starting OCR for {ingestion_id} (attempt {self.request.retries + 1})"
+    )
 
     # Validate ingestion_id format
     try:
@@ -95,8 +112,14 @@ def process_ocr_task(self: Any, ingestion_id: str) -> dict[str, Any]:
         loop.close()
 
         logger.info(
-            f"[{ingestion_id}] OCR completed: {ocr_result.total_pages} pages, "
-            f"{ocr_result.processing_time_seconds:.2f}s"
+            f"[{ingestion_id}] OCR completed successfully",
+            extra={
+                "ingestion_id": ingestion_id,
+                "provider": ocr_result.ocr_provider,
+                "total_pages": ocr_result.total_pages,
+                "processing_time_seconds": ocr_result.processing_time_seconds,
+                "retry_count": self.request.retries,
+            },
         )
 
         # Update ingestion status to OCR_COMPLETE
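The extra dict above relies on the logging setup to surface those fields: stdlib logging attaches them as attributes on the LogRecord, so they only reach the output if a formatter references them or a JSON formatter serializes the whole record (that choice is outside this diff). A quick illustration:

import logging

logger = logging.getLogger("app.tasks.extraction")

# Fields passed via `extra` become LogRecord attributes; a formatter pattern
# like "%(message)s pages=%(total_pages)s" (or a JSON formatter such as
# python-json-logger) is what actually renders them.
logger.info("OCR completed successfully", extra={"total_pages": 12})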
@@ -115,8 +138,37 @@ def process_ocr_task(self: Any, ingestion_id: str) -> dict[str, Any]:
             "metadata": ocr_result.metadata,
         }
 
-    except OCRProviderError as e:
-        logger.error(f"[{ingestion_id}] OCR provider error: {str(e)}")
+    except RateLimitError as e:
+        # Special handling for 429 - respect Retry-After header
+        logger.warning(
+            f"[{ingestion_id}] Rate limited (429). "
+            f"Retry-After: {e.retry_after}s. Attempt {self.request.retries + 1}/3",
+            extra={
+                "ingestion_id": ingestion_id,
+                "error_type": "RateLimitError",
+                "status_code": 429,
+                "retry_after": e.retry_after,
+                "retry_count": self.request.retries,
+            },
+        )
+
+        if e.retry_after:
+            # Override default backoff with Retry-After value from API
+            raise self.retry(exc=e, countdown=e.retry_after)
+        else:
+            # Let autoretry_for handle it with exponential backoff
+            raise
+
+    except NonRetryableError as e:
+        # Don't retry - permanent errors (401, 400, 403, 404)
+        logger.error(
+            f"[{ingestion_id}] Non-retryable error: {e}",
+            extra={
+                "ingestion_id": ingestion_id,
+                "error_type": type(e).__name__,
+                "status_code": e.status_code,
+            },
+        )
 
         # Update status to FAILED
         with get_db_context() as db:
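The handler split above only works if the provider translates HTTP status codes into the right exception classes. A plausible sketch of that mapping inside MistralOCRProvider (the helper name and its inputs are hypothetical):

RETRYABLE_STATUS = {408, 500, 502, 503, 504}
NON_RETRYABLE_STATUS = {400, 401, 403, 404}

def _raise_for_status(status_code: int, body: str, retry_after: str | None) -> None:
    # Hypothetical helper: map an HTTP error onto the task's exception types.
    if status_code == 429:
        # Retry-After may be absent or non-numeric; fall back to plain backoff
        seconds = int(retry_after) if retry_after and retry_after.isdigit() else None
        raise RateLimitError(f"Rate limited: {body}", retry_after=seconds)
    if status_code in NON_RETRYABLE_STATUS:
        raise NonRetryableError(f"Permanent error {status_code}: {body}",
                                status_code=status_code)
    if status_code in RETRYABLE_STATUS:
        raise RetryableError(f"Transient error {status_code}: {body}",
                             status_code=status_code)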
@@ -126,18 +178,32 @@ def process_ocr_task(self: Any, ingestion_id: str) -> dict[str, Any]:
             db.add(ingestion)
             db.commit()
 
-        # Retry on transient errors (rate limits, timeouts)
-        if "rate limit" in str(e).lower() or "timeout" in str(e).lower():
-            retry_countdown = 2 ** self.request.retries  # Exponential backoff
-            logger.warning(
-                f"[{ingestion_id}] Retrying after {retry_countdown}s (attempt {self.request.retries + 1}/3)"
-            )
-            raise self.retry(exc=e, countdown=retry_countdown, max_retries=3)
-        else:
-            raise
+        raise  # Don't retry, fail immediately
+
+    except RetryableError as e:
+        # Transient errors - will be caught by autoretry_for
+        logger.warning(
+            f"[{ingestion_id}] Retryable error: {e}. "
+            f"Attempt {self.request.retries + 1}/3",
+            extra={
+                "ingestion_id": ingestion_id,
+                "error_type": type(e).__name__,
+                "status_code": e.status_code,
+                "retry_count": self.request.retries,
+            },
+        )
+        raise  # Let autoretry_for handle it with exponential backoff
 
     except Exception as e:
-        logger.error(f"[{ingestion_id}] Unexpected error during OCR: {str(e)}")
+        # Unexpected error - log and fail
+        logger.error(
+            f"[{ingestion_id}] Unexpected error: {str(e)}",
+            exc_info=True,
+            extra={
+                "ingestion_id": ingestion_id,
+                "error_type": type(e).__name__,
+            },
+        )
 
         # Update status to FAILED
         try:
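Callers are unaffected by the new retry machinery: the task is enqueued exactly as before and the worker drives the retries. Standard Celery usage (the call site and queue name are illustrative, not part of this diff):

# e.g. from the upload endpoint, after the ingestion row is created:
process_ocr_task.delay(str(ingestion.id))

# or with explicit routing:
process_ocr_task.apply_async(args=[str(ingestion.id)], queue="ocr")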