Commit a1c8558

DL-114-create-lambda-function-to-handle-csv-files (#2561)
* create lambda function
* update comments - no retry
* change to sqs and aws wrangler and pathlib
* add DLQ and comments
* remove python 3.11 which has it as default
* fail early - reduce timeout, improve comments
* Update terraform/etl/62-lambda-csv-to-glue-catalog.tf
* add a note at the top for column types
* revert the change of aws account

Co-authored-by: timburke-hackit <[email protected]>
1 parent c8c0a32 commit a1c8558
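
As the commit message notes, the Lambda is fed by SQS rather than invoked directly by S3, with a DLQ and partial-batch failure reporting. As a rough sketch of the payload shape involved (the bucket name and message ID below are invented for illustration; the object key reuses the example from the Lambda's docstrings), each SQS record carries the JSON-encoded S3 notification in its body:

import json

# Hypothetical S3 notification for an uploaded CSV (bucket name is made up).
s3_notification = {
    "Records": [
        {
            "eventName": "ObjectCreated:Put",
            "s3": {
                "bucket": {"name": "example-user-uploads-bucket"},
                "object": {"key": "parking/davina/test_1.csv"},
            },
        }
    ]
}

# SQS wraps that notification as a JSON string in each record's body.
sqs_event = {
    "Records": [
        {
            "messageId": "00000000-0000-0000-0000-000000000000",
            "body": json.dumps(s3_notification),
        }
    ]
}

# With AWS credentials and access to the bucket, the handler defined below would
# return {"batchItemFailures": []} on success, or list the messageId of any failed
# record so SQS can retry it and eventually route it to the DLQ:
# result = lambda_handler(sqs_event, None)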

File tree

4 files changed (+568 -0 lines changed)

Lines changed: 342 additions & 0 deletions
@@ -0,0 +1,342 @@
"""
Automatically creates/deletes Glue Catalog tables when CSV files are uploaded/deleted in S3.

Note: all column types of the created tables are defined as strings; please be aware of this when using them downstream.
"""

import json
import logging
import re
import unicodedata
from os import getenv
from pathlib import PurePosixPath
from typing import Any
from urllib.parse import unquote_plus

import awswrangler as wr

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def parse_s3_key(s3_key: str) -> tuple[str, str, str]:
    """Parse S3 key to extract department, user_name, and file_name.

    Expected format: <department>/<user_name>/<file.csv>
    Example: parking/davina/test_1.csv

    Args:
        s3_key: URL-encoded S3 object key

    Returns:
        Tuple of (department, user_name, file_base_name)
    """
    decoded_key = unquote_plus(s3_key)
    path = PurePosixPath(decoded_key)

    if len(path.parts) < 3:
        raise ValueError(
            f"Invalid S3 key format: {s3_key}. Expected format: <department>/<user_name>/<file.csv>"
        )

    if path.suffix != ".csv":
        raise ValueError(f"File must be a CSV file: {path.name}")

    department = path.parts[0]
    user_name = path.parts[1]
    file_base_name = path.stem

    return department, user_name, file_base_name


def normalize_name(name: str, lowercase: bool = True) -> str:
    """Normalize name by replacing all non-alphanumeric characters with underscores.

    Strips accents, and converts to lowercase (optional). Consecutive non-alphanumeric
    characters are replaced with a single underscore.

    Args:
        name: Original name (column name, file name, user name, etc.)
        lowercase: Whether to convert to lowercase (default: True)

    Returns:
        Normalized name
    """
    formatted_name = name.lower() if lowercase else name
    formatted_name = unicodedata.normalize("NFKD", formatted_name)
    formatted_name = re.sub(r"[^a-zA-Z0-9]+", "_", formatted_name)
    formatted_name = formatted_name.strip("_")
    return formatted_name


def deduplicate_column_names(columns: list[str]) -> list[str]:
    """Deduplicate column names by appending a counter to duplicate names.

    Args:
        columns: List of column names (may contain duplicates)

    Returns:
        List of deduplicated column names
    """
    deduped_headers = []
    header_counts = {}

    for col in columns:
        if col in header_counts:
            header_counts[col] += 1
            deduped_headers.append(f"{col}_{header_counts[col]}")
        else:
            header_counts[col] = 0
            deduped_headers.append(col)

    return deduped_headers


def extract_csv_column_definitions(bucket: str, key: str) -> dict[str, str]:
    """Extract column names from CSV header, normalize and deduplicate them.

    Returns dictionary mapping column names to types (all string type).

    Args:
        bucket: S3 bucket name
        key: S3 object key (file path)

    Returns:
        Dictionary mapping column names to types (all 'string')

    Raises:
        ValueError: If CSV file has no header row or empty header row
    """
    s3_path = f"s3://{bucket}/{key}"

    try:
        df = wr.s3.read_csv(
            path=s3_path,
            nrows=1,
            use_threads=False,
            encoding="utf-8-sig",
            on_bad_lines="skip",
        )
    except Exception as e:
        logger.error(f"Failed to read CSV from S3: {e}")
        raise ValueError(f"Unable to read CSV file: {e}") from e

    if len(df.columns) == 0:
        raise ValueError("CSV file has no header row")

    column_names = list(df.columns)

    if not column_names or all(not col or col.strip() == "" for col in column_names):
        raise ValueError("CSV file has empty or invalid header row")

    normalized_headers = [normalize_name(col) for col in column_names]
    deduped_headers = deduplicate_column_names(normalized_headers)

    columns_types = {}
    for col_name in deduped_headers:
        if not col_name:
            col_name = f"column_{len(columns_types)}"
        columns_types[col_name] = "string"

    logger.info(
        f"A total of {len(columns_types)} column definitions were extracted from CSV header"
    )
    return columns_types


def create_glue_table(
    database_name: str,
    table_name: str,
    bucket: str,
    s3_key: str,
    columns_types: dict[str, str],
):
    """Create or recreate Glue Catalog table using AWS Data Wrangler.

    Args:
        database_name: Glue database name
        table_name: Glue table name
        bucket: S3 bucket name
        s3_key: S3 object key (file path)
        columns_types: Dictionary mapping column names to types
    """
    # Extract directory path: s3://bucket/parking/user/file.csv -> s3://bucket/parking/user/
    s3_path_parts = s3_key.rsplit("/", 1)
    s3_directory = f"{s3_path_parts[0]}/" if len(s3_path_parts) > 1 else ""
    s3_location = f"s3://{bucket}/{s3_directory}"

    wr.catalog.create_csv_table(
        database=database_name,
        table=table_name,
        path=s3_location,
        columns_types=columns_types,
        mode="overwrite",
        skip_header_line_count=1,
    )
    logger.info(
        f"Successfully created table: {table_name} in database: {database_name}"
    )


def delete_glue_table(database_name: str, table_name: str):
    """Delete Glue Catalog table using AWS Data Wrangler.

    Args:
        database_name: Glue database name
        table_name: Glue table name
    """
    wr.catalog.delete_table_if_exists(database=database_name, table=table_name)
    logger.info(
        f"Successfully deleted table: {table_name} from database: {database_name}"
    )


def process_single_event_record(
    record: dict[str, Any], database_name: str
) -> tuple[bool, bool]:
    """Process a single S3 event record.

    Args:
        record: S3 event record dictionary
        database_name: Glue database name

    Returns:
        Tuple of (was_processed, was_skipped)

    Raises:
        Exception: If processing fails
    """
    event_name = record.get("eventName", "")
    s3_info = record.get("s3", {})
    bucket = s3_info.get("bucket", {}).get("name", "")
    s3_key = s3_info.get("object", {}).get("key", "")

    if not s3_key:
        logger.warning("No S3 key found in event record")
        return False, True

    decoded_s3_key = unquote_plus(s3_key)
    logger.info(f"Processing event: {event_name} for s3://{bucket}/{decoded_s3_key}")

    _, user_name, file_base_name = parse_s3_key(s3_key)
    table_name = f"{normalize_name(user_name)}_{normalize_name(file_base_name)}"

    if event_name.startswith("ObjectCreated"):
        logger.info(f"Creating/updating table: {table_name}")

        columns_types = extract_csv_column_definitions(bucket, decoded_s3_key)

        create_glue_table(
            database_name=database_name,
            table_name=table_name,
            bucket=bucket,
            s3_key=decoded_s3_key,
            columns_types=columns_types,
        )

        logger.info(
            f"Successfully processed upload: {decoded_s3_key} -> table: {table_name}"
        )
        return True, False

    elif event_name.startswith("ObjectRemoved"):
        logger.info(f"Deleting table: {table_name}")

        delete_glue_table(database_name=database_name, table_name=table_name)

        logger.info(
            f"Successfully processed deletion: {decoded_s3_key} -> deleted table: {table_name}"
        )
        return True, False

    else:
        logger.warning(f"Unsupported event type: {event_name}")
        return False, True


def extract_s3_event_from_sqs_record(sqs_record: dict[str, Any]) -> dict[str, Any]:
    """Extract S3 event from SQS message body.

    Args:
        sqs_record: SQS record containing S3 event in body

    Returns:
        S3 event record dictionary
    """
    body = sqs_record.get("body", "{}")
    s3_event = json.loads(body)

    if "Records" in s3_event and len(s3_event["Records"]) > 0:
        return s3_event["Records"][0]

    return {}


def handle_sqs_event(event: dict[str, Any]) -> dict[str, Any]:
    """Handle SQS event containing S3 event notifications.

    Process each SQS message, extract S3 events, and handle partial batch failures.

    Args:
        event: SQS event dictionary containing Records array of SQS messages

    Returns:
        Dictionary with batchItemFailures for partial batch failure handling
    """
    database_name = getenv("GLUE_DATABASE_NAME", "parking_user_uploads_db")
    sqs_records = event.get("Records", [])
    total_records = len(sqs_records)
    processed_count = 0
    skipped_count = 0
    failed_message_ids = []

    logger.info(f"Processing {total_records} SQS message(s)")

    for sqs_record in sqs_records:
        message_id = sqs_record.get("messageId", "unknown")
        logger.info(f"Processing SQS message {message_id}")

        try:
            s3_event_record = extract_s3_event_from_sqs_record(sqs_record)

            if not s3_event_record:
                logger.warning(f"No S3 event found in SQS message {message_id}")
                skipped_count += 1
                continue

            s3_key = (
                s3_event_record.get("s3", {}).get("object", {}).get("key", "unknown")
            )
            logger.info(f"Processing file from message {message_id}: {s3_key}")

            was_processed, was_skipped = process_single_event_record(
                s3_event_record, database_name
            )

            if was_processed:
                processed_count += 1
                logger.info(f"Successfully processed message {message_id}")
            elif was_skipped:
                skipped_count += 1
                logger.info(f"Skipped message {message_id}")

        except Exception as e:
            error_msg = f"Error processing SQS message {message_id}: {str(e)}"
            logger.error(error_msg, exc_info=True)
            failed_message_ids.append(message_id)

    logger.info(
        f"Processing summary: {processed_count} processed, "
        f"{skipped_count} skipped, {len(failed_message_ids)} failed, {total_records} total"
    )

    return {
        "batchItemFailures": [
            {"itemIdentifier": message_id} for message_id in failed_message_ids
        ]
    }


def lambda_handler(event: dict[str, Any], context: object) -> dict[str, Any]:
    """Lambda function handler for SQS events and batchItemFailures."""
    return handle_sqs_event(event)
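
As a small usage sketch of the pure helpers above (no AWS access needed), using the example key from the parse_s3_key docstring:

# Illustrative only: exercises the helper functions defined above.
department, user_name, file_base_name = parse_s3_key("parking/davina/test_1.csv")
assert (department, user_name, file_base_name) == ("parking", "davina", "test_1")

# The Glue table name combines the normalised user name and file base name.
table_name = f"{normalize_name(user_name)}_{normalize_name(file_base_name)}"
assert table_name == "davina_test_1"

# Header normalisation and deduplication, as applied to CSV column names.
assert normalize_name("Amount (£)") == "amount"
assert deduplicate_column_names(["amount", "amount", "date"]) == ["amount", "amount_1", "date"]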

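Because every column is registered as a string (per the note at the top of the Lambda), downstream consumers have to cast types themselves. A minimal, hypothetical sketch of one common way to read such a table, via Athena with awswrangler, assuming the handler's default database name and the davina_test_1 table from the example above (the column names are invented):

import awswrangler as wr
import pandas as pd

# All columns come back as strings because the Glue table types are all 'string'.
df = wr.athena.read_sql_query(
    sql="SELECT * FROM davina_test_1",
    database="parking_user_uploads_db",
)

# Cast the columns you need explicitly (column names here are hypothetical).
df["amount"] = df["amount"].astype(float)
df["uploaded_at"] = pd.to_datetime(df["uploaded_at"])
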
terraform/core/41-lambda-failure-alarms.tf

Lines changed: 12 additions & 0 deletions
@@ -46,3 +46,15 @@ module "export_rds_to_s3_snapshot_lambda_alarm" {
   alarms_handler_lambda_name = module.lambda_alarms_handler[0].lambda_name
   alarms_handler_lambda_arn = module.lambda_alarms_handler[0].lambda_arn
 }
+
+module "csv_to_glue_catalog_lambda_alarm" {
+  count = local.is_production_environment ? 1 : 0
+  source = "../modules/lambda-alarms-and-monitoring"
+  tags = module.tags.values
+  identifier_prefix = local.short_identifier_prefix
+  lambda_name = "${local.short_identifier_prefix}csv-to-glue-catalog"
+  project = var.project
+  environment = var.environment
+  alarms_handler_lambda_name = module.lambda_alarms_handler[0].lambda_name
+  alarms_handler_lambda_arn = module.lambda_alarms_handler[0].lambda_arn
+}

terraform/etl/05-aws-s3.tf

Lines changed: 6 additions & 0 deletions
@@ -46,6 +46,12 @@ module "lambda_artefact_storage_data_source" {
   bucket_identifier = "dp-lambda-artefact-storage"
 }
 
+module "user_uploads_data_source" {
+  source = "../modules/data-sources/s3-bucket"
+  identifier_prefix = local.identifier_prefix
+  bucket_identifier = "user-uploads"
+}
+
 module "spark_ui_output_storage_data_source" {
   source = "../modules/data-sources/s3-bucket"
   identifier_prefix = local.identifier_prefix
