
Commit 17a662a

singleton s3 client

1 parent 6be1aac · commit 17a662a

13 files changed: +59 -43 lines changed

lambdas/filenameprocessor/src/file_name_processor.py

Lines changed: 2 additions & 2 deletions

@@ -10,7 +10,7 @@
 from uuid import uuid4

 from audit_table import upsert_audit_table
-from common.clients import STREAM_NAME, logger, s3_client
+from common.clients import STREAM_NAME, logger, get_s3_client
 from common.log_decorator import logging_decorator
 from common.models.errors import UnhandledAuditTableError
 from constants import (
@@ -73,7 +73,7 @@ def handle_record(record) -> dict:

     try:
         message_id = str(uuid4())
-        s3_response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
+        s3_response = get_s3_client().get_object(Bucket=bucket_name, Key=file_key)
         created_at_formatted_string, expiry_timestamp = get_creation_and_expiry_times(s3_response)

         vaccine_type, supplier = validate_file_key(file_key)

lambdas/filenameprocessor/src/make_and_upload_ack_file.py

Lines changed: 2 additions & 2 deletions

@@ -4,7 +4,7 @@
 from csv import writer
 from io import BytesIO, StringIO

-from common.clients import s3_client
+from common.clients import get_s3_client


 def make_the_ack_data(message_id: str, message_delivered: bool, created_at_formatted_string: str) -> dict:
@@ -43,7 +43,7 @@ def upload_ack_file(file_key: str, ack_data: dict, created_at_formatted_string:
     csv_buffer.seek(0)
     csv_bytes = BytesIO(csv_buffer.getvalue().encode("utf-8"))
     ack_bucket_name = os.getenv("ACK_BUCKET_NAME")
-    s3_client.upload_fileobj(csv_bytes, ack_bucket_name, ack_filename)
+    get_s3_client().upload_fileobj(csv_bytes, ack_bucket_name, ack_filename)


 def make_and_upload_the_ack_file(

lambdas/filenameprocessor/src/utils_for_filenameprocessor.py

Lines changed: 2 additions & 1 deletion

@@ -2,7 +2,7 @@

 from datetime import timedelta

-from common.clients import logger, s3_client
+from common.clients import logger, get_s3_client
 from constants import AUDIT_TABLE_TTL_DAYS


@@ -16,6 +16,7 @@ def get_creation_and_expiry_times(s3_response: dict) -> (str, int):

 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
     """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
+    s3_client = get_s3_client()
     s3_client.copy_object(
         Bucket=bucket_name,
         CopySource={"Bucket": bucket_name, "Key": source_file_key},

lambdas/mesh_processor/Makefile

Lines changed: 12 additions & 1 deletion

@@ -1,3 +1,5 @@
+TEST_ENV := @PYTHONPATH=src:tests:../shared/src
+
 build:
 	docker build -t mesh-lambda-build .

@@ -6,6 +8,15 @@ package: build
 	docker run --rm -v $(shell pwd)/build:/build mesh-lambda-build

 test:
-	python -m unittest
+	$(TEST_ENV) python -m unittest
+
+coverage-run:
+	$(TEST_ENV) coverage run --source=src -m unittest discover
+
+coverage-report:
+	$(TEST_ENV) coverage report -m
+
+coverage-html:
+	$(TEST_ENV) coverage html

 .PHONY: build package test

lambdas/mesh_processor/src/converter.py

Lines changed: 4 additions & 4 deletions

@@ -9,10 +9,7 @@
 DESTINATION_BUCKET_NAME = os.getenv("DESTINATION_BUCKET_NAME")
 UNEXPECTED_EOF_ERROR = "Unexpected EOF"

-logging.basicConfig(level=logging.INFO)
-logger = logging.getLogger()
-
-s3_client = boto3.client("s3")
+from common.clients import logger, get_s3_client


 def parse_headers(headers_str: str) -> dict[str, str]:
@@ -74,6 +71,7 @@ def stream_part_body(input_file: BinaryIO, boundary: bytes, output_file: BinaryIO


 def move_file(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:
+    s3_client = get_s3_client()
     s3_client.copy_object(
         CopySource={"Bucket": source_bucket, "Key": source_key},
         Bucket=destination_bucket,
@@ -89,6 +87,7 @@ def move_file(source_bucket: str, source_key: str, destination_bucket: str, destination_key: str) -> None:


 def transfer_multipart_content(bucket_name: str, file_key: str, boundary: bytes, filename: str) -> None:
+    s3_client = get_s3_client()
     with open(f"s3://{bucket_name}/{file_key}", "rb", transport_params={"client": s3_client}) as input_file:
         read_until_part_start(input_file, boundary)

@@ -122,6 +121,7 @@ def process_record(record: dict) -> None:
     file_key = record["s3"]["object"]["key"]
     logger.info(f"Processing {file_key}")

+    s3_client = get_s3_client()
     response = s3_client.head_object(
         Bucket=bucket_name,
         Key=file_key,
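
The bare open() used on s3:// URLs in transfer_multipart_content is presumably smart_open.open, whose S3 transport accepts an explicit boto3 client via transport_params. Under that assumption, streaming an object with the new accessor looks roughly like this sketch (bucket and key names are illustrative):

    from smart_open import open as s3_open  # assumed: converter.py's open() comes from smart_open

    from common.clients import get_s3_client

    # Read the first KiB of an object as a stream; the singleton client is
    # fetched at call time rather than created at import time.
    with s3_open("s3://example-bucket/example-key", "rb",
                 transport_params={"client": get_s3_client()}) as fh:
        head = fh.read(1024)

Fetching the client inside each function, as the hunks above do, keeps import of converter.py free of AWS side effects and lets tests substitute the client per call.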

lambdas/recordprocessor/src/file_level_validation.py

Lines changed: 2 additions & 1 deletion

@@ -6,7 +6,7 @@
 from csv import DictReader

 from audit_table import update_audit_table_status
-from common.clients import logger, s3_client
+from common.clients import logger, get_s3_client
 from constants import (
     ARCHIVE_DIR_NAME,
     EXPECTED_CSV_HEADERS,
@@ -63,6 +63,7 @@ def get_permitted_operations(supplier: str, vaccine_type: str, allowed_permissions

 def move_file(bucket_name: str, source_file_key: str, destination_file_key: str) -> None:
     """Moves a file from one location to another within a single S3 bucket by copying and then deleting the file."""
+    s3_client = get_s3_client()
     s3_client.copy_object(
         Bucket=bucket_name,
         CopySource={"Bucket": bucket_name, "Key": source_file_key},

lambdas/recordprocessor/src/make_and_upload_ack_file.py

Lines changed: 2 additions & 2 deletions

@@ -3,7 +3,7 @@
 from csv import writer
 from io import BytesIO, StringIO

-from common.clients import s3_client
+from common.clients import get_s3_client
 from constants import ACK_BUCKET_NAME


@@ -46,7 +46,7 @@ def upload_ack_file(file_key: str, ack_data: dict, created_at_formatted_string:
     # Upload the CSV file to S3
     csv_buffer.seek(0)
     csv_bytes = BytesIO(csv_buffer.getvalue().encode("utf-8"))
-    s3_client.upload_fileobj(csv_bytes, ACK_BUCKET_NAME, ack_filename)
+    get_s3_client().upload_fileobj(csv_bytes, ACK_BUCKET_NAME, ack_filename)


 def make_and_upload_ack_file(

lambdas/recordprocessor/src/utils_for_recordprocessor.py

Lines changed: 2 additions & 2 deletions

@@ -4,7 +4,7 @@
 from csv import DictReader
 from io import TextIOWrapper

-from common.clients import s3_client
+from common.clients import get_s3_client


 def get_environment() -> str:
@@ -16,7 +16,7 @@ def get_environment() -> str:

 def get_csv_content_dict_reader(file_key: str, encoder="utf-8") -> DictReader:
     """Returns the requested file contents from the source bucket in the form of a DictReader"""
-    response = s3_client.get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
+    response = get_s3_client().get_object(Bucket=os.getenv("SOURCE_BUCKET_NAME"), Key=file_key)
     binary_io = response["Body"]
     text_io = TextIOWrapper(binary_io, encoding=encoder, newline="")
     return DictReader(text_io, delimiter="|")

lambdas/recordprocessor/tests/test_recordprocessor_edge_cases.py

Lines changed: 16 additions & 8 deletions

@@ -1,7 +1,7 @@
 import os
 import unittest
 from io import BytesIO
-from unittest.mock import call, patch
+from unittest.mock import Mock, call, patch

 from batch_processor import process_csv_to_fhir
 from utils_for_recordprocessor_tests.utils_for_recordprocessor_tests import (
@@ -16,8 +16,7 @@ def setUp(self):
         self.mock_logger_error = create_patch("logging.Logger.error")
         self.mock_send_to_kinesis = create_patch("batch_processor.send_to_kinesis")
         self.mock_map_target_disease = create_patch("batch_processor.map_target_disease")
-        self.mock_s3_get_object = create_patch("utils_for_recordprocessor.s3_client.get_object")
-        self.mock_s3_put_object = create_patch("utils_for_recordprocessor.s3_client.put_object")
+        self.mock_get_s3_client = create_patch("utils_for_recordprocessor.get_s3_client")
         self.mock_make_and_move = create_patch("file_level_validation.make_and_upload_ack_file")
         self.mock_move_file = create_patch("file_level_validation.move_file")
         self.mock_get_permitted_operations = create_patch("file_level_validation.get_permitted_operations")
@@ -63,7 +62,9 @@ def test_process_large_file_cp1252(self):
         data = self.insert_cp1252_at_end(data, b"D\xe9cembre", 2)
         ret1 = {"Body": BytesIO(b"".join(data))}
         ret2 = {"Body": BytesIO(b"".join(data))}
-        self.mock_s3_get_object.side_effect = [ret1, ret2]
+        mock_s3 = Mock()
+        mock_s3.get_object.side_effect = [ret1, ret2]
+        self.mock_get_s3_client.return_value = mock_s3
         self.mock_map_target_disease.return_value = "some disease"

         message_body = {
@@ -80,7 +81,8 @@ def test_process_large_file_cp1252(self):
         self.mock_logger_warning.assert_called()
         warning_call_args = self.mock_logger_warning.call_args[0][0]
         self.assertTrue(warning_call_args.startswith("Encoding Error: 'utf-8' codec can't decode byte 0xe9"))
-        self.mock_s3_get_object.assert_has_calls(
+        # TODO: when running standalone this expects BucketNames.SOURCE. not clear why.
+        mock_s3.get_object.assert_has_calls(
             [
                 call(Bucket=None, Key="test-filename"),
                 call(Bucket=None, Key="processing/test-filename"),
@@ -94,7 +96,9 @@ def test_process_large_file_utf8(self):
         data = self.expand_test_data(data, n_rows)
         ret1 = {"Body": BytesIO(b"".join(data))}
         ret2 = {"Body": BytesIO(b"".join(data))}
-        self.mock_s3_get_object.side_effect = [ret1, ret2]
+        mock_s3 = Mock()
+        mock_s3.get_object.side_effect = [ret1, ret2]
+        self.mock_get_s3_client.return_value = mock_s3
         self.mock_map_target_disease.return_value = "some disease"

         message_body = {
@@ -118,7 +122,9 @@ def test_process_small_file_cp1252(self):

         ret1 = {"Body": BytesIO(b"".join(data))}
         ret2 = {"Body": BytesIO(b"".join(data))}
-        self.mock_s3_get_object.side_effect = [ret1, ret2]
+        mock_s3 = Mock()
+        mock_s3.get_object.side_effect = [ret1, ret2]
+        self.mock_get_s3_client.return_value = mock_s3
         self.mock_map_target_disease.return_value = "some disease"

         message_body = {
@@ -143,7 +149,9 @@ def test_process_small_file_utf8(self):

         ret1 = {"Body": BytesIO(b"".join(data))}
         ret2 = {"Body": BytesIO(b"".join(data))}
-        self.mock_s3_get_object.side_effect = [ret1, ret2]
+        mock_s3 = Mock()
+        mock_s3.get_object.side_effect = [ret1, ret2]
+        self.mock_get_s3_client.return_value = mock_s3
         self.mock_map_target_disease.return_value = "some disease"

         message_body = {
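
The TODO left in the assertion hunk above points at state bleeding between test runs. If get_s3_client() caches its client in common.clients.global_s3_client (an assumption; the accessor's body is not part of this diff), one way to keep tests independent would be to reset the cache in setUp. A hypothetical sketch, not part of this commit:

    import unittest
    from unittest.mock import Mock, patch

    import common.clients


    class S3SingletonIsolationTest(unittest.TestCase):
        def setUp(self):
            # Hypothetical reset: drop any client cached by an earlier test so
            # each test starts from a cold singleton.
            common.clients.global_s3_client = None

        def test_client_is_built_once_and_reused(self):
            with patch("common.clients.boto3_client", return_value=Mock()) as factory:
                first = common.clients.get_s3_client()
                second = common.clients.get_s3_client()
            self.assertIs(first, second)   # same cached instance returned
            factory.assert_called_once()   # the boto3 factory ran only once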

lambdas/shared/src/common/clients.py

Lines changed: 0 additions & 3 deletions

@@ -14,9 +14,6 @@

 REGION_NAME = os.getenv("AWS_REGION", "eu-west-2")

-s3_client = boto3_client("s3", region_name=REGION_NAME)
-
-# for lambdas which require a global s3_client
 global_s3_client = None

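Note that the body of get_s3_client() is not shown anywhere in this diff. Given the module state that clients.py keeps (REGION_NAME and global_s3_client), it is presumably a lazily initialised singleton accessor; a minimal sketch under that assumption, not necessarily the commit's actual implementation:

    import os

    from boto3 import client as boto3_client

    REGION_NAME = os.getenv("AWS_REGION", "eu-west-2")
    global_s3_client = None


    def get_s3_client():
        """Create the boto3 S3 client on first use and reuse it on later calls."""
        global global_s3_client
        if global_s3_client is None:
            global_s3_client = boto3_client("s3", region_name=REGION_NAME)
        return global_s3_client

Deferring construction this way means importing common.clients no longer builds a client (or reads AWS configuration) at import time, and tests can patch get_s3_client at each call site, as the test changes above do.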
