Skip to content

Commit d865586

Browse files
committed
Enhancing A2I template and adding logic for failed review tasks
1 parent 911055e commit d865586

File tree

2 files changed

+65
-53
lines changed
  • patterns/pattern-1/src

2 files changed

+65
-53
lines changed

patterns/pattern-1/src/hitl-process-function/index.py

Lines changed: 61 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,23 @@ def extract_ids_from_human_loop_name(human_loop_name):
144144
try:
145145
if human_loop_name.startswith('review-bda-'):
146146
remaining = human_loop_name[11:] # Remove 'review-bda-' (11 chars)
147-
parts = remaining.rsplit('-', 3) # Split from right, max 3 splits
148-
if len(parts) == 4:
149-
human_review_id = parts[0] # 2-digit random value
150-
execution_id = parts[1] # UUID with hyphens
151-
record_number = int(parts[2])
152-
page_id = int(parts[3])
153-
return execution_id, record_number, page_id
147+
148+
# Split from right to get the last 2 parts (record_number and page_id)
149+
parts = remaining.rsplit('-', 2) # Split from right, max 2 splits
150+
if len(parts) == 3:
151+
# parts[0] contains human_review_id + execution_id
152+
# parts[1] is record_number
153+
# parts[2] is page_id
154+
record_number = int(parts[1])
155+
page_id = int(parts[2])-1
156+
157+
# Now split the first part to separate human_review_id from execution_id
158+
# The human_review_id is the first part before the first hyphen
159+
prefix_parts = parts[0].split('-', 1) # Split only on first hyphen
160+
if len(prefix_parts) == 2:
161+
human_review_id = prefix_parts[0] # e.g., 'SI'
162+
execution_id = prefix_parts[1] # e.g., 'ca13b3ed-d4eb-4e7f-a9aa-01913d24a1e7'
163+
return execution_id, record_number, page_id
154164
except Exception as e:
155165
logger.error(f"Error parsing human loop name {human_loop_name}: {str(e)}")
156166

@@ -180,7 +190,7 @@ def update_token_status(token_id, status, failure_reason, tracking_table):
180190
logger.error(f"Error updating token status: {str(e)}")
181191

182192
def check_all_sections_complete(document_id, tracking_table):
183-
"""Check if all sections for this document are complete (COMPLETED or FAILED)"""
193+
"""Check if all sections for this document are complete (Completed or Failed)"""
184194
try:
185195
response = tracking_table.scan(
186196
FilterExpression="begins_with(PK, :prefix) AND TokenType = :type",
@@ -196,21 +206,21 @@ def check_all_sections_complete(document_id, tracking_table):
196206
if not sections:
197207
return False, False
198208

199-
has_failed_sections = False
209+
has_Failed_sections = False
200210
for section in sections:
201211
status = section.get('Status')
202-
if status == 'FAILED':
203-
has_failed_sections = True
204-
elif status != 'COMPLETED':
212+
if status == 'Failed':
213+
has_Failed_sections = True
214+
elif status != 'Completed':
205215
return False, False # Still has pending sections
206216

207-
return True, has_failed_sections
217+
return True, has_Failed_sections
208218
except Exception as e:
209219
logger.error(f"Error checking section completion status: {str(e)}")
210220
return False, False
211221

212222
def check_all_pages_complete(document_id, section_id, tracking_table):
213-
"""Check if all pages in a section are complete (COMPLETED, FAILED, or STOPPED) and return failure info"""
223+
"""Check if all pages in a section are complete (Completed, Failed, or Stopped) and return failure info"""
214224
try:
215225
response = tracking_table.scan(
216226
FilterExpression="begins_with(PK, :prefix) AND TokenType = :type",
@@ -226,19 +236,19 @@ def check_all_pages_complete(document_id, section_id, tracking_table):
226236
if not items:
227237
return False, []
228238

229-
failed_pages = []
239+
Failed_pages = []
230240
for item in items:
231241
status = item.get('Status')
232-
if status in ['FAILED', 'STOPPED']:
233-
failed_pages.append({
242+
if status in ['Failed', 'Stopped']:
243+
Failed_pages.append({
234244
'page_id': item.get('PageId'),
235245
'status': status,
236246
'failure_reason': item.get('FailureReason', 'Unknown failure')
237247
})
238-
elif status != 'COMPLETED':
248+
elif status != 'Completed':
239249
return False, [] # Still has pending pages
240250

241-
return True, failed_pages
251+
return True, Failed_pages
242252
except Exception as e:
243253
logger.error(f"Error checking page completion status: {str(e)}")
244254
return False, []
@@ -264,8 +274,8 @@ def find_doc_task_token(document_id, tracking_table):
264274
logger.error(f"Error finding section task token: {str(e)}")
265275
return None
266276

267-
def process_completed_hitl(detail, execution_id, record_id, page_id, table, s3_client):
268-
"""Process completed HITL task"""
277+
def process_Completed_hitl(detail, execution_id, record_id, page_id, table, s3_client):
278+
"""Process Completed HITL task"""
269279
try:
270280
# Parse A2I output from S3
271281
output_s3_uri = detail['humanLoopOutput']['outputS3Uri']
@@ -367,7 +377,7 @@ def process_completed_hitl(detail, execution_id, record_id, page_id, table, s3_c
367377

368378
return True
369379
except Exception as e:
370-
logger.error(f"Error processing completed HITL: {str(e)}")
380+
logger.error(f"Error processing Completed HITL: {str(e)}")
371381
return False
372382

373383
def lambda_handler(event, context):
@@ -405,51 +415,51 @@ def lambda_handler(event, context):
405415
page_token_id = f"HITL#{document_id}#section#{record_id}#page#{page_id}"
406416
section_token_id = f"HITL#{document_id}#section#{record_id}"
407417

408-
# Get failure reason for failed/stopped tasks
418+
# Get failure reason for Failed/Stopped tasks
409419
failure_reason = detail.get('failureReason', 'Unknown failure reason') if human_loop_status in ['Failed', 'Stopped'] else None
410420

411-
# Process completed HITL tasks
421+
# Process Completed HITL tasks
412422
if human_loop_status == 'Completed':
413-
success = process_completed_hitl(detail, execution_id, record_id, page_id, table, s3_client)
423+
success = process_Completed_hitl(detail, execution_id, record_id, page_id, table, s3_client)
414424
if not success:
415-
return {"statusCode": 500, "body": "Failed to process completed HITL"}
425+
return {"statusCode": 500, "body": "Failed to process Completed HITL"}
416426

417427
# Update page task token status
418428
update_token_status(page_token_id, human_loop_status, failure_reason, tracking_table)
419429

420430
# Check if all pages in this section are complete
421-
all_pages_complete, failed_pages_in_section = check_all_pages_complete(document_id, record_id, tracking_table)
422-
logger.info(f"all_pages_complete status: {all_pages_complete}, failed_pages: {failed_pages_in_section}")
431+
all_pages_complete, Failed_pages_in_section = check_all_pages_complete(document_id, record_id, tracking_table)
432+
logger.info(f"all_pages_complete status: {all_pages_complete}, Failed_pages: {Failed_pages_in_section}")
423433

424434
if all_pages_complete:
425435
# Update section token status
426-
section_status = "FAILED" if failed_pages_in_section else "COMPLETED"
427-
section_failure_reason = f"Section has {len(failed_pages_in_section)} failed pages" if failed_pages_in_section else None
436+
section_status = "Failed" if Failed_pages_in_section else "Completed"
437+
section_failure_reason = f"Section has {len(Failed_pages_in_section)} Failed pages" if Failed_pages_in_section else None
428438
update_token_status(section_token_id, section_status, section_failure_reason, tracking_table)
429439

430440
# Check if all sections for this document are complete
431-
all_sections_complete, has_failed_sections = check_all_sections_complete(document_id, tracking_table)
441+
all_sections_complete, has_Failed_sections = check_all_sections_complete(document_id, tracking_table)
432442

433443
if all_sections_complete:
434444
section_task_token = find_doc_task_token(document_id, tracking_table)
435445

436446
if section_task_token:
437-
if has_failed_sections:
438-
# Collect all failed pages for failure message
439-
all_failed_pages = []
447+
if has_Failed_sections:
448+
# Collect all Failed pages for failure message
449+
all_Failed_pages = []
440450
response = tracking_table.scan(
441-
FilterExpression="begins_with(PK, :prefix) AND TokenType = :type AND (#status = :failed_status OR #status = :stopped_status)",
451+
FilterExpression="begins_with(PK, :prefix) AND TokenType = :type AND (#status = :Failed_status OR #status = :Stopped_status)",
442452
ExpressionAttributeNames={'#status': 'Status'},
443453
ExpressionAttributeValues={
444454
':prefix': f"HITL#{document_id}#section#",
445455
':type': 'HITL_PAGE',
446-
':failed_status': 'FAILED',
447-
':stopped_status': 'STOPPED'
456+
':Failed_status': 'Failed',
457+
':Stopped_status': 'Stopped'
448458
}
449459
)
450460

451461
for item in response.get('Items', []):
452-
all_failed_pages.append({
462+
all_Failed_pages.append({
453463
'execution_id': execution_id,
454464
'record_id': item.get('SectionId'),
455465
'page_id': item.get('PageId'),
@@ -460,16 +470,16 @@ def lambda_handler(event, context):
460470
stepfunctions.send_task_failure(
461471
taskToken=section_task_token,
462472
error='HITLFailedException',
463-
cause=f"HITL review failed for {len(all_failed_pages)} page(s): {json.dumps(all_failed_pages)}"
473+
cause=f"HITL review Failed for {len(all_Failed_pages)} page(s): {json.dumps(all_Failed_pages)}"
464474
)
465475
logger.info(f"Sent task failure for execution {execution_id}")
466476

467-
# Update document tracking to FAILED
477+
# Update document tracking to Failed
468478
tracking_table.update_item(
469479
Key={'PK': f"document#{document_id}", 'SK': 'metadata'},
470480
UpdateExpression="SET HITLStatus = :status, HITLCompletionTime = :time",
471481
ExpressionAttributeValues={
472-
':status': "FAILED",
482+
':status': "Failed",
473483
':time': datetime.datetime.now(datetime.timezone.utc).isoformat()
474484
}
475485
)
@@ -478,8 +488,8 @@ def lambda_handler(event, context):
478488
Key={'PK': f"doc#{document_id}", 'SK': 'none'},
479489
UpdateExpression="SET ObjectStatus = :status, HITLStatus = :hitlStatus",
480490
ExpressionAttributeValues={
481-
':status': "HITL_FAILED",
482-
':hitlStatus': "FAILED"
491+
':status': "HITL_Failed",
492+
':hitlStatus': "Failed"
483493
}
484494
)
485495
else:
@@ -502,20 +512,20 @@ def lambda_handler(event, context):
502512
stepfunctions.send_task_success(
503513
taskToken=section_task_token,
504514
output=json.dumps({
505-
"status": "completed",
515+
"status": "Completed",
506516
"executionId": execution_id,
507-
"message": "All human reviews completed",
517+
"message": "All human reviews Completed",
508518
"blueprintChanged": len(blueprint_changes) > 0
509519
})
510520
)
511521
logger.info(f"Sent task success for execution {execution_id}")
512522

513-
# Update document tracking to COMPLETED
523+
# Update document tracking to Completed
514524
tracking_table.update_item(
515525
Key={'PK': f"document#{document_id}", 'SK': 'metadata'},
516526
UpdateExpression="SET HITLStatus = :status, HITLCompletionTime = :time, HITLReviewURL = :url",
517527
ExpressionAttributeValues={
518-
':status': "COMPLETED",
528+
':status': "Completed",
519529
':time': datetime.datetime.now(datetime.timezone.utc).isoformat(),
520530
':url': None
521531
}
@@ -525,17 +535,17 @@ def lambda_handler(event, context):
525535
Key={'PK': f"doc#{document_id}", 'SK': 'none'},
526536
UpdateExpression="SET ObjectStatus = :status, HITLStatus = :hitlStatus, HITLReviewURL = :url",
527537
ExpressionAttributeValues={
528-
':status': "COMPLETED",
529-
':hitlStatus': "COMPLETED",
538+
':status': "Completed",
539+
':hitlStatus': "Completed",
530540
':url': None
531541
}
532542
)
533543

534-
return {"statusCode": 200, "body": "Processing completed successfully"}
544+
return {"statusCode": 200, "body": "Processing Completed successfully"}
535545

536546
except ClientError as e:
537547
logger.error(f"DynamoDB error: {e.response['Error']['Message']}")
538548
return {"statusCode": 500, "body": "Database error"}
539549
except Exception as e:
540550
logger.error(f"Unexpected error: {str(e)}")
541-
return {"statusCode": 500, "body": "Processing failed"}
551+
return {"statusCode": 500, "body": "Processing Failed"}

patterns/pattern-1/src/processresults_function/index.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -673,7 +673,7 @@ def start_human_loop(
673673
FlowDefinitionArn = ssm.get_parameter(Name=f"/{os.environ.get('METRIC_NAMESPACE', 'IDP')}/FlowDefinitionArn")['Parameter']['Value']
674674
human_review_id = generate_random_string(2)
675675
response = a2i_runtime_client.start_human_loop(
676-
HumanLoopName=f"review-bda-{execution_id}-{record_number}-{page_id_num}",
676+
HumanLoopName=f"review-bda-{human_review_id}-{execution_id}-{record_number}-{page_id_num}",
677677
FlowDefinitionArn=FlowDefinitionArn,
678678
HumanLoopInput={"InputContent": json.dumps(human_loop_input)}
679679
)
@@ -825,6 +825,7 @@ def process_segments(
825825

826826
now = datetime.datetime.now().isoformat()
827827
hitl_triggered = False
828+
overall_hitl_triggered = False
828829

829830
for record_number, segment in enumerate(segment_metadata, start=1):
830831
logger.info(f"Processing segment for execution id: {execution_id}")
@@ -901,6 +902,7 @@ def process_segments(
901902
if low_confidence:
902903
hitl_triggered = low_confidence
903904
metrics.put_metric('HITLTriggered', 1)
905+
overall_hitl_triggered = True
904906
for page_number in page_indices:
905907
page_str = str(page_number)
906908
key_values = pagespecific_details['key_value_details'].get(page_str, [])
@@ -990,7 +992,7 @@ def process_segments(
990992
except Exception as e:
991993
logger.error(f"Error saving to DynamoDB: {str(e)}")
992994

993-
return document, hitl_triggered
995+
return document, overall_hitl_triggered
994996

995997
def handler(event, context):
996998
"""

0 commit comments

Comments
 (0)