Skip to content

Commit 1c21817

Browse files
committed
add report v2
1 parent 9973afd commit 1c21817

File tree

3 files changed

+240
-85
lines changed

3 files changed

+240
-85
lines changed

app/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -561,7 +561,7 @@ class Config:
561561
NOTIFICATION_DEEP_HISTORY_DELETE_ARCHIVED = os.environ.get("NOTIFICATION_DEEP_HISTORY_DELETE_ARCHIVED", "1") == "1"
562562

563563
REPORT_REQUEST_NOTIFICATIONS_TIMEOUT_MINUTES = 30
564-
REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE = 2500
564+
REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE = 50000
565565

566566

567567
######################

app/db_copy_utils.py

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,204 @@
1+
"""
2+
Database COPY utilities for streaming large result sets directly to CSV.
3+
4+
This module provides functions to execute PostgreSQL COPY commands for efficient
5+
CSV generation. COPY runs entirely in the database engine, avoiding the overhead
6+
of Python object serialization.
7+
"""
8+
9+
from io import BytesIO
10+
from typing import Any
11+
from uuid import UUID
12+
13+
from flask import current_app
14+
from sqlalchemy import case, desc, func, text
15+
from sqlalchemy.orm import aliased
16+
17+
from app import db
18+
from app.constants import NOTIFICATION_REPORT_REQUEST_MAPPING
19+
from app.models import ApiKey, Job, Notification, TemplateHistory, User
20+
21+
22+
def build_notifications_copy_query(
    service_id: UUID,
    notification_type: str,
    notification_statuses: list[str],
    limit_days: int,
    chunk_size: int,
    older_than_id: UUID | None = None,
) -> str:
    """Build a literal-SQL SELECT for one chunk of a service's notifications.

    The returned string is later wrapped in a ``COPY ... TO STDOUT`` command,
    so it is compiled with literal binds (COPY cannot take bound parameters).

    Args:
        service_id: service whose notifications are exported.
        notification_type: notification channel to export (e.g. email/sms/letter).
        notification_statuses: DB status values to include; empty means all.
        limit_days: only rows created within the last N days are selected.
        chunk_size: maximum rows per chunk (keyset-pagination page size).
        older_than_id: when set, select only rows strictly older than this
            notification by the (created_at, id) sort key.

    Returns:
        The compiled SQL string. The final two columns, ``id`` and
        ``created_at``, exist only for pagination and are stripped before the
        CSV is produced.
    """
    # Local import: tuple_ is not in this module's top-level sqlalchemy imports.
    from sqlalchemy import tuple_

    # Aliases for table names
    n = aliased(Notification)
    t = aliased(TemplateHistory)
    j = aliased(Job)
    u = aliased(User)
    a = aliased(ApiKey)

    # Recipient: letters store an address block in `to`; export only the first
    # line, trimmed of surrounding spaces and a trailing comma.
    recipient_col = case(
        (n.notification_type == "letter", func.regexp_replace(func.split_part(n.to, "\n", 1), "^ +| +$| ,$", "", "g")),
        else_=n.to,
    ).label("Recipient")

    # Human-readable permanent-failure wording depends on the channel.
    permanent_failure_msg = case(
        (n.notification_type == "email", "Email address doesn't exist"),
        (n.notification_type == "sms", "Phone number doesn't exist"),
        else_="Permanent failure",
    )

    # Map internal status codes to the user-facing wording used in reports;
    # unknown statuses fall through unchanged.
    status_col = case(
        (n.status == "created", "Sending"),
        (n.status == "sending", "Sending"),
        (n.status == "pending", "Sending"),
        (n.status == "sent", "Sent"),
        (n.status == "delivered", "Delivered"),
        (n.status == "pending-virus-check", "Pending virus check"),
        (n.status == "virus-scan-failed", "Virus detected"),
        (n.status == "returned-letter", "Returned letter"),
        (n.status == "failed", "Failed"),
        (n.status == "technical-failure", "Tech issue"),
        (n.status == "temporary-failure", "Inbox not accepting messages right now"),
        (n.status == "permanent-failure", permanent_failure_msg),
        (n.status == "cancelled", "Cancelled"),
        (n.status == "validation-failed", "Validation failed"),
        else_=n.status,
    ).label("Status")

    # created_at is stored in UTC; render it in Europe/London local time.
    time_col = func.to_char(
        func.timezone("Europe/London", func.timezone("UTC", n.created_at)), "YYYY-MM-DD HH24:MI:SS"
    ).label("Time")

    query = (
        db.session.query(
            recipient_col,
            func.coalesce(n.client_reference, "").label("Reference"),
            t.name.label("Template"),
            n.notification_type.label("Type"),
            func.coalesce(u.name, "").label("Sent by"),
            func.coalesce(u.email_address, "").label("Sent by email"),
            func.coalesce(j.original_file_name, "").label("Job"),
            status_col,
            time_col,
            func.coalesce(a.name, "").label("API key name"),
            n.id,  # For pagination (not exported to CSV)
            n.created_at,  # For pagination (not exported to CSV)
        )
        .select_from(n)
        .join(t, t.id == n.template_id)
        .outerjoin(j, j.id == n.job_id)
        .outerjoin(u, u.id == n.created_by_id)
        .outerjoin(a, a.id == n.api_key_id)
    )

    # int() guards the f-string interpolation into raw SQL below: nothing but
    # an integer may ever reach the interval literal.
    days = int(limit_days)
    query = query.filter(
        n.service_id == service_id,
        n.notification_type == notification_type,
        n.created_at >= func.now() - text(f"interval '{days} days'"),
        n.key_type != "test",  # test-key notifications are never exported
    )

    if notification_statuses:
        query = query.filter(n.status.in_(notification_statuses))

    if older_than_id:
        older_than_created_at = (
            db.session.query(Notification.created_at)
            .filter(Notification.id == older_than_id, Notification.service_id == service_id)
            .scalar_subquery()
        )
        # Keyset pagination must compare the full sort key (created_at, id):
        # comparing created_at alone silently skips any rows that share the
        # boundary row's timestamp, dropping them from the report.
        query = query.filter(tuple_(n.created_at, n.id) < tuple_(older_than_created_at, older_than_id))

    # Ordering must match the keyset comparison above.
    query = query.order_by(desc(n.created_at), desc(n.id)).limit(int(chunk_size))

    # COPY cannot use bound parameters, so inline every value as a literal.
    compiled = query.statement.compile(dialect=db.engine.dialect, compile_kwargs={"literal_binds": True})
    return str(compiled)
120+
121+
def execute_copy_to_bytes(
    query: str,
    include_header: bool = True,
) -> tuple[bytes, UUID | None, int]:
    """Execute *query* and stream the matching rows out of PostgreSQL as CSV.

    Runs the query once to learn the row count and the id of the last row
    (needed by the caller for keyset pagination), then wraps the same query in
    a ``COPY ... TO STDOUT WITH CSV`` command so the database serialises the
    CSV directly, skipping Python-level row formatting.

    Args:
        query: a self-contained SELECT whose final two columns are
            (id, created_at), used only for pagination.
        include_header: emit the CSV header row (first chunk only).

    Returns:
        (csv_bytes, last_notification_id, row_count); last id is None when the
        query matched no rows.
    """
    sql = query.strip()

    # Raw DBAPI connection: COPY is driver-level (psycopg2 copy_expert), not
    # something the ORM layer exposes.
    conn = db.engine.raw_connection()
    try:
        cursor = conn.cursor()
        try:
            # NOTE(review): the query runs twice (here and inside COPY below).
            # The two statements do not necessarily share a snapshot, so
            # concurrent writes could make last_id/row_count diverge from the
            # COPY output — confirm this is acceptable for report generation.
            cursor.execute(sql)
            rows = cursor.fetchall()
            row_count = len(rows)

            last_id = None
            if row_count > 0:
                # The query's final two columns are (id, created_at), appended
                # only for pagination; id is the second-to-last column.
                last_id = rows[-1][-2]

            # Re-select the same rows minus the pagination-only columns and let
            # the database write them straight out as CSV.
            copy_query = f"""
                COPY (
                    SELECT
                        "Recipient",
                        "Reference",
                        "Template",
                        "Type",
                        "Sent by",
                        "Sent by email",
                        "Job",
                        "Status",
                        "Time",
                        "API key name"
                    FROM ({sql}) AS subquery
                ) TO STDOUT WITH CSV {"HEADER" if include_header else ""}
            """

            # Capture the COPY output in memory.
            buffer = BytesIO()
            cursor.copy_expert(copy_query, buffer)
            csv_bytes = buffer.getvalue()

            # Lazy %-style args: no string formatting unless INFO is enabled.
            current_app.logger.info(
                "COPY command executed successfully. Rows: %s, Size: %s bytes, Last ID: %s",
                row_count,
                len(csv_bytes),
                last_id,
            )

            return csv_bytes, last_id, row_count
        finally:
            # Fix: the cursor was previously never closed.
            cursor.close()
    finally:
        # Always return the raw connection to the pool.
        conn.close()
181+
182+
183+
def get_notifications_csv_chunk(
    service_id: UUID,
    notification_type: str,
    notification_status_filter: str,
    limit_days: int,
    chunk_size: int,
    older_than_id: UUID | None = None,
    include_header: bool = True,
) -> tuple[bytes, UUID | None, int]:
    """Produce one CSV chunk of a service's notifications via database COPY.

    Translates the report-level status filter into concrete DB statuses,
    builds the chunk query, and executes it as a COPY command.

    Returns:
        (csv_bytes, last_notification_id, row_count) as produced by
        execute_copy_to_bytes.
    """
    # An unknown filter maps to an empty list, i.e. no status restriction.
    statuses = NOTIFICATION_REPORT_REQUEST_MAPPING.get(notification_status_filter, [])

    sql = build_notifications_copy_query(
        service_id=service_id,
        notification_type=notification_type,
        notification_statuses=statuses,
        limit_days=limit_days,
        chunk_size=chunk_size,
        older_than_id=older_than_id,
    )
    return execute_copy_to_bytes(sql, include_header=include_header)

app/report_requests/process_notifications_report.py

Lines changed: 35 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
import csv
2-
from io import StringIO
31
from typing import Any
42
from uuid import UUID
53

@@ -12,10 +10,9 @@
1210
s3_multipart_upload_part,
1311
)
1412

15-
from app.constants import NOTIFICATION_REPORT_REQUEST_MAPPING
16-
from app.dao.notifications_dao import get_notifications_for_service
1713
from app.dao.report_requests_dao import dao_get_report_request_by_id
1814
from app.dao.service_data_retention_dao import fetch_service_data_retention_by_notification_type
15+
from app.db_copy_utils import get_notifications_csv_chunk
1916

2017

2118
class ReportRequestProcessor:
@@ -25,17 +22,16 @@ def __init__(self, service_id: UUID, report_request_id: UUID):
2522
self.report_request = dao_get_report_request_by_id(service_id, report_request_id)
2623
self.notification_type = self.report_request.parameter["notification_type"]
2724
self.notification_status = self.report_request.parameter["notification_status"]
28-
self.page_size = current_app.config.get("REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE")
25+
self.chunk_size = current_app.config.get("REPORT_REQUEST_NOTIFICATIONS_CSV_BATCH_SIZE")
2926
self.s3_bucket = current_app.config["S3_BUCKET_REPORT_REQUESTS_DOWNLOAD"]
3027
self.filename = f"notifications_report/{report_request_id}.csv"
3128
self.upload_id: str | None = None
3229
self.parts: list[dict[str, Any]] = []
3330
self.part_number = 1
34-
self.csv_buffer = StringIO()
35-
self.csv_writer = csv.writer(self.csv_buffer)
31+
self.current_buffer = b""
32+
self.is_first_chunk = True
3633

3734
def process(self) -> None:
38-
self._initialize_csv()
3935
self._start_multipart_upload()
4036

4137
try:
@@ -46,20 +42,7 @@ def process(self) -> None:
4642
self._abort_upload()
4743
raise e
4844

49-
def _initialize_csv(self) -> None:
50-
headers = [
51-
"Recipient",
52-
"Reference",
53-
"Template",
54-
"Type",
55-
"Sent by",
56-
"Sent by email",
57-
"Job",
58-
"Status",
59-
"Time",
60-
"API key name",
61-
]
62-
self.csv_writer.writerow(headers)
45+
6346

6447
def _start_multipart_upload(self) -> None:
6548
response = s3_multipart_upload_create(self.s3_bucket, self.filename)
@@ -68,78 +51,46 @@ def _start_multipart_upload(self) -> None:
6851
def _fetch_and_upload_notifications(self) -> None:
    """Stream notification CSV chunks out of the database and into S3.

    Pages through the service's notifications via keyset pagination, appending
    each COPY-produced CSV chunk to the in-memory buffer and flushing it to S3
    whenever it reaches the multipart minimum part size.
    """
    retention = fetch_service_data_retention_by_notification_type(self.service_id, self.notification_type)
    # Fall back to the default 7-day window when no retention row exists.
    limit_days = retention.days_of_retention if retention else 7

    older_than_id = None
    while True:
        # One COPY-generated CSV chunk; header only on the first chunk.
        csv_bytes, last_id, row_count = get_notifications_csv_chunk(
            service_id=self.service_id,
            notification_type=self.notification_type,
            notification_status_filter=self.notification_status,
            limit_days=limit_days,
            chunk_size=self.chunk_size,
            older_than_id=older_than_id,
            include_header=self.is_first_chunk,
        )

        self.current_buffer += csv_bytes
        self._upload_part_if_needed()

        # Advance the keyset cursor for the next page.
        older_than_id = last_id
        self.is_first_chunk = False

        # A short page means we have drained the result set.
        if row_count < self.chunk_size:
            break

    # Upload any remaining data
    self._upload_remaining_data()
8482

85-
def _fetch_serialized_notifications(self, limit_days: int, older_than: str | None) -> list[dict[str, Any]]:
86-
statuses = NOTIFICATION_REPORT_REQUEST_MAPPING[self.notification_status]
87-
88-
notifications = get_notifications_for_service(
89-
service_id=self.service_id,
90-
filter_dict={
91-
"template_type": self.notification_type,
92-
"status": statuses,
93-
},
94-
page_size=self.page_size,
95-
count_pages=False,
96-
limit_days=limit_days,
97-
include_jobs=True,
98-
with_personalisation=False,
99-
include_from_test_key=False,
100-
error_out=False,
101-
include_one_off=True,
102-
older_than=older_than,
103-
)
104-
105-
serialized_notifications = [notification.serialize_for_csv() for notification in notifications]
106-
return serialized_notifications
107-
108-
def _convert_notifications_to_csv(self, serialized_notifications: list[dict[str, Any]]) -> list[tuple]:
109-
values = []
110-
for notification in serialized_notifications:
111-
values.append(
112-
(
113-
# the recipient for precompiled letters is the full address block
114-
notification["recipient"].splitlines()[0].lstrip().rstrip(" ,"),
115-
notification["client_reference"],
116-
notification["template_name"],
117-
notification["template_type"],
118-
notification["created_by_name"] or "",
119-
notification["created_by_email_address"] or "",
120-
notification["job_name"] or "",
121-
notification["status"],
122-
notification["created_at"],
123-
notification["api_key_name"] or "",
124-
)
125-
)
126-
return values
12783

128-
def _upload_csv_part_if_needed(self) -> None:
129-
data_bytes = self.csv_buffer.getvalue().encode("utf-8")
130-
if len(data_bytes) >= S3_MULTIPART_UPLOAD_MIN_PART_SIZE:
131-
self._upload_part(data_bytes)
13284

85+
def _upload_part_if_needed(self) -> None:
    """Flush the buffer to S3 once it reaches the multipart minimum part size."""
    if len(self.current_buffer) < S3_MULTIPART_UPLOAD_MIN_PART_SIZE:
        return
    self._upload_part(self.current_buffer)
    # Start a fresh buffer for the next part.
    self.current_buffer = b""
13890

13991
def _upload_remaining_data(self) -> None:
    """Upload whatever is left in the buffer as the final (possibly small) part."""
    # Empty bytes are falsy, so this only uploads when data remains.
    if self.current_buffer:
        self._upload_part(self.current_buffer)
14394

14495
def _upload_part(self, data_bytes: bytes) -> None:
14596
response = s3_multipart_upload_part(

0 commit comments

Comments
 (0)