9 | 9 | This module processes S3 events, handling each S3 record individually.''' |
10 | 10 |
11 | 11 |
| 12 | +def _process_all_records(s3_records: list) -> dict: |
| 13 | +    record_count = len(s3_records) |
| 14 | +    error_count = 0 |
| 15 | +    file_keys = [] |
| 16 | +    for record in s3_records: |
| 17 | +        record_result = process_record(record) |
| 18 | +        file_keys.append(record_result["file_key"]) |
| 19 | +        if record_result["status"] == "error": |
| 20 | +            error_count += 1 |
| 21 | +    if error_count > 0: |
| 22 | +        logger.error("Processed %d records with %d errors", record_count, error_count) |
| 23 | +        return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors", |
| 24 | +                "file_keys": file_keys} |
| 25 | +    else: |
| 26 | +        logger.info("Successfully processed all %d records", record_count) |
| 27 | +        return {"status": "success", "message": f"Successfully processed {record_count} records", |
| 28 | +                "file_keys": file_keys} |
| 29 | + |
| 30 | + |
12 | 31 | @logging_decorator(prefix="redis_sync", stream_name=STREAM_NAME) |
13 | 32 | def handler(event, _): |
14 | 33 |
15 | 34 |     try: |
| 35 | +        no_records = "No records found in event" |
16 | 36 |         # check if the event requires a read, ie {"read": "my-hashmap"} |
17 | 37 |         if "read" in event: |
18 | 38 |             return read_event(redis_client, event, logger) |
19 | 39 |         elif "Records" in event: |
20 | 40 |             logger.info("Processing S3 event with %d records", len(event.get('Records', []))) |
21 | | -            s3_event = S3Event(event) |
22 | | -            record_count = len(s3_event.get_s3_records()) |
23 | | -            if record_count == 0: |
24 | | -                logger.info("No records found in event") |
25 | | -                return {"status": "success", "message": "No records found in event"} |
| 41 | +            s3_records = S3Event(event).get_s3_records() |
| 42 | +            if not s3_records: |
| 43 | +                logger.info(no_records) |
| 44 | +                return {"status": "success", "message": no_records} |
26 | 45 |             else: |
27 | | -                error_count = 0 |
28 | | -                file_keys = [] |
29 | | -                for record in s3_event.get_s3_records(): |
30 | | -                    record_result = process_record(record) |
31 | | -                    file_keys.append(record_result["file_key"]) |
32 | | -                    if record_result["status"] == "error": |
33 | | -                        error_count += 1 |
34 | | -                if error_count > 0: |
35 | | -                    logger.error("Processed %d records with %d errors", record_count, error_count) |
36 | | -                    return {"status": "error", "message": f"Processed {record_count} records with {error_count} errors", |
37 | | -                            "file_keys": file_keys} |
38 | | -                else: |
39 | | -                    logger.info("Successfully processed all %d records", record_count) |
40 | | -                    return {"status": "success", "message": f"Successfully processed {record_count} records", |
41 | | -                            "file_keys": file_keys} |
| 46 | +                return _process_all_records(s3_records) |
42 | 47 |         else: |
43 | | -            logger.info("No records found in event") |
44 | | -            return {"status": "success", "message": "No records found in event"} |
| 48 | +            logger.info(no_records) |
| 49 | +            return {"status": "success", "message": no_records} |
45 | 50 |
46 | 51 |     except Exception: |
47 | 52 |         logger.exception("Error processing S3 event") |
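
A note for reviewers: one benefit of extracting `_process_all_records` is that the per-record aggregation can now be exercised without building a full Lambda event. Below is a minimal test sketch; the module path `redis_sync.handler` and the use of `unittest.mock.patch.object` to stub `process_record` are illustrative assumptions, not part of this change.

```python
# Hypothetical test sketch, not part of the diff above.
from unittest.mock import patch

import redis_sync.handler as handler_module  # assumed module path


def test_process_all_records_reports_mixed_results():
    # The record contents do not matter here because process_record is stubbed.
    records = [{"s3": {"object": {"key": "a.json"}}},
               {"s3": {"object": {"key": "b.json"}}}]

    # Stub process_record so the first record succeeds and the second fails.
    with patch.object(handler_module, "process_record",
                      side_effect=[{"status": "success", "file_key": "a.json"},
                                   {"status": "error", "file_key": "b.json"}]):
        result = handler_module._process_all_records(records)

    assert result["status"] == "error"
    assert result["file_keys"] == ["a.json", "b.json"]
    assert result["message"] == "Processed 2 records with 1 errors"
```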
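
For context on the branching at the top of `handler`, the function distinguishes two event shapes: a read request like the one shown in the inline comment, and an S3 event notification carrying a `Records` list. A rough sketch of both, with placeholder bucket and key names:

```python
# Placeholder payloads illustrating the two branches in handler(); the bucket
# and object key names are made up for illustration.

# Read request: routed to read_event(redis_client, event, logger).
read_request = {"read": "my-hashmap"}

# Minimal S3 event notification: handler() wraps it in S3Event(event),
# pulls the records with get_s3_records(), and hands them to
# _process_all_records().
s3_put_event = {
    "Records": [
        {
            "eventSource": "aws:s3",
            "s3": {
                "bucket": {"name": "example-bucket"},
                "object": {"key": "example/file.json"},
            },
        }
    ]
}
```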