Skip to content

Commit b283098

Browse files
author
Bob Strahan
committed
feat: improve throttling detection and retry handling in assessment service
1 parent 5638dd1 commit b283098

File tree

2 files changed

+166
-45
lines changed

2 files changed

+166
-45
lines changed

lib/idp_common_pkg/idp_common/assessment/granular_service.py

Lines changed: 71 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1734,11 +1734,16 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
17341734
# Store the primary exception for easy access by caller
17351735
document.metadata["primary_exception"] = primary_exception
17361736

1737-
# Cache successful task results (only when some tasks fail - for retry scenarios)
1737+
# Check for any failed tasks (both exceptions and unsuccessful results)
1738+
failed_results = [r for r in all_task_results if not r.success]
1739+
any_failures = bool(failed_task_exceptions or failed_results)
1740+
1741+
# Cache successful tasks only when there are failures (for retry optimization)
1742+
if any_failures:
17381743
successful_results = [r for r in all_task_results if r.success]
17391744
if successful_results:
17401745
logger.info(
1741-
f"Caching {len(successful_results)} successful assessment task results for document {document.id} section {section_id} due to {len(failed_task_exceptions)} failed tasks (retry scenario)"
1746+
f"Caching {len(successful_results)} successful assessment task results for document {document.id} section {section_id} due to {len(failed_results)} failed results + {len(failed_task_exceptions)} failed exceptions (retry scenario)"
17421747
)
17431748
self._cache_successful_assessment_tasks(
17441749
document.id,
@@ -1748,7 +1753,7 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
17481753
)
17491754
else:
17501755
logger.warning(
1751-
f"No successful assessment task results to cache for document {document.id} section {section_id} - all {len(failed_task_exceptions)} tasks failed"
1756+
f"No successful assessment task results to cache for document {document.id} section {section_id} - all tasks failed"
17521757
)
17531758
else:
17541759
# All new tasks succeeded - no need to cache since there won't be retries
@@ -1868,6 +1873,69 @@ def process_document_section(self, document: Document, section_id: str) -> Docum
18681873
logger.error(error_msg)
18691874
document.status = Status.FAILED
18701875
document.errors.append(error_msg)
1876+
1877+
# Check if this is a throttling exception and populate metadata for retry handling
1878+
if self._is_throttling_exception(e):
1879+
logger.info(
1880+
f"Populating metadata for throttling exception: {type(e).__name__}"
1881+
)
1882+
document.metadata = document.metadata or {}
1883+
document.metadata["failed_assessment_tasks"] = {
1884+
"granular_processing": {
1885+
"exception_type": type(e).__name__,
1886+
"exception_message": str(e),
1887+
"exception_class": e.__class__.__module__
1888+
+ "."
1889+
+ e.__class__.__name__,
1890+
"is_throttling": True,
1891+
}
1892+
}
1893+
document.metadata["primary_exception"] = e
1894+
1895+
# Additional check: if document status is FAILED and contains throttling errors,
1896+
# populate metadata even if no exceptions were thrown
1897+
if (
1898+
document.status == Status.FAILED
1899+
and document.errors
1900+
and not hasattr(document, "metadata")
1901+
or not document.metadata
1902+
or "failed_assessment_tasks" not in document.metadata
1903+
):
1904+
# Check if any errors contain throttling keywords
1905+
throttling_keywords = [
1906+
"throttlingexception",
1907+
"provisionedthroughputexceededexception",
1908+
"servicequotaexceededexception",
1909+
"toomanyrequestsexception",
1910+
"requestlimitexceeded",
1911+
"too many tokens",
1912+
"please wait before trying again",
1913+
"reached max retries",
1914+
]
1915+
1916+
has_throttling_error = False
1917+
throttling_error_msg = None
1918+
for error_msg in document.errors:
1919+
error_lower = str(error_msg).lower()
1920+
if any(keyword in error_lower for keyword in throttling_keywords):
1921+
has_throttling_error = True
1922+
throttling_error_msg = error_msg
1923+
break
1924+
1925+
if has_throttling_error:
1926+
logger.info(
1927+
f"Populating metadata for throttling error found in document.errors: {throttling_error_msg}"
1928+
)
1929+
document.metadata = document.metadata or {}
1930+
document.metadata["failed_assessment_tasks"] = {
1931+
"document_level_error": {
1932+
"exception_type": "ThrottlingError",
1933+
"exception_message": throttling_error_msg,
1934+
"exception_class": "DocumentLevelThrottlingError",
1935+
"is_throttling": True,
1936+
}
1937+
}
1938+
18711939
return document
18721940

18731941
def assess_document(self, document: Document) -> Document:

patterns/pattern-2/src/assessment_function/index.py

Lines changed: 95 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,81 @@
1212
from idp_common import s3
1313
from assessment_validator import AssessmentValidator
1414

15+
# Custom exception for throttling scenarios
16+
class ThrottlingException(Exception):
    """Raised when throttling is detected in document processing results.

    The handler raises this after it finds a throttling-related message in
    the errors of a failed document, so the failure surfaces as an exception
    (intended to trigger a Step Functions retry) instead of a normal return.
    """
19+
20+
# Throttling detection constants.
#
# Both collections are tuples so the shared module-level values cannot be
# mutated by accident; they are only used for membership tests and iteration.

# Lower-case substrings searched for in lower-cased error/exception messages.
THROTTLING_KEYWORDS = (
    "throttlingexception",
    "provisionedthroughputexceededexception",
    "servicequotaexceededexception",
    "toomanyrequestsexception",
    "requestlimitexceeded",
    "too many tokens",
    "please wait before trying again",
    "reached max retries",
)

# Exact (case-sensitive) error codes / exception class names treated as throttling.
THROTTLING_EXCEPTIONS = (
    "ThrottlingException",
    "ProvisionedThroughputExceededException",
    "ServiceQuotaExceededException",
    "TooManyRequestsException",
    "RequestLimitExceeded",
)
39+
1540
# Configuration will be loaded in handler function
1641

1742
# The root logger level is taken from LOG_LEVEL; the Bedrock client logger is
# tuned separately through BEDROCK_LOG_LEVEL. Both fall back to INFO.
logger = logging.getLogger()
logger.setLevel(os.getenv("LOG_LEVEL", "INFO"))
logging.getLogger('idp_common.bedrock.client').setLevel(
    os.getenv("BEDROCK_LOG_LEVEL", "INFO")
)
2045

46+
def is_throttling_exception(exception):
    """
    Decide whether an exception represents a throttling condition.

    A botocore ``ClientError`` is judged solely by the error code in its
    response. Any other exception is judged by its class name, or by the
    presence of a throttling keyword in its lower-cased message.

    Args:
        exception: The exception to check

    Returns:
        bool: True if the exception is throttling-related, False otherwise
    """
    from botocore.exceptions import ClientError

    # AWS SDK errors carry a structured code; the message is not consulted.
    if isinstance(exception, ClientError):
        aws_error = exception.response.get('Error', {})
        return aws_error.get('Code', '') in THROTTLING_EXCEPTIONS

    class_name = type(exception).__name__
    lowered_message = str(exception).lower()

    if class_name in THROTTLING_EXCEPTIONS:
        return True

    for keyword in THROTTLING_KEYWORDS:
        if keyword in lowered_message:
            return True
    return False
69+
70+
def check_document_for_throttling_errors(document):
    """
    Look for a throttling-related message in a failed document's errors.

    Only documents whose status is ``Status.FAILED`` and that have a
    non-empty ``errors`` collection are inspected. Each error is converted
    to a lower-cased string and searched for any throttling keyword.

    Args:
        document: The document object to check

    Returns:
        tuple: (has_throttling_errors: bool, first_throttling_error: str or None)
    """
    # Guard clauses: nothing to report unless the document failed with errors.
    if document.status != Status.FAILED:
        return False, None
    if not document.errors:
        return False, None

    for entry in document.errors:
        lowered = str(entry).lower()
        for keyword in THROTTLING_KEYWORDS:
            if keyword in lowered:
                # Return the original (un-lowered) entry of the first match.
                return True, entry

    return False, None
89+
2190
def handler(event, context):
2291
"""
2392
Lambda handler for document assessment.
@@ -85,55 +154,39 @@ def handler(event, context):
85154
t1 = time.time()
86155
logger.info(f"Total assessment time: {t1-t0:.2f} seconds")
87156

88-
# Check for failed assessment tasks that might require retry
89-
if (hasattr(updated_document, 'metadata') and
90-
updated_document.metadata and
91-
'failed_assessment_tasks' in updated_document.metadata):
92-
93-
failed_tasks = updated_document.metadata['failed_assessment_tasks']
94-
throttling_tasks = {
95-
task_id: task_info for task_id, task_info in failed_tasks.items()
96-
if task_info.get('is_throttling', False)
97-
}
98-
99-
logger.warning(
100-
f"Assessment completed with {len(failed_tasks)} failed tasks, "
101-
f"{len(throttling_tasks)} due to throttling"
102-
)
103-
104-
if throttling_tasks:
105-
logger.info(
106-
f"Throttling detected in {len(throttling_tasks)} tasks. "
107-
f"Successful tasks have been cached for retry."
157+
# Check for failed assessment tasks that might require retry (granular assessment)
158+
if hasattr(updated_document, 'metadata') and updated_document.metadata:
159+
failed_tasks = updated_document.metadata.get('failed_assessment_tasks', {})
160+
if failed_tasks:
161+
throttling_tasks = {
162+
task_id: task_info for task_id, task_info in failed_tasks.items()
163+
if task_info.get('is_throttling', False)
164+
}
165+
166+
logger.warning(
167+
f"Assessment completed with {len(failed_tasks)} failed tasks, "
168+
f"{len(throttling_tasks)} due to throttling"
108169
)
170+
171+
if throttling_tasks:
172+
logger.info(
173+
f"Throttling detected in {len(throttling_tasks)} tasks. "
174+
f"Successful tasks have been cached for retry."
175+
)
176+
177+
# Check for throttling errors in document status and errors field
178+
has_throttling, throttling_error = check_document_for_throttling_errors(updated_document)
179+
if has_throttling:
180+
logger.error(f"Throttling error detected in document errors: {throttling_error}")
181+
logger.error("Raising ThrottlingException to trigger Step Functions retry")
182+
raise ThrottlingException(f"Throttling detected in document processing: {throttling_error}")
109183

110184
except Exception as e:
111185
t1 = time.time()
112186
logger.error(f"Assessment failed after {t1-t0:.2f} seconds: {str(e)}")
113187

114188
# Check if this is a throttling exception that should trigger retry
115-
from botocore.exceptions import ClientError
116-
throttling_exceptions = [
117-
"ThrottlingException",
118-
"ProvisionedThroughputExceededException",
119-
"ServiceQuotaExceededException",
120-
"TooManyRequestsException",
121-
"RequestLimitExceeded"
122-
]
123-
124-
is_throttling = False
125-
if isinstance(e, ClientError):
126-
error_code = e.response.get('Error', {}).get('Code', '')
127-
is_throttling = error_code in throttling_exceptions
128-
else:
129-
exception_name = type(e).__name__
130-
exception_message = str(e).lower()
131-
is_throttling = (
132-
exception_name in throttling_exceptions or
133-
any(throttle_term.lower() in exception_message for throttle_term in throttling_exceptions)
134-
)
135-
136-
if is_throttling:
189+
if is_throttling_exception(e):
137190
logger.error(f"Throttling exception detected: {type(e).__name__}. This will trigger state machine retry.")
138191
# Update document status before re-raising
139192
document_service.update_document(docStatus)

0 commit comments

Comments
 (0)