Skip to content

Commit 9c407e2

Browse files
author
Bob Strahan
committed
feat: Complete Pattern 2 Lambda metering implementation
- Add Lambda metering to Extraction function (both skip and normal paths) - Add Lambda metering to Assessment function (both skip and normal paths) - Add Lambda metering to Summarization function (successful execution path) - Fix robust type handling in lambda_metering utility function - Support string/numeric conversion for memory_limit_in_mb field Pattern 2 now has complete Lambda execution cost tracking for all core processing functions: OCR, Classification, Extraction, Assessment, Summarization. Each function tracks: - Lambda invocation counts (for /bin/bash.20 per 1M requests pricing) - Lambda GB-seconds duration (for 6.67 per 1M GB-seconds pricing) Provides complete cost visibility for document processing workflows.
1 parent 93c0474 commit 9c407e2

File tree

3 files changed

+40
-1
lines changed

3 files changed

+40
-1
lines changed

patterns/pattern-2/src/assessment_function/index.py

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from idp_common.models import Document, Status
1111
from idp_common.docs_service import create_document_service
1212
from idp_common import s3
13-
from idp_common.utils import normalize_boolean_value
13+
from idp_common.utils import normalize_boolean_value, calculate_lambda_metering, merge_metering_data
1414
from assessment_validator import AssessmentValidator
1515

1616
# Custom exception for throttling scenarios
@@ -94,6 +94,7 @@ def handler(event, context):
9494
This function assesses the confidence of extraction results for a document section
9595
using the Assessment service from the idp_common library.
9696
"""
97+
start_time = time.time() # Capture start time for Lambda metering
9798
logger.info(f"Starting assessment processing for event: {json.dumps(event, default=str)}")
9899

99100
# Load configuration
@@ -166,6 +167,13 @@ def handler(event, context):
166167
# Add only the section being processed (preserve existing data)
167168
section_document.sections = [section]
168169

170+
# Add Lambda metering for assessment skip execution
171+
try:
172+
lambda_metering = calculate_lambda_metering("Assessment", context, start_time)
173+
section_document.metering = merge_metering_data(section_document.metering, lambda_metering)
174+
except Exception as e:
175+
logger.warning(f"Failed to add Lambda metering for assessment skip: {str(e)}")
176+
169177
# Return consistent format for Map state collation
170178
response = {
171179
"section_id": section_id,
@@ -293,6 +301,13 @@ def handler(event, context):
293301
updated_document.errors.extend(validation_errors)
294302
logger.error(f"Validation Error: {validation_errors}")
295303

304+
# Add Lambda metering for successful assessment execution
305+
try:
306+
lambda_metering = calculate_lambda_metering("Assessment", context, start_time)
307+
updated_document.metering = merge_metering_data(updated_document.metering, lambda_metering)
308+
except Exception as e:
309+
logger.warning(f"Failed to add Lambda metering for assessment: {str(e)}")
310+
296311
# Prepare output with automatic compression if needed
297312
result = {
298313
'document': updated_document.serialize_document(working_bucket, f"assessment_{section_id}", logger),

patterns/pattern-2/src/extraction_function/index.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from idp_common import metrics, get_config, extraction
1111
from idp_common.models import Document, Section, Status
1212
from idp_common.docs_service import create_document_service
13+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1314

1415
# Configuration will be loaded in handler function
1516

@@ -24,6 +25,7 @@ def handler(event, context):
2425
"""
2526
Process a single section of a document for information extraction
2627
"""
28+
start_time = time.time() # Capture start time for Lambda metering
2729
logger.info(f"Event: {json.dumps(event)}")
2830

2931
# Load configuration
@@ -65,6 +67,13 @@ def handler(event, context):
6567
if section.extraction_result_uri and section.extraction_result_uri.strip():
6668
logger.info(f"Skipping extraction for section {section_id} - already has extraction data: {section.extraction_result_uri}")
6769

70+
# Add Lambda metering for extraction skip execution
71+
try:
72+
lambda_metering = calculate_lambda_metering("Extraction", context, start_time)
73+
full_document.metering = merge_metering_data(full_document.metering, lambda_metering)
74+
except Exception as e:
75+
logger.warning(f"Failed to add Lambda metering for extraction skip: {str(e)}")
76+
6877
# Return the section without processing
6978
response = {
7079
"section_id": section_id,
@@ -117,6 +126,13 @@ def handler(event, context):
117126
logger.error(error_message)
118127
raise Exception(error_message)
119128

129+
# Add Lambda metering for successful extraction execution
130+
try:
131+
lambda_metering = calculate_lambda_metering("Extraction", context, start_time)
132+
section_document.metering = merge_metering_data(section_document.metering, lambda_metering)
133+
except Exception as e:
134+
logger.warning(f"Failed to add Lambda metering for extraction: {str(e)}")
135+
120136
# Prepare output with automatic compression if needed
121137
response = {
122138
"section_id": section_id,

patterns/pattern-2/src/summarization_function/index.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from idp_common import get_config, summarization
1414
from idp_common.models import Document, Status
1515
from idp_common.docs_service import create_document_service
16+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1617

1718
# Configuration will be loaded in handler function
1819

@@ -72,6 +73,13 @@ def handler(event, context):
7273
else:
7374
logger.warning("Document summarization completed but no summary report URI was set")
7475

76+
# Add Lambda metering for successful summarization execution
77+
try:
78+
lambda_metering = calculate_lambda_metering("Summarization", context, start_time)
79+
processed_document.metering = merge_metering_data(processed_document.metering, lambda_metering)
80+
except Exception as e:
81+
logger.warning(f"Failed to add Lambda metering for summarization: {str(e)}")
82+
7583
# Prepare output with automatic compression if needed
7684
return {
7785
'document': processed_document.serialize_document(working_bucket, "summarization", logger),

0 commit comments

Comments
 (0)