|
10 | 10 | from typing import Any, Dict, Optional |
11 | 11 |
|
12 | 12 | import aiohttp |
| 13 | +import re |
13 | 14 | import ray |
14 | 15 | from celery import Task, chain, states |
15 | 16 | from celery.exceptions import Retry |
@@ -41,11 +42,33 @@ def extract_error_code(reason: str, parsed_error: Optional[Dict] = None) -> Opti |
41 | 42 | Extract error code from error message or parsed error dict. |
42 | 43 | Returns error code if matched, None otherwise. |
43 | 44 | """ |
44 | | - # First check if error_code is already in parsed_error |
| 45 | + # 1) parsed_error dict |
45 | 46 | if parsed_error and isinstance(parsed_error, dict): |
46 | | - error_code = parsed_error.get("error_code") |
47 | | - if error_code: |
48 | | - return error_code |
| 47 | + code = parsed_error.get("error_code") |
| 48 | + if code: |
| 49 | + return code |
| 50 | + |
| 51 | + # 2) try to parse reason as JSON |
| 52 | + try: |
| 53 | + parsed = json.loads(reason) |
| 54 | + if isinstance(parsed, dict): |
| 55 | + code = parsed.get("error_code") |
| 56 | + if code: |
| 57 | + return code |
| 58 | + detail = parsed.get("detail") |
| 59 | + if isinstance(detail, dict) and detail.get("error_code"): |
| 60 | + return detail.get("error_code") |
| 61 | + except Exception: |
| 62 | + pass |
| 63 | + |
| 64 | + # 3) regex from raw string (supports single/double quotes) |
| 65 | + try: |
| 66 | + match = re.search( |
| 67 | + r'["\']error_code["\']\s*:\s*["\']([^"\']+)["\']', reason) |
| 68 | + if match: |
| 69 | + return match.group(1) |
| 70 | + except Exception: |
| 71 | + pass |
49 | 72 |
|
50 | 73 | return "unknown_error" |
51 | 74 |
|
@@ -688,68 +711,61 @@ async def index_documents(): |
688 | 711 |
|
689 | 712 | try: |
690 | 713 | connector = aiohttp.TCPConnector(verify_ssl=False) |
691 | | - # Increased timeout for large documents and slow ES bulk operations |
692 | | - # Use generous total timeout to avoid marking long-running but successful |
693 | | - # indexing as failed. |
694 | 714 | timeout = aiohttp.ClientTimeout(total=600) |
695 | 715 |
|
696 | 716 | async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session: |
697 | 717 | async with session.post( |
698 | 718 | full_url, |
699 | 719 | headers=headers, |
700 | 720 | json=formatted_chunks, |
701 | | - raise_for_status=True |
| 721 | + raise_for_status=False |
702 | 722 | ) as response: |
703 | | - result = await response.json() |
| 723 | + text = await response.text() |
| 724 | + status = response.status |
| 725 | + # Try to parse the JSON body for a structured error_code/message |
| 726 | + parsed_body = None |
| 727 | + try: |
| 728 | + parsed_body = json.loads(text) |
| 729 | + except Exception: |
| 730 | + parsed_body = None |
| 731 | + |
| 732 | + if status >= 400: |
| 733 | + error_code = None |
| 734 | + if isinstance(parsed_body, dict): |
| 735 | + error_code = parsed_body.get("error_code") |
| 736 | + detail = parsed_body.get("detail") |
| 737 | + if isinstance(detail, dict) and detail.get("error_code"): |
| 738 | + error_code = detail.get("error_code") |
| 739 | + elif isinstance(detail, str): |
| 740 | + try: |
| 741 | + parsed_detail = json.loads(detail) |
| 742 | + if isinstance(parsed_detail, dict): |
| 743 | + error_code = parsed_detail.get( |
| 744 | + "error_code", error_code) |
| 745 | + except Exception: |
| 746 | + pass |
| 747 | + |
| 748 | + if not error_code: |
| 749 | + try: |
| 750 | + match = re.search( |
| 751 | + r'["\']error_code["\']\s*:\s*["\']([^"\']+)["\']', text) |
| 752 | + if match: |
| 753 | + error_code = match.group(1) |
| 754 | + except Exception: |
| 755 | + pass |
| 756 | + |
| 757 | + if error_code: |
| 758 | + # Raise flat payload to avoid nested JSON and preserve error_code |
| 759 | + raise Exception(json.dumps({ |
| 760 | + "error_code": error_code |
| 761 | + }, ensure_ascii=False)) |
| 762 | + |
| 763 | + raise Exception( |
| 764 | + f"ElasticSearch service returned HTTP {status}") |
| 765 | + |
| 766 | + result = parsed_body if isinstance(parsed_body, dict) else await response.json() |
704 | 767 | return result |
705 | 768 |
|
706 | | - except aiohttp.ClientResponseError as e: |
707 | | - # 400: embedding model reports chunk count exceeds concurrency |
708 | | - if e.status == 400: |
709 | | - raise Exception(json.dumps({ |
710 | | - "message": f"ElasticSearch service returned 400 Bad Request: {str(e)}", |
711 | | - "index_name": original_index_name, |
712 | | - "task_name": "forward", |
713 | | - "source": original_source, |
714 | | - "original_filename": original_filename, |
715 | | - "error_code": "embedding_chunks_exceed_limit" |
716 | | - }, ensure_ascii=False)) |
717 | | - |
718 | | - # Timeout from Elasticsearch refresh / bulk operations: stop retrying and treat as es_bulk_failed |
719 | | - timeout_markers = [ |
720 | | - "Connection timeout caused by", |
721 | | - "Read timed out", |
722 | | - "ReadTimeoutError" |
723 | | - ] |
724 | | - if any(marker in str(e) for marker in timeout_markers): |
725 | | - raise Exception(json.dumps({ |
726 | | - "message": f"ElasticSearch operation timed out: {str(e)}", |
727 | | - "index_name": original_index_name, |
728 | | - "task_name": "forward", |
729 | | - "source": original_source, |
730 | | - "original_filename": original_filename, |
731 | | - "error_code": "es_bulk_failed" |
732 | | - }, ensure_ascii=False)) |
733 | | - |
734 | | - # 503: vector service busy: bubble up immediately, let caller decide |
735 | | - if e.status == 503: |
736 | | - raise Exception(json.dumps({ |
737 | | - "message": f"ElasticSearch service unavailable: {str(e)}", |
738 | | - "index_name": original_index_name, |
739 | | - "task_name": "forward", |
740 | | - "source": original_source, |
741 | | - "original_filename": original_filename, |
742 | | - "error_code": "vector_service_busy" |
743 | | - }, ensure_ascii=False)) |
744 | | - |
745 | | - # Other client response errors: bubble up |
746 | | - raise Exception(json.dumps({ |
747 | | - "message": f"ElasticSearch service unavailable: {str(e)}", |
748 | | - "index_name": original_index_name, |
749 | | - "task_name": "forward", |
750 | | - "source": original_source, |
751 | | - "original_filename": original_filename |
752 | | - }, ensure_ascii=False)) |
753 | 769 | except aiohttp.ClientConnectorError as e: |
754 | 770 | logger.error( |
755 | 771 | f"[{self.request.id}] FORWARD TASK: Connection error to {full_url}: {str(e)}") |
@@ -879,6 +895,10 @@ async def index_documents(): |
879 | 895 | } |
880 | 896 | except Exception as e: |
881 | 897 | # Every Exception ends up here (including our custom JSON message) |
| 898 | + # Important: if this is a Celery Retry, re-raise immediately without recording error_code |
| 899 | + if isinstance(e, Retry): |
| 900 | + raise |
| 901 | + |
882 | 902 | task_id = self.request.id |
883 | 903 | try: |
884 | 904 | error_info = json.loads(str(e)) |
|
0 commit comments