
Commit 6959c74

[PRMP-179] Adding update function and get_s3_values (#799)
1 parent ba1b60d commit 6959c74

File tree

4 files changed: +234 -2 lines changed


lambdas/requirements/layers/requirements_core_lambda_layer.txt

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@ oauthlib==3.2.2
 packaging==23.0.0
 pydantic==2.11
 pydantic[email]==2.11
-pypdf==6.0.0
+pypdf==6.1.3
 requests==2.32.4
 responses==0.23.1
 six==1.16.0
Lines changed: 176 additions & 0 deletions
@@ -0,0 +1,176 @@
import argparse
from typing import Iterable, Callable

from services.base.dynamo_service import DynamoDBService
from utils.audit_logging_setup import LoggingService
from services.base.s3_service import S3Service


class VersionMigration:
    filesize_key_field_name = "FileSize"
    s3_key_field_name = "S3FileKey"
    s3_version_id_field_name = "S3VersionID"

    def __init__(self, environment: str, table_name: str, dry_run: bool = False):
        self.environment = environment
        self.table_name = table_name
        self.dry_run = dry_run
        self.logger = LoggingService("S3Migration")
        self.dynamo_service = DynamoDBService()
        self.s3_service = S3Service()

        self.target_table = f"{self.environment}_{self.table_name}"

    def main(
        self, entries: Iterable[dict]
    ) -> list[tuple[str, Callable[[dict], dict | None]]]:
        """
        Main entry point. Returns a list of labelled update functions.
        Accepts a list of entries for Lambda-based execution; raises ValueError
        if `entries` is None.
        """
        self.logger.info("Starting version migration")
        self.logger.info(f"Target table: {self.target_table}")
        self.logger.info(f"Dry run mode: {self.dry_run}")

        if entries is None:
            self.logger.error("No entries provided after scanning entire table.")
            raise ValueError("Entries must be provided to main().")

        return [("s3Metadata", self.update_s3_metadata_entry)]

    def process_entries(
        self,
        label: str,
        entries: Iterable[dict],
        update_fn: Callable[[dict], dict | None],
    ):
        self.logger.info(f"Running {label} migration")

        for index, entry in enumerate(entries, start=1):
            item_id = entry.get("ID")

            # Skip entries that are missing an ID
            if not item_id:
                self.logger.error(f"[{label}] Item {index} missing ID field, skipping")
                continue

            self.logger.info(f"[{label}] Processing item {index} (ID: {item_id})")

            updated_fields = update_fn(entry)
            if not updated_fields:
                self.logger.info(
                    f"[{label}] Item {item_id} does not require update, skipping."
                )
                continue

            if self.dry_run:
                self.logger.info(
                    f"[Dry Run] Would update item {item_id} with {updated_fields}"
                )
            else:
                self.logger.info(f"Updating item {item_id} with {updated_fields}")
                try:
                    self.dynamo_service.update_item(
                        table_name=self.target_table,
                        key_pair={"ID": item_id},
                        updated_fields=updated_fields,
                    )
                    self.logger.info(f"Successfully updated item {item_id}")
                except Exception as e:
                    self.logger.error(f"Failed to update item {item_id}: {str(e)}")
                    continue

        self.logger.info(f"{label} migration completed.")

    @staticmethod
    def parse_s3_path(s3_path: str) -> tuple[str, str] | None:
        """Parse an s3://bucket/key path into its bucket and key components."""
        if not s3_path or not s3_path.startswith("s3://"):
            return None

        path = s3_path.removeprefix("s3://")
        parts = path.split("/", 1)

        if len(parts) != 2 or not parts[0] or not parts[1]:
            return None

        return parts[0], parts[1]

    def get_meta_data(self, s3_bucket: str, s3_key: str) -> tuple[int, str] | None:
        try:
            s3_head = self.s3_service.get_head_object(s3_bucket, s3_key)
            if s3_head:
                return s3_head.get("ContentLength"), s3_head.get("VersionId")
        except Exception as e:
            self.logger.error(f"Failed to retrieve S3 metadata for {s3_key}: {str(e)}")
        return None

    def update_s3_metadata_entry(self, entry: dict) -> dict | None:
        """Update entry with S3 metadata (FileSize, S3FileKey, S3VersionID)."""
        file_location = entry.get("FileLocation")
        if not file_location:
            self.logger.warning(f"Missing FileLocation for entry: {entry.get('ID')}")
            return None

        if not (path_result := self.parse_s3_path(file_location)) or not all(path_result):
            self.logger.warning(f"Invalid S3 path: {file_location}")
            return None
        s3_bucket, s3_key = path_result

        if not (metadata := self.get_meta_data(s3_bucket, s3_key)) or not all(metadata):
            self.logger.warning(f"Could not retrieve S3 metadata for item {s3_key}")
            return None
        content_length, version_id = metadata

        updated_fields = {}

        if self.filesize_key_field_name not in entry:
            if content_length is None:
                self.logger.error(
                    f"FileSize missing in both DynamoDB and S3 for item {s3_key}"
                )
                return None
            updated_fields[self.filesize_key_field_name] = content_length

        if self.s3_key_field_name not in entry:
            updated_fields[self.s3_key_field_name] = s3_key

        if self.s3_version_id_field_name not in entry:
            if version_id is None:
                self.logger.error(
                    f"S3VersionID missing in both DynamoDB and S3 for item {s3_key}"
                )
                return None
            updated_fields[self.s3_version_id_field_name] = version_id

        return updated_fields if updated_fields else None


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        prog="dynamodb_migration.py",
        description="Migrate DynamoDB table columns",
    )
    parser.add_argument("environment", help="Environment prefix for DynamoDB table")
    parser.add_argument("table_name", help="DynamoDB table name to migrate")
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Run migration in dry-run mode (no writes)",
    )
    args = parser.parse_args()

    migration = VersionMigration(
        environment=args.environment,
        table_name=args.table_name,
        dry_run=args.dry_run,
    )

    entries_to_process = list(
        migration.dynamo_service.scan_whole_table(migration.target_table)
    )

    update_functions = migration.main(entries=entries_to_process)

    for label, fn in update_functions:
        migration.process_entries(label=label, entries=entries_to_process, update_fn=fn)
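
Note (not part of the committed diff): parse_s3_path is a pure static method, so its splitting behaviour can be sanity-checked in isolation. A minimal sketch follows; it assumes the new script is importable as dynamodb_migration (the module path is not shown above), and the bucket and key are made-up examples.

from dynamodb_migration import VersionMigration  # assumed module name, see note above

# A well-formed FileLocation value splits into (bucket, key).
assert VersionMigration.parse_s3_path("s3://example-bucket/9000000009/doc.pdf") == (
    "example-bucket",
    "9000000009/doc.pdf",
)

# Paths without a key, or without the s3:// scheme, are rejected with None.
assert VersionMigration.parse_s3_path("s3://bucket-only") is None
assert VersionMigration.parse_s3_path("https://not-s3/doc.pdf") is None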

lambdas/services/base/s3_service.py

Lines changed: 3 additions & 0 deletions
@@ -183,6 +183,9 @@ def get_file_size(self, s3_bucket_name: str, object_key: str) -> int:
         response = self.client.head_object(Bucket=s3_bucket_name, Key=object_key)
         return response.get("ContentLength", 0)
 
+    def get_head_object(self, bucket: str, key: str):
+        return self.client.head_object(Bucket=bucket, Key=key)
+
     def get_object_stream(self, bucket: str, key: str):
         response = self.client.get_object(Bucket=bucket, Key=key)
         return response.get("Body")
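
For reference (not part of the committed diff): the migration's get_meta_data only needs two fields from the head_object response. A minimal sketch with a made-up bucket and key is below; VersionId is only present when the bucket has versioning enabled, which the migration assumes.

from services.base.s3_service import S3Service

s3_service = S3Service()
head = s3_service.get_head_object("example-bucket", "9000000009/doc.pdf")

file_size = head.get("ContentLength")  # object size in bytes
version_id = head.get("VersionId")     # None on unversioned buckets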

lambdas/tests/unit/services/base/test_s3_service.py

Lines changed: 54 additions & 1 deletion
@@ -19,7 +19,6 @@
     MOCK_PRESIGNED_URL_RESPONSE,
 )
 from utils.exceptions import TagNotFoundException
-from utils.utilities import flatten
 
 TEST_DOWNLOAD_PATH = "test_path"
 MOCK_EVENT_BODY = {
@@ -30,6 +29,11 @@
 }
 
 
+def flatten(list_of_lists):
+    """Flatten a list of lists into a single list."""
+    return [item for sublist in list_of_lists for item in sublist]
+
+
 @freeze_time("2023-10-30T10:25:00")
 @pytest.fixture
 def mock_service(mocker, set_env):
@@ -485,3 +489,52 @@ def test_stream_s3_object_to_memory(mock_service, mock_client, mocker):
     result = mock_service.stream_s3_object_to_memory(MOCK_BUCKET, TEST_FILE_KEY)
 
     assert result.getvalue() == b"first-chunksecond-chunk"
+
+
+def test_get_head_object_returns_metadata(mock_service, mock_client):
+    mock_response = {
+        "ResponseMetadata": {
+            "RequestId": "mock_req",
+            "HostId": "",
+            "HTTPStatusCode": 200,
+            "HTTPHeaders": {},
+            "RetryAttempts": 0,
+        },
+        "ContentLength": 3191,
+        "ETag": '"eb2996dae99afd8308e4c97bdb6a4178"',
+        "ContentType": "application/pdf",
+        "Metadata": {"custom-key": "custom-value"},
+    }
+
+    mock_client.head_object.return_value = mock_response
+
+    result = mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY)
+
+    assert result == mock_response
+    mock_client.head_object.assert_called_once_with(Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY)
+
+
+def test_get_head_object_raises_client_error_when_object_not_found(mock_service, mock_client):
+    mock_error = ClientError(
+        {"Error": {"Code": "404", "Message": "Not Found"}},
+        "HeadObject",
+    )
+    mock_client.head_object.side_effect = mock_error
+
+    with pytest.raises(ClientError):
+        mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY)
+
+    mock_client.head_object.assert_called_once_with(Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY)
+
+
+def test_get_head_object_raises_client_error_on_access_denied(mock_service, mock_client):
+    mock_error = ClientError(
+        {"Error": {"Code": "403", "Message": "Forbidden"}},
+        "HeadObject",
+    )
+    mock_client.head_object.side_effect = mock_error
+
+    with pytest.raises(ClientError):
+        mock_service.get_head_object(bucket=MOCK_BUCKET, key=TEST_FILE_KEY)
+
+    mock_client.head_object.assert_called_once_with(Bucket=MOCK_BUCKET, Key=TEST_FILE_KEY)
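
The new tests cover get_head_object itself; a possible follow-up (not part of this commit) would exercise update_s3_metadata_entry end to end with the AWS-facing services mocked out. A rough sketch, assuming the script is importable as dynamodb_migration and using made-up entry values:

from unittest.mock import patch

from dynamodb_migration import VersionMigration


def test_update_s3_metadata_entry_fills_missing_fields():
    with (
        patch("dynamodb_migration.DynamoDBService"),
        patch("dynamodb_migration.LoggingService"),
        patch("dynamodb_migration.S3Service") as mock_s3,
    ):
        # head_object fields the migration reads; values are illustrative only
        mock_s3.return_value.get_head_object.return_value = {
            "ContentLength": 3191,
            "VersionId": "abc123",
        }
        migration = VersionMigration("test", "example_table", dry_run=True)

        entry = {"ID": "item-1", "FileLocation": "s3://example-bucket/9000000009/doc.pdf"}
        updated = migration.update_s3_metadata_entry(entry)

        assert updated == {
            "FileSize": 3191,
            "S3FileKey": "9000000009/doc.pdf",
            "S3VersionID": "abc123",
        }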
