Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions .github/workflows/base-lambdas-reusable-deploy-all.yml
Original file line number Diff line number Diff line change
Expand Up @@ -810,3 +810,17 @@ jobs:
lambda_layer_names: "core_lambda_layer,reports_lambda_layer"
secrets:
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}

# Job: deploys the Lambda whose AWS name is `reportDistribution` (handler module
# `report_distribution_handler`) by calling the reusable single-Lambda deploy workflow.
# Environment, Python version, build branch and sandbox are forwarded unchanged from this
# workflow's own inputs; the layer names are passed as one comma-separated string.
deploy_report_distribution_lambda:
name: Deploy Report Distribution
uses: ./.github/workflows/base-lambdas-reusable-deploy.yml
with:
environment: ${{ inputs.environment }}
python_version: ${{ inputs.python_version }}
build_branch: ${{ inputs.build_branch }}
sandbox: ${{ inputs.sandbox }}
lambda_handler_name: report_distribution_handler
lambda_aws_name: reportDistribution
lambda_layer_names: "core_lambda_layer,reports_lambda_layer"
secrets:
AWS_ASSUME_ROLE: ${{ secrets.AWS_ASSUME_ROLE }}
61 changes: 61 additions & 0 deletions lambdas/handlers/report_distribution_handler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import os
from typing import Any, Dict

from repositories.reporting.report_contact_repository import ReportContactRepository
from services.base.s3_service import S3Service
from services.email_service import EmailService
from services.reporting.report_distribution_service import ReportDistributionService
from utils.audit_logging_setup import LoggingService
from utils.decorators.ensure_env_var import ensure_environment_variables
from utils.decorators.handle_lambda_exceptions import handle_lambda_exceptions
from utils.decorators.override_error_check import override_error_check
from utils.decorators.set_audit_arg import set_request_context_for_logging

logger = LoggingService(__name__)


@ensure_environment_variables(
    names=[
        "REPORT_BUCKET_NAME",
        "CONTACT_TABLE_NAME",
        "PRM_MAILBOX_EMAIL",
        "SES_FROM_ADDRESS",
    ]
)
@override_error_check
@handle_lambda_exceptions
@set_request_context_for_logging
def lambda_handler(event, context) -> Dict[str, Any]:
    """Entry point for report distribution; runs one of two actions chosen by the event.

    Event keys read by this handler:
        action: required, either ``"list"`` or ``"process_one"``.
        bucket: optional; when missing or falsy, ``REPORT_BUCKET_NAME`` is used.
        prefix: required for ``"list"``; passed to ``list_xlsx_keys``.
        key:    required for ``"process_one"``; the report object to process.

    Returns:
        For ``"list"``: ``{"bucket", "prefix", "keys"}``.
        For ``"process_one"``: ``{"status": "ok", "bucket", "key", "ods_code"}``.

    Raises:
        ValueError: if ``action`` is not one of the two supported values.
        KeyError: if the key required by the chosen action is absent from the event.
        (The wrapping decorators may intercept these before they reach the caller.)
    """
    supported_actions = {"list", "process_one"}
    requested_action = event.get("action")
    if requested_action not in supported_actions:
        raise ValueError("Invalid action. Expected 'list' or 'process_one'.")

    # Resolve all configuration up front, before any service object is created.
    target_bucket = event.get("bucket") or os.environ["REPORT_BUCKET_NAME"]
    contacts_table_name = os.environ["CONTACT_TABLE_NAME"]
    mailbox_address = os.environ["PRM_MAILBOX_EMAIL"]
    sender_address = os.environ["SES_FROM_ADDRESS"]

    distribution = ReportDistributionService(
        s3_service=S3Service(),
        contact_repo=ReportContactRepository(contacts_table_name),
        email_service=EmailService(),
        bucket=target_bucket,
        from_address=sender_address,
        prm_mailbox=mailbox_address,
    )

    if requested_action == "process_one":
        report_key = event["key"]
        ods = distribution.extract_ods_code_from_key(report_key)
        distribution.process_one_report(ods_code=ods, key=report_key)
        logger.info(f"Process-one mode: processed ods={ods}, key={report_key}")
        return {
            "status": "ok",
            "bucket": target_bucket,
            "key": report_key,
            "ods_code": ods,
        }

    # Only "list" can reach this point because the action was validated above.
    listing_prefix = event["prefix"]
    found_keys = distribution.list_xlsx_keys(prefix=listing_prefix)
    logger.info(
        f"List mode: returning {len(found_keys)} key(s) for prefix={listing_prefix}"
    )
    return {"bucket": target_bucket, "prefix": listing_prefix, "keys": found_keys}
64 changes: 53 additions & 11 deletions lambdas/handlers/report_orchestration_handler.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import os
import tempfile
from datetime import datetime, timedelta, timezone
from typing import Any, Dict

from repositories.reporting.reporting_dynamo_repository import ReportingDynamoRepository
from services.base.s3_service import S3Service
from services.reporting.excel_report_generator_service import ExcelReportGenerator
from services.reporting.report_orchestration_service import ReportOrchestrationService
from utils.audit_logging_setup import LoggingService
Expand All @@ -23,34 +24,75 @@ def calculate_reporting_window():

yesterday_7am = today_7am - timedelta(days=1)

return (
int(yesterday_7am.timestamp()),
int(today_7am.timestamp()),
)
return int(yesterday_7am.timestamp()), int(today_7am.timestamp())


def get_report_date_folder() -> str:
return datetime.now(timezone.utc).strftime("%Y-%m-%d")


def build_s3_key(ods_code: str, report_date: str) -> str:
    """Build the S3 object key for one report.

    The key has the form ``Report-Orchestration/<report_date>/<ods_code>.xlsx``.
    """
    dated_folder = f"Report-Orchestration/{report_date}"
    workbook_name = f"{ods_code}.xlsx"
    return f"{dated_folder}/{workbook_name}"


@ensure_environment_variables(
    names=[
        "BULK_UPLOAD_REPORT_TABLE_NAME",
        "REPORT_BUCKET_NAME",
    ]
)
@override_error_check
@handle_lambda_exceptions
@set_request_context_for_logging
def lambda_handler(event, context) -> Dict[str, Any]:
    """Generate the reports for the current reporting window and upload them to S3.

    The handler body does not read ``event`` or ``context``. It asks
    ``ReportOrchestrationService.process_reporting_window`` to write report files into a
    temporary directory, then uploads each returned ``ods_code -> local_path`` entry to
    ``REPORT_BUCKET_NAME`` under ``Report-Orchestration/<report_date>/`` with
    ``ServerSideEncryption`` set to ``aws:kms``.

    Returns:
        ``{"report_date", "bucket", "prefix", "keys"}``, where ``keys`` lists the
        uploaded object keys and is empty when no report was generated.
    """
    logger.info("Report orchestration lambda invoked")

    table_name = os.environ["BULK_UPLOAD_REPORT_TABLE_NAME"]
    report_bucket = os.environ["REPORT_BUCKET_NAME"]

    repository = ReportingDynamoRepository(table_name)
    excel_generator = ExcelReportGenerator()
    s3_service = S3Service()

    service = ReportOrchestrationService(
        repository=repository,
        excel_generator=excel_generator,
    )

    window_start, window_end = calculate_reporting_window()
    report_date = get_report_date_folder()
    prefix = f"Report-Orchestration/{report_date}/"

    # mkdtemp() would leave the directory and every generated workbook on disk.
    # The context manager removes them when the block exits, including on errors,
    # so the uploads must happen while it is still open.
    with tempfile.TemporaryDirectory() as tmp_dir:
        generated_files = service.process_reporting_window(
            window_start_ts=window_start,
            window_end_ts=window_end,
            output_dir=tmp_dir,
        )

        if not generated_files:
            logger.info("No reports generated; exiting")
            return {
                "report_date": report_date,
                "bucket": report_bucket,
                "prefix": prefix,
                "keys": [],
            }

        keys = []
        for ods_code, local_path in generated_files.items():
            key = build_s3_key(ods_code, report_date)
            s3_service.upload_file_with_extra_args(
                file_name=local_path,
                s3_bucket_name=report_bucket,
                file_key=key,
                extra_args={"ServerSideEncryption": "aws:kms"},
            )
            keys.append(key)
            logger.info(f"Uploaded report for ODS={ods_code} to s3://{report_bucket}/{key}")

    logger.info(f"Generated and uploaded {len(keys)} report(s) for report_date={report_date}")
    return {
        "report_date": report_date,
        "bucket": report_bucket,
        "prefix": prefix,
        "keys": keys,
    }
17 changes: 17 additions & 0 deletions lambdas/repositories/reporting/report_contact_repository.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from services.base.dynamo_service import DynamoDBService


class ReportContactRepository:
def __init__(self, table_name: str):
self.table_name = table_name
self.dynamo = DynamoDBService()

def get_contact_email(self, ods_code: str) -> str | None:
resp = self.dynamo.get_item(
table_name=self.table_name,
key={"OdsCode": ods_code},
)
item = (resp or {}).get("Item")
if not item:
return None
return item.get("Email")
47 changes: 33 additions & 14 deletions lambdas/repositories/reporting/reporting_dynamo_repository.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
from datetime import timedelta
from typing import Dict, List

from boto3.dynamodb.conditions import Attr
from boto3.dynamodb.conditions import Key
from services.base.dynamo_service import DynamoDBService
from utils.audit_logging_setup import LoggingService
from utils.utilities import utc_date_string, utc_date, utc_day_start_timestamp, utc_day_end_timestamp

logger = LoggingService(__name__)


class ReportingDynamoRepository:
def __init__(self, table_name: str):
self.table_name = table_name
Expand All @@ -17,19 +18,37 @@ def get_records_for_time_window(
start_timestamp: int,
end_timestamp: int,
) -> List[Dict]:
timestamp_index_name = "TimestampIndex"
logger.info(
f"Querying reporting table for window, "
f"table_name: {self.table_name}, "
f"start_timestamp: {start_timestamp}, "
f"end_timestamp: {end_timestamp}",
"Querying reporting table via TimestampIndex for window, "
f"table_name={self.table_name}, start_timestamp={start_timestamp}, end_timestamp={end_timestamp}",
)

filter_expression = Attr("Timestamp").between(
start_timestamp,
end_timestamp,
)
start_date = utc_date(start_timestamp)
end_date = utc_date(end_timestamp)

return self.dynamo_service.scan_whole_table(
table_name=self.table_name,
filter_expression=filter_expression,
)
records_for_window: List[Dict] = []
current_date = start_date

while current_date <= end_date:
day_start_ts = utc_day_start_timestamp(current_date)
day_end_ts = utc_day_end_timestamp(current_date)

effective_start_ts = max(start_timestamp, day_start_ts)
effective_end_ts = min(end_timestamp, day_end_ts)

key_condition = (
Key("Date").eq(current_date.isoformat())
& Key("Timestamp").between(effective_start_ts, effective_end_ts)
)

records_for_day = self.dynamo_service.query_by_key_condition_expression(
table_name=self.table_name,
index_name=timestamp_index_name,
key_condition_expression=key_condition,
)

records_for_window.extend(records_for_day)
current_date += timedelta(days=1)

return records_for_window
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
openpyxl==3.1.5
reportlab==4.3.1
reportlab==4.3.1
pyzipper==0.3.6
40 changes: 40 additions & 0 deletions lambdas/services/base/dynamo_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -433,6 +433,46 @@ def build_update_transaction_item(
}
}

def query_by_key_condition_expression(
    self,
    table_name: str,
    key_condition_expression: ConditionBase,
    index_name: str | None = None,
    query_filter: Attr | ConditionBase | None = None,
    limit: int | None = None,
) -> list[dict]:
    """Query a table (or one of its indexes) and return the matching items from every page.

    Args:
        table_name: Name of the table to query.
        key_condition_expression: Key condition passed as ``KeyConditionExpression``.
        index_name: Optional index to query instead of the base table.
        query_filter: Optional condition passed as ``FilterExpression``.
        limit: Optional cap on the total number of items returned. ``None`` or ``0``
            means no cap.

    Returns:
        The collected ``Items`` of all pages, in the order the pages were returned,
        truncated to ``limit`` when one is given.

    Raises:
        ClientError: re-raised after logging when a query call fails.
    """
    table = self.get_table(table_name)

    # These parameters are the same for every page, so build them once.
    query_params: dict = {"KeyConditionExpression": key_condition_expression}
    if index_name:
        query_params["IndexName"] = index_name
    if query_filter:
        query_params["FilterExpression"] = query_filter

    collected_items: list[dict] = []

    while True:
        if limit:
            # Ask only for what is still missing. Previously the same Limit was sent on
            # every page and paging continued to the end, so `limit` never capped the result.
            query_params["Limit"] = limit - len(collected_items)

        try:
            response = table.query(**query_params)
        except ClientError as exc:
            logger.error(str(exc), {"Result": f"Unable to query table: {table_name}"})
            raise

        collected_items.extend(response.get("Items", []))

        if limit and len(collected_items) >= limit:
            break

        exclusive_start_key = response.get("LastEvaluatedKey")
        if not exclusive_start_key:
            break
        query_params["ExclusiveStartKey"] = exclusive_start_key

    # Guard against a page that returned more items than were requested.
    return collected_items[:limit] if limit else collected_items


def query_table_with_paginator(
self,
table_name: str,
Expand Down
Loading
Loading