Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions app/notifications/process_notifications.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
import uuid
from datetime import datetime

Expand Down Expand Up @@ -34,6 +35,7 @@
dao_delete_notifications_by_id,
)
from app.models import Notification
from app.request_timings import add_context, record_timing
from app.utils import (
parse_and_format_phone_number,
try_download_template_email_file_from_s3,
Expand Down Expand Up @@ -189,9 +191,13 @@ def persist_notification(

# If simulated, create a Notification model to return but do not persist the Notification to the DB
if not simulated:
db_insert_start = time.perf_counter()
dao_create_notification(notification=notification, _autocommit=_autocommit)
record_timing("db_insert_ms", time.perf_counter() - db_insert_start)
# Not sure how we can rollback
redis_limits_start = time.perf_counter()
increment_daily_limit_caches(service, notification, key_type)
record_timing("redis_daily_limit_ms", time.perf_counter() - redis_limits_start)

return notification

Expand Down Expand Up @@ -236,7 +242,10 @@ def send_notification_to_queue_detached(key_type, notification_type, notificatio
deliver_task = get_pdf_for_templated_letter

try:
add_context(queue=queue, celery_task_name=getattr(deliver_task, "name", deliver_task.__name__))
queue_publish_start = time.perf_counter()
deliver_task.apply_async([str(notification_id)], queue=queue)
record_timing("queue_publish_ms", time.perf_counter() - queue_publish_start)
except Exception:
dao_delete_notifications_by_id(notification_id)
raise
Expand Down
34 changes: 34 additions & 0 deletions app/request_timings.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from __future__ import annotations

from flask import g, has_request_context


def init_request_timings() -> None:
    """Ensure the per-request timing stores exist on ``flask.g``.

    Does nothing outside a request context. Inside one, creates an empty
    dict for each of ``g.request_timings`` and ``g.request_timing_context``
    unless that attribute is already set, so calling this repeatedly never
    discards data recorded earlier in the same request.
    """
    if has_request_context():
        for attr_name in ("request_timings", "request_timing_context"):
            if not hasattr(g, attr_name):
                setattr(g, attr_name, {})


def record_timing(name: str, duration_seconds: float) -> None:
    """Store a measured duration for the current request, in milliseconds.

    Args:
        name: Key to store the measurement under; an existing value for the
            same key is overwritten.
        duration_seconds: Elapsed time in seconds. It is converted to
            milliseconds and rounded to three decimal places before storing.

    Does nothing outside a request context.
    """
    if has_request_context():
        init_request_timings()
        duration_ms = duration_seconds * 1000.0
        g.request_timings[name] = round(duration_ms, 3)


def add_context(**kwargs: object) -> None:
    """Merge descriptive key/value pairs into the current request's timing context.

    Keyword arguments whose value is ``None`` are skipped, so they neither add
    a key nor overwrite a value stored earlier. Does nothing outside a request
    context.
    """
    if has_request_context():
        init_request_timings()
        non_null = {}
        for key, value in kwargs.items():
            if value is not None:
                non_null[key] = value
        g.request_timing_context.update(non_null)


def get_timings() -> tuple[dict[str, float], dict[str, object]]:
    """Return the ``(timings, context)`` dicts recorded for the current request.

    Outside a request context, or for a store that has not been created on
    ``flask.g`` yet, an empty dict is returned in that position.
    """
    if has_request_context():
        return (
            getattr(g, "request_timings", {}),
            getattr(g, "request_timing_context", {}),
        )
    return {}, {}
56 changes: 56 additions & 0 deletions app/v2/notifications/post_notifications.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import base64
import functools
import time
import uuid
from datetime import datetime

Expand Down Expand Up @@ -52,6 +53,7 @@
validate_and_format_recipient,
validate_template,
)
from app.request_timings import add_context, get_timings, init_request_timings, record_timing
from app.schema_validation import validate
from app.utils import try_parse_and_format_phone_number
from app.v2.errors import BadRequestError
Expand All @@ -76,8 +78,37 @@
)


def _log_slow_request(request_start: float, notification_type: str, notification_id: str | None = None) -> None:
    """Log the recorded timing breakdown if the current request was slow.

    A request counts as slow when more than one second has elapsed since
    ``request_start``; faster requests return without logging anything.

    Args:
        request_start: ``time.perf_counter()`` reading taken when request
            handling began.
        notification_type: Notification type being posted; added to the
            logged context.
        notification_id: Id of the created notification, if known. ``None``
            values are dropped by ``add_context`` and so never logged.
    """
    request_time = time.perf_counter() - request_start
    if request_time <= 1.0:
        return

    service_id = getattr(authenticated_service, "id", None)
    add_context(
        notification_type=notification_type,
        service_id=None if service_id is None else str(service_id),
        api_key_type=getattr(api_user, "key_type", None),
        endpoint=request.endpoint,
        path=request.path,
        notification_id=notification_id,
        request_time_seconds=round(request_time, 3),
    )

    # Read the stores once, after add_context, so the context already holds
    # the request-level fields added above (previously fetched twice, with
    # the first context result discarded).
    timings, context = get_timings()
    current_app.logger.info(
        "slow request breakdown",
        extra={"timings_ms": timings, **context},
    )


@v2_notification_blueprint.route(f"/{LETTER_TYPE}", methods=["POST"])
def post_precompiled_letter_notification():
init_request_timings()
request_start = time.perf_counter()
check_rate_limiting(authenticated_service, api_user, notification_type=LETTER_TYPE)

request_json = get_valid_json()
Expand All @@ -104,13 +135,21 @@ def post_precompiled_letter_notification():
precompiled=True,
)

notification_id = None
if isinstance(notification, dict):
notification_id = str(notification.get("id"))
_log_slow_request(request_start, LETTER_TYPE, notification_id=notification_id)

return jsonify(notification), 201


@v2_notification_blueprint.route("/<notification_type>", methods=["POST"])
def post_notification(notification_type):
init_request_timings()
request_start = time.perf_counter()
check_rate_limiting(authenticated_service, api_user, notification_type=notification_type)

json_parse_start = time.perf_counter()
with POST_NOTIFICATION_JSON_PARSE_DURATION_SECONDS.time():
request_json = get_valid_json()

Expand All @@ -122,9 +161,11 @@ def post_notification(notification_type):
form = validate(request_json, post_letter_request)
else:
abort(404)
record_timing("json_parse_validate_ms", time.perf_counter() - json_parse_start)

check_service_has_permission(authenticated_service, notification_type)

validate_template_start = time.perf_counter()
template, template_with_content = validate_template(
template_id=form["template_id"],
personalisation=form.get("personalisation", {}),
Expand All @@ -133,6 +174,7 @@ def post_notification(notification_type):
check_char_count=False,
recipient=form.get("email_address"),
)
record_timing("validate_template_ms", time.perf_counter() - validate_template_start)

reply_to = get_reply_to_text(notification_type, form, template)

Expand All @@ -156,6 +198,11 @@ def post_notification(notification_type):
unsubscribe_link=form.get("one_click_unsubscribe_url", None),
)

notification_id = None
if isinstance(notification, dict):
notification_id = str(notification.get("id"))
_log_slow_request(request_start, notification_type, notification_id=notification_id)

return jsonify(notification), 201


Expand All @@ -172,27 +219,34 @@ def process_sms_or_email_notification(
notification_id = uuid.uuid4()
form_send_to = form["email_address"] if notification_type == EMAIL_TYPE else form["phone_number"]

recipient_validate_start = time.perf_counter()
recipient_data = validate_and_format_recipient(
send_to=form_send_to, key_type=api_user.key_type, service=service, notification_type=notification_type
)
record_timing("recipient_validate_ms", time.perf_counter() - recipient_validate_start)

send_to = recipient_data["normalised_to"] if type(recipient_data) is dict else recipient_data

# Do not persist or send notification to the queue if it is a simulated recipient
simulated = simulated_recipient(send_to, notification_type)

document_upload_start = time.perf_counter()
personalisation, document_download_count = process_document_uploads(
form.get("personalisation"),
service,
send_to=send_to,
simulated=simulated,
)
record_timing("document_upload_ms", time.perf_counter() - document_upload_start)
add_context(document_download_count=document_download_count)
if document_download_count:
# We changed personalisation which means we need to update the content
template_with_content.values = personalisation

# validate content length after url is replaced in personalisation.
content_validation_start = time.perf_counter()
check_is_message_too_long(template_with_content)
record_timing("content_validation_ms", time.perf_counter() - content_validation_start)

response = create_response_for_post_notification(
notification_id=notification_id,
Expand All @@ -206,6 +260,7 @@ def process_sms_or_email_notification(
template_with_content=template_with_content,
)

persist_start = time.perf_counter()
persist_notification(
notification_id=notification_id,
template_id=template.id,
Expand All @@ -222,6 +277,7 @@ def process_sms_or_email_notification(
unsubscribe_link=unsubscribe_link,
document_download_count=document_download_count,
)
record_timing("persist_notification_ms", time.perf_counter() - persist_start)

if not simulated:
send_notification_to_queue_detached(
Expand Down
27 changes: 22 additions & 5 deletions gunicorn_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,29 @@ def child_exit(server, worker):
multiprocess.mark_process_dead(worker.pid)


workers = 4
worker_class = "eventlet"
worker_connections = 8 # limit runaway greenthread creation
workers = int(os.getenv("GUNICORN_WORKERS", "4"))
worker_class = os.getenv("GUNICORN_WORKER_CLASS", "eventlet")

# For async workers (eventlet/gevent) this controls concurrent clients per worker.
# For sync/gthread this setting is ignored.
worker_connections = int(os.getenv("GUNICORN_WORKER_CONNECTIONS", "256"))

# Used by gthread worker_class (ignored by eventlet/gevent).
threads = int(os.getenv("GUNICORN_THREADS", "4"))

statsd_host = "{}:8125".format(os.getenv("STATSD_HOST"))
keepalive = 0 # disable temporarily for diagnosing issues
timeout = int(os.getenv("HTTP_SERVE_TIMEOUT_SECONDS", 30)) # though has little effect with eventlet worker_class
keepalive = int(os.getenv("GUNICORN_KEEPALIVE", "90"))

# If using eventlet, this has little effect for long-held requests.
timeout = int(os.getenv("HTTP_SERVE_TIMEOUT_SECONDS", "30"))

# When shutting down/restarting, wait this long for workers to finish requests.
graceful_timeout = int(os.getenv("GUNICORN_GRACEFUL_TIMEOUT", "30"))

# Optional worker recycling to reduce impact of slow leaks/fragmentation.
# Gunicorn uses 0 to disable.
max_requests = int(os.getenv("GUNICORN_MAX_REQUESTS", "0"))
max_requests_jitter = int(os.getenv("GUNICORN_MAX_REQUESTS_JITTER", "0"))

debug_post_threshold = os.getenv("NOTIFY_GUNICORN_DEBUG_POST_REQUEST_LOG_THRESHOLD_SECONDS", None)
if debug_post_threshold:
Expand Down
2 changes: 1 addition & 1 deletion tests/test_gunicorn_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,5 @@
def test_gunicorn_config():
assert workers == 4
assert worker_class == "eventlet"
assert keepalive == 0
assert keepalive == 90
assert timeout == 30