Skip to content

Commit a84ed0f

Browse files
[PRMP-399] Dynamo DB Migration Segment Lambda Handler and Service (#813)
Co-authored-by: Copilot <[email protected]>
1 parent dfc8ea9 commit a84ed0f

File tree

5 files changed

+938
-0
lines changed

5 files changed

+938
-0
lines changed

.github/workflows/base-lambdas-reusable-deploy-all.yml

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -557,6 +557,20 @@ jobs:
557557
secrets:
558558
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
559559

560+
deploy_migration_dynamodb_segment_lambda:
561+
name: Deploy migration dynamodb segment lambda
562+
uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
563+
with:
564+
environment: ${{ inputs.environment}}
565+
python_version: ${{ inputs.python_version }}
566+
build_branch: ${{ inputs.build_branch}}
567+
sandbox: ${{ inputs.sandbox }}
568+
lambda_handler_name: migration_dynamodb_segment_handler
569+
lambda_aws_name: MigrationDynamodbSegment
570+
lambda_layer_names: 'core_lambda_layer'
571+
secrets:
572+
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
573+
560574
deploy_fhir_document_reference_upload_lambda:
561575
name: Deploy Upload Document References FHIR Lambda
562576
if: inputs.environment == 'development' || inputs.environment == 'test'
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import logging
2+
import boto3
3+
from botocore.config import Config
4+
from botocore.exceptions import ClientError
5+
6+
from services.migration_dynamodb_segment_service import MigrationDynamoDBSegmentService
7+
8+
logger = logging.getLogger(__name__)
9+
10+
# Initialize boto3 client outside handler with explicit timeout
11+
config = Config(
12+
connect_timeout=5,
13+
read_timeout=10,
14+
retries={'max_attempts': 3}
15+
)
16+
17+
# Initialize DynamoDB clients dictionary for reuse across invocations
18+
# Using a dictionary (initialized at module level) to satisfy SonarCube requirement
19+
# that AWS clients be initialized outside the Lambda handler
20+
dynamodb_clients = {}
21+
22+
def get_dynamodb_client(region):
23+
"""Get or create a DynamoDB client for the specified region"""
24+
if region not in dynamodb_clients:
25+
dynamodb_clients[region] = boto3.client('dynamodb', region_name=region, config=config)
26+
return dynamodb_clients[region]
27+
28+
def validate_execution_id(event):
29+
"""Validate and extract execution_id from event"""
30+
if 'executionId' not in event:
31+
raise ValueError("Invalid or missing 'executionId' in event")
32+
33+
execution_id = event['executionId']
34+
if not isinstance(execution_id, str) or execution_id.strip() == "":
35+
raise ValueError("Invalid or missing 'executionId' in event")
36+
37+
# Extract just the ID part
38+
return execution_id.split(':')[-1]
39+
40+
def validate_total_segments(event):
41+
"""Validate and extract total_segments from event"""
42+
if 'totalSegments' not in event:
43+
raise ValueError("Invalid 'totalSegments' in event - must be a valid integer")
44+
45+
try:
46+
total_segments = int(event['totalSegments'])
47+
except (ValueError, TypeError):
48+
raise ValueError("Invalid 'totalSegments' in event - must be a valid integer")
49+
50+
if total_segments <= 0:
51+
raise ValueError("'totalSegments' must be positive")
52+
if total_segments > 1000:
53+
raise ValueError("'totalSegments' exceeds maximum allowed value of 1000")
54+
55+
return total_segments
56+
57+
def validate_table_arn(event):
58+
"""Validate and extract table information from DynamoDB table ARN"""
59+
if 'tableArn' not in event:
60+
raise ValueError("Invalid or missing 'tableArn' in event")
61+
62+
table_arn = event['tableArn']
63+
if not isinstance(table_arn, str) or table_arn.strip() == "":
64+
raise ValueError("Invalid or missing 'tableArn' in event")
65+
66+
# Validate ARN format: arn:aws:dynamodb:region:account-id:table/table-name
67+
if not table_arn.startswith('arn:aws:dynamodb:'):
68+
raise ValueError("Invalid DynamoDB table ARN format - must start with 'arn:aws:dynamodb:'")
69+
70+
if ':table/' not in table_arn:
71+
raise ValueError("Invalid DynamoDB table ARN format - missing ':table/' component")
72+
73+
try:
74+
table_name = table_arn.split(':table/')[-1]
75+
if not table_name:
76+
raise ValueError("Invalid DynamoDB table ARN format - table name is empty")
77+
78+
# Extract region for validation
79+
arn_parts = table_arn.split(':')
80+
if len(arn_parts) < 6:
81+
raise ValueError("Invalid DynamoDB table ARN format - insufficient components")
82+
83+
region = arn_parts[3]
84+
if not region:
85+
raise ValueError("Invalid DynamoDB table ARN format - region is empty")
86+
87+
return table_name, region
88+
89+
except (IndexError, ValueError) as e:
90+
raise ValueError(f"Invalid DynamoDB table ARN format: {e}")
91+
92+
def validate_table_exists(table_name, region):
93+
"""Validate that the DynamoDB table exists and is accessible"""
94+
try:
95+
client = get_dynamodb_client(region)
96+
response = client.describe_table(TableName=table_name)
97+
table_status = response['Table']['TableStatus']
98+
99+
if table_status not in ['ACTIVE', 'UPDATING']:
100+
raise ValueError(f"DynamoDB table '{table_name}' is not in ACTIVE status. Current status: {table_status}")
101+
102+
return True
103+
104+
except ClientError as e:
105+
error_code = e.response.get('Error', {}).get('Code', 'Unknown')
106+
if error_code == 'ResourceNotFoundException':
107+
raise ValueError(f"DynamoDB table '{table_name}' does not exist in region '{region}'")
108+
elif error_code == 'AccessDeniedException':
109+
raise ValueError(f"Access denied to DynamoDB table '{table_name}'. Check IAM permissions")
110+
else:
111+
raise ValueError(f"Failed to validate DynamoDB table '{table_name}': {e}")
112+
113+
def lambda_handler(event, context):
114+
total_segments = None
115+
execution_id = None
116+
117+
try:
118+
table_name, region = validate_table_arn(event)
119+
validate_table_exists(table_name, region)
120+
execution_id = validate_execution_id(event)
121+
total_segments = validate_total_segments(event)
122+
123+
return MigrationDynamoDBSegmentService().segment(execution_id, total_segments)
124+
125+
except Exception as e:
126+
extras = {
127+
'executionId': execution_id if execution_id is not None else event.get('executionId'),
128+
'totalSegments': total_segments if total_segments is not None else event.get('totalSegments'),
129+
'errorType': type(e).__name__
130+
}
131+
logger.error(f"Exception in migration_dynamodb_segment_handler: {e}", extra=extras, exc_info=True)
132+
raise
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import secrets
2+
import boto3
3+
import json
4+
import logging
5+
import os
6+
from botocore.exceptions import ClientError
7+
8+
logger = logging.getLogger(__name__)
9+
10+
class MigrationDynamoDBSegmentService:
11+
def __init__(self):
12+
self.s3_client = boto3.client("s3")
13+
self.bucket_name = os.environ.get("MIGRATION_SEGMENT_BUCKET_NAME")
14+
if not self.bucket_name:
15+
raise ValueError("MIGRATION_SEGMENT_BUCKET_NAME environment variable is required")
16+
17+
## this is necessary because random.shuffle is not cryptographically secure and will be flagged by security scanners
18+
def _secure_shuffle(self, seq):
19+
"""Cryptographically secure shuffle using secrets module"""
20+
seq = list(seq)
21+
for i in range(len(seq) - 1, 0, -1):
22+
j = secrets.randbelow(i + 1)
23+
seq[i], seq[j] = seq[j], seq[i]
24+
return seq
25+
26+
def segment(self, id: str, total_segments: int) -> dict:
27+
try:
28+
segments = list(range(0, total_segments))
29+
segments = self._secure_shuffle(segments)
30+
31+
self.s3_client.put_object(
32+
Bucket=self.bucket_name,
33+
Key=f"stepfunctionconfig-{id}.json",
34+
Body=json.dumps(segments),
35+
ContentType='application/json')
36+
return {'bucket': self.bucket_name, 'key': f"stepfunctionconfig-{id}.json"}
37+
except ClientError as aws_error:
38+
extras = {
39+
'executionId': id,
40+
'totalSegments': total_segments,
41+
'bucketName': self.bucket_name,
42+
'errorType': type(aws_error).__name__,
43+
'awsErrorCode': aws_error.response.get('Error', {}).get('Code', 'Unknown')
44+
}
45+
logger.error(f"AWS error in migration_dynamodb_segment_service: {aws_error}", extra=extras, exc_info=True)
46+
raise
47+
except Exception as e:
48+
extras = {
49+
'executionId': id,
50+
'totalSegments': total_segments,
51+
'bucketName': self.bucket_name,
52+
'errorType': type(e).__name__
53+
}
54+
logger.error(f"Exception in migration_dynamodb_segment_service: {e}", extra=extras, exc_info=True)
55+
raise

0 commit comments

Comments
 (0)