Skip to content

Commit 3d6eb0b

Browse files
author
Bob Strahan
committed
feat: Complete Pattern 3 Lambda metering implementation
- Add Lambda metering to all 5 Pattern 3 core processing functions - OCR: Both normal and skip execution paths with metering - Classification: Both normal and skip execution paths (SageMaker UDOP backend) - Extraction: Both normal and skip execution paths with metering - Assessment: Both normal and skip execution paths with metering - Summarization: Successful execution path with metering Pattern 3 now has complete Lambda execution cost tracking matching Pattern 2 implementation. All functions track invocation counts and GB-seconds duration for comprehensive cost visibility.
1 parent 9c407e2 commit 3d6eb0b

File tree

5 files changed

+73
-1
lines changed

5 files changed

+73
-1
lines changed

patterns/pattern-3/src/assessment_function/index.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from idp_common import get_config, assessment
1010
from idp_common.models import Document, Status
1111
from idp_common.docs_service import create_document_service
12+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1213

1314
# Configuration will be loaded in handler function
1415

@@ -22,6 +23,7 @@ def handler(event, context):
2223
This function assesses the confidence of extraction results for a document section
2324
using the Assessment service from the idp_common library.
2425
"""
26+
start_time = time.time() # Capture start time for Lambda metering
2527
logger.info(f"Starting assessment processing for event: {json.dumps(event, default=str)}")
2628

2729
# Load configuration
@@ -94,6 +96,13 @@ def handler(event, context):
9496
# Add only the section being processed (preserve existing data)
9597
section_document.sections = [section]
9698

99+
# Add Lambda metering for assessment skip execution
100+
try:
101+
lambda_metering = calculate_lambda_metering("Assessment", context, start_time)
102+
section_document.metering = merge_metering_data(section_document.metering, lambda_metering)
103+
except Exception as e:
104+
logger.warning(f"Failed to add Lambda metering for assessment skip: {str(e)}")
105+
97106
# Return consistent format for Map state collation
98107
response = {
99108
"section_id": section_id,
@@ -138,6 +147,13 @@ def handler(event, context):
138147
logger.error(error_message)
139148
raise Exception(error_message)
140149

150+
# Add Lambda metering for successful assessment execution
151+
try:
152+
lambda_metering = calculate_lambda_metering("Assessment", context, start_time)
153+
updated_document.metering = merge_metering_data(updated_document.metering, lambda_metering)
154+
except Exception as e:
155+
logger.warning(f"Failed to add Lambda metering for assessment: {str(e)}")
156+
141157
# Prepare output with automatic compression if needed
142158
result = {
143159
'document': updated_document.serialize_document(working_bucket, f"assessment_{section_id}", logger),

patterns/pattern-3/src/classification_function/index.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from idp_common import classification, metrics, get_config
1515
from idp_common.models import Document, Status
1616
from idp_common.docs_service import create_document_service
17+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1718

1819
# Configuration will be loaded in handler function
1920
region = os.environ['AWS_REGION']
@@ -28,6 +29,7 @@ def handler(event, context):
2829
"""
2930
Lambda handler for document classification using SageMaker UDOP model.
3031
"""
32+
start_time = time.time() # Capture start time for Lambda metering
3133
logger.info(f"Event: {json.dumps(event)}")
3234

3335
# Extract document from the OCR result - handle both compressed and uncompressed
@@ -58,6 +60,13 @@ def handler(event, context):
5860
logger.info(f"Updating document execution ARN for classification skip")
5961
document_service.update_document(document)
6062

63+
# Add Lambda metering for classification skip execution
64+
try:
65+
lambda_metering = calculate_lambda_metering("Classification", context, start_time)
66+
document.metering = merge_metering_data(document.metering, lambda_metering)
67+
except Exception as e:
68+
logger.warning(f"Failed to add Lambda metering for classification skip: {str(e)}")
69+
6170
# Prepare output with existing document data
6271
response = {
6372
"document": document.serialize_document(working_bucket, "classification_skip", logger)
@@ -140,6 +149,13 @@ def handler(event, context):
140149
t1 = time.time()
141150
logger.info(f"Time taken for classification: {t1-t0:.2f} seconds")
142151

152+
# Add Lambda metering for successful classification execution
153+
try:
154+
lambda_metering = calculate_lambda_metering("Classification", context, start_time)
155+
document.metering = merge_metering_data(document.metering, lambda_metering)
156+
except Exception as e:
157+
logger.warning(f"Failed to add Lambda metering for classification: {str(e)}")
158+
143159
# Prepare output with automatic compression if needed
144160
response = {
145161
"document": document.serialize_document(working_bucket, "classification", logger)

patterns/pattern-3/src/extraction_function/index.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
from idp_common import metrics, get_config, extraction
1111
from idp_common.models import Document, Section, Status
1212
from idp_common.docs_service import create_document_service
13+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1314

1415
# Configuration will be loaded in handler function
1516

@@ -24,6 +25,7 @@ def handler(event, context):
2425
"""
2526
Process a single section of a document for information extraction
2627
"""
28+
start_time = time.time() # Capture start time for Lambda metering
2729
logger.info(f"Event: {json.dumps(event)}")
2830

2931
# Load configuration
@@ -65,6 +67,13 @@ def handler(event, context):
6567
if section.extraction_result_uri and section.extraction_result_uri.strip():
6668
logger.info(f"Skipping extraction for section {section_id} - already has extraction data: {section.extraction_result_uri}")
6769

70+
# Add Lambda metering for extraction skip execution
71+
try:
72+
lambda_metering = calculate_lambda_metering("Extraction", context, start_time)
73+
full_document.metering = merge_metering_data(full_document.metering, lambda_metering)
74+
except Exception as e:
75+
logger.warning(f"Failed to add Lambda metering for extraction skip: {str(e)}")
76+
6877
# Return the section without processing
6978
response = {
7079
"section_id": section_id,
@@ -117,6 +126,13 @@ def handler(event, context):
117126
logger.error(error_message)
118127
raise Exception(error_message)
119128

129+
# Add Lambda metering for successful extraction execution
130+
try:
131+
lambda_metering = calculate_lambda_metering("Extraction", context, start_time)
132+
section_document.metering = merge_metering_data(section_document.metering, lambda_metering)
133+
except Exception as e:
134+
logger.warning(f"Failed to add Lambda metering for extraction: {str(e)}")
135+
120136
# Prepare output with automatic compression if needed
121137
response = {
122138
"section_id": section_id,

patterns/pattern-3/src/ocr_function/index.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from idp_common import get_config, ocr
1515
from idp_common.models import Document, Status
1616
from idp_common.docs_service import create_document_service
17+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1718

1819
# Configuration will be loaded in handler function
1920

@@ -29,7 +30,8 @@
2930
def handler(event, context):
3031
"""
3132
Lambda handler for OCR processing.
32-
"""
33+
"""
34+
start_time = time.time() # Capture start time for Lambda metering
3335
logger.info(f"Event: {json.dumps(event)}")
3436

3537
# Get document from event - handle both compressed and uncompressed
@@ -61,6 +63,13 @@ def handler(event, context):
6163
logger.info(f"Updating document execution ARN for OCR skip")
6264
document_service.update_document(document)
6365

66+
# Add Lambda metering for OCR skip execution
67+
try:
68+
lambda_metering = calculate_lambda_metering("OCR", context, start_time)
69+
document.metering = merge_metering_data(document.metering, lambda_metering)
70+
except Exception as e:
71+
logger.warning(f"Failed to add Lambda metering for OCR skip: {str(e)}")
72+
6473
# Prepare output with existing document data
6574
working_bucket = os.environ.get('WORKING_BUCKET')
6675
response = {
@@ -105,6 +114,13 @@ def handler(event, context):
105114
t1 = time.time()
106115
logger.info(f"Total OCR processing time: {t1-t0:.2f} seconds")
107116

117+
# Add Lambda metering for successful OCR execution
118+
try:
119+
lambda_metering = calculate_lambda_metering("OCR", context, start_time)
120+
document.metering = merge_metering_data(document.metering, lambda_metering)
121+
except Exception as e:
122+
logger.warning(f"Failed to add Lambda metering for OCR: {str(e)}")
123+
108124
# Prepare output with automatic compression if needed
109125
working_bucket = os.environ.get('WORKING_BUCKET')
110126
response = {

patterns/pattern-3/src/summarization_function/index.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from idp_common import get_config, summarization
1414
from idp_common.models import Document, Status
1515
from idp_common.docs_service import create_document_service
16+
from idp_common.utils import calculate_lambda_metering, merge_metering_data
1617

1718
# Configuration will be loaded in handler function
1819

@@ -72,6 +73,13 @@ def handler(event, context):
7273
else:
7374
logger.warning("Document summarization completed but no summary report URI was set")
7475

76+
# Add Lambda metering for successful summarization execution
77+
try:
78+
lambda_metering = calculate_lambda_metering("Summarization", context, start_time)
79+
processed_document.metering = merge_metering_data(processed_document.metering, lambda_metering)
80+
except Exception as e:
81+
logger.warning(f"Failed to add Lambda metering for summarization: {str(e)}")
82+
7583
# Prepare output with automatic compression if needed
7684
return {
7785
'document': processed_document.serialize_document(working_bucket, "summarization", logger),

0 commit comments

Comments
 (0)