Skip to content

Commit b558535

Browse files
authored
fix(hybridcloud) Deliver payloads concurrently (#66870)
We're hitting the ceiling of how many messages we can deliver concurrently with single-threaded workers. Because most of our time is spent in IO, we can get more throughput by sacrificing strong ordering and delivering messages in small concurrent batches. Should a batch contain a delivery that needs to be retried, progress will be stopped. This means we could deliver n-1 messages out of order if we hit an intermittent error. Initially I only want to use parallel delivery for large mailboxes, as they would naturally not have strong ordering due to the amount of concurrency they create. Both the usage of threaded IO and the thread pool size can be controlled by options.
1 parent 0ceac29 commit b558535

File tree

3 files changed

+416
-17
lines changed

3 files changed

+416
-17
lines changed

src/sentry/hybridcloud/tasks/deliver_webhooks.py

Lines changed: 122 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22
import logging
3+
from concurrent.futures import ThreadPoolExecutor, as_completed
34

45
import sentry_sdk
56
from django.db.models import Min, Subquery
@@ -8,6 +9,7 @@
89
from requests.models import HTTPError
910
from rest_framework import status
1011

12+
from sentry import options
1113
from sentry.exceptions import RestrictedIPAddress
1214
from sentry.hybridcloud.models.webhookpayload import BACKOFF_INTERVAL, MAX_ATTEMPTS, WebhookPayload
1315
from sentry.shared_integrations.exceptions import (
@@ -84,6 +86,7 @@ def schedule_webhook_delivery(**kwargs) -> None:
8486
metrics.distribution(
8587
"hybridcloud.schedule_webhook_delivery.mailbox_count", scheduled_mailboxes.count()
8688
)
89+
use_parallel = options.get("hybridcloud.webhookpayload.use_parallel")
8790
for record in scheduled_mailboxes[:BATCH_SIZE]:
8891
# Reschedule the records that we will attempt to deliver next.
8992
# We update schedule_for in an attempt to minimize races for potentially in-flight batches.
@@ -92,11 +95,14 @@ def schedule_webhook_delivery(**kwargs) -> None:
9295
.order_by("id")
9396
.values("id")[:MAX_MAILBOX_DRAIN]
9497
)
95-
WebhookPayload.objects.filter(id__in=Subquery(mailbox_batch)).update(
98+
updated_count = WebhookPayload.objects.filter(id__in=Subquery(mailbox_batch)).update(
9699
schedule_for=timezone.now() + BATCH_SCHEDULE_OFFSET
97100
)
98-
99-
drain_mailbox.delay(record["id"])
101+
# If we have a full batch we should process in parallel as we're likely behind.
102+
if use_parallel and updated_count >= MAX_MAILBOX_DRAIN:
103+
drain_mailbox_parallel.delay(record["id"])
104+
else:
105+
drain_mailbox.delay(record["id"])
100106

101107

102108
@instrumented_task(
@@ -159,6 +165,119 @@ def drain_mailbox(payload_id: int) -> None:
159165
return
160166

161167

168+
@instrumented_task(
    name="sentry.hybridcloud.tasks.deliver_webhooks.drain_mailbox_parallel",
    queue="webhook.control",
    silo_mode=SiloMode.CONTROL,
)
def drain_mailbox_parallel(payload_id: int) -> None:
    """
    Deliver messages from a mailbox in small parallel batches.

    Parallel delivery sacrifices strict ordering for increased throughput.
    Because of the sequential delivery within a mailbox we can't get higher
    throughput by scheduling whole batches in parallel, so instead messages
    are delivered in blocks of `worker_threads` until a delivery fails or
    the batch delay deadline is reached.

    NOTE(review): the original docstring also claimed delivery stops when a
    record's schedule_for is greater than the current time, but no such guard
    exists in the query below — confirm whether that check should be added.

    :param payload_id: id of the head WebhookPayload of the mailbox to drain.
    """
    try:
        payload = WebhookPayload.objects.get(id=payload_id)
    except WebhookPayload.DoesNotExist:
        # We could have hit a race condition. Since we've lost already return
        # and let the other process continue, or a future process.
        metrics.incr("hybridcloud.deliver_webhooks.delivery", tags={"outcome": "race"})
        logger.info(
            "deliver_webhook_parallel.potential_race",
            extra={
                "id": payload_id,
            },
        )
        return

    worker_threads = options.get("hybridcloud.webhookpayload.worker_threads")
    deadline = timezone.now() + BATCH_SCHEDULE_OFFSET
    request_failed = False

    while True:
        # We have run until the end of our batch schedule delay. Break the
        # loop so this worker can take another task.
        if timezone.now() >= deadline:
            logger.info(
                "deliver_webhook_parallel.delivery_deadline",
                extra={
                    "mailbox_name": payload.mailbox_name,
                },
            )
            metrics.incr(
                "hybridcloud.deliver_webhooks.delivery", tags={"outcome": "delivery_deadline"}
            )
            break

        # Fetch records from the mailbox in worker_threads sized blocks. This
        # avoids reading redundant data should we hit an error and helps keep
        # query duration low.
        #
        # Materialize the block once: the previous implementation issued a
        # separate COUNT(*) query per iteration and then sliced the queryset
        # again, doubling query volume and racing with concurrent consumers
        # between the two queries.
        batch = list(
            WebhookPayload.objects.filter(
                id__gte=payload.id, mailbox_name=payload.mailbox_name
            ).order_by("id")[:worker_threads]
        )

        # No more messages to deliver
        if not batch:
            break

        # Use a threadpool to send requests concurrently
        with ThreadPoolExecutor(max_workers=worker_threads) as threadpool:
            futures = {
                threadpool.submit(deliver_message_parallel, record) for record in batch
            }
            for future in as_completed(futures):
                payload_record, err = future.result()

                if err:
                    # Was this the final attempt? Failing on a final attempt
                    # shouldn't stop deliveries as we won't retry.
                    if payload_record.attempts >= MAX_ATTEMPTS:
                        payload_record.delete()

                        metrics.incr(
                            "hybridcloud.deliver_webhooks.delivery",
                            tags={"outcome": "attempts_exceed"},
                        )
                        logger.info(
                            "deliver_webhook_parallel.discard",
                            extra={"id": payload_record.id, "attempts": payload_record.attempts},
                        )
                    else:
                        metrics.incr(
                            "hybridcloud.deliver_webhooks.delivery", tags={"outcome": "retry"}
                        )
                        payload_record.schedule_next_attempt()
                        request_failed = True
                    # Unexpected errors (not DeliveryFailed) should surface to
                    # the task runner rather than be swallowed.
                    if not isinstance(err, DeliveryFailed):
                        raise err
                else:
                    # Delivery was successful
                    payload_record.delete()
                    metrics.incr("hybridcloud.deliver_webhooks.delivery", tags={"outcome": "ok"})
                    metrics.distribution(
                        "hybridcloud.deliver_webhooks.attempts", payload_record.attempts
                    )

        # If a delivery failed we should stop processing this mailbox and try again later.
        if request_failed:
            return
271+
272+
273+
def deliver_message_parallel(payload: WebhookPayload) -> tuple[WebhookPayload, Exception | None]:
    """
    Attempt delivery of a single payload, capturing any exception raised.

    Returns a (payload, error) pair so callers fanning this out over a
    thread pool can inspect failures per-record instead of having
    as_completed() raise into the consuming loop.
    """
    error: Exception | None = None
    try:
        perform_request(payload)
    except Exception as exc:
        error = exc
    return (payload, error)
279+
280+
162281
def deliver_message(payload: WebhookPayload) -> None:
163282
"""Deliver a message if it still has delivery attempts remaining"""
164283
if payload.attempts >= MAX_ATTEMPTS:

src/sentry/options/defaults.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1619,6 +1619,18 @@
16191619

16201620
# Break glass controls
16211621
register("hybrid_cloud.rpc.disabled-service-methods", default=[], flags=FLAG_AUTOMATOR_MODIFIABLE)
1622+
1623+
# Opt-in switch: when enabled, full mailbox batches are drained with the
# parallel delivery task instead of the sequential one.
register(
    "hybridcloud.webhookpayload.use_parallel",
    default=False,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)
# Thread pool size used by drain_mailbox_parallel for concurrent webhook
# delivery; also bounds how many records are fetched per block.
register(
    "hybridcloud.webhookpayload.worker_threads",
    default=4,
    flags=FLAG_AUTOMATOR_MODIFIABLE,
)
16221634
# == End hybrid cloud subsystem
16231635

16241636
# Decides whether an incoming transaction triggers an update of the clustering rule applied to it.

0 commit comments

Comments
 (0)