Skip to content

Commit a9e75ed

Browse files
authored
feat(taskworker): Compress send_resource_change_webhook task parameters (#95337)
In our US region, the `send_resource_change_webhook` is sending large task payloads to kafka. As we observed, this has negative effects on the taskbroker system. Now that taskworker supports compression, this PR is responsible for compressing the task parameters of this task before producing to kafka. Rollout: When this PR is merged and deployed, it should be a no-op to start. Since compression is gated behind a sentry option, the only difference is that we should see more options checks in postgres. When ready, we can incrementally sample tasks to be compressed in smaller environments first before reaching US region.
1 parent 806095a commit a9e75ed

File tree

3 files changed

+6
-0
lines changed

3 files changed

+6
-0
lines changed

src/sentry/sentry_apps/tasks/sentry_apps.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
from sentry.silo.base import SiloMode
4747
from sentry.tasks.base import instrumented_task, retry
4848
from sentry.taskworker.config import TaskworkerConfig
49+
from sentry.taskworker.constants import CompressionType
4950
from sentry.taskworker.namespaces import sentryapp_control_tasks, sentryapp_tasks
5051
from sentry.taskworker.retry import Retry, retry_task
5152
from sentry.types.rules import RuleFuture
@@ -654,6 +655,7 @@ def get_webhook_data(
654655
times=3,
655656
delay=60 * 5,
656657
),
658+
compression_type=CompressionType.ZSTD,
657659
),
658660
**TASK_OPTIONS,
659661
)

src/sentry/tasks/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,7 @@ def _wrapped(*args, **kwargs):
208208
processing_deadline_duration=taskworker_config.processing_deadline_duration,
209209
at_most_once=taskworker_config.at_most_once,
210210
wait_for_delivery=taskworker_config.wait_for_delivery,
211+
compression_type=taskworker_config.compression_type,
211212
)(func)
212213

213214
task = override_task(task, taskworker_task, taskworker_config, name)

src/sentry/taskworker/config.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import datetime
22

3+
from sentry.taskworker.constants import CompressionType
34
from sentry.taskworker.registry import TaskNamespace
45
from sentry.taskworker.retry import Retry
56

@@ -18,10 +19,12 @@ def __init__(
1819
processing_deadline_duration: int | datetime.timedelta | None = None,
1920
at_most_once: bool = False,
2021
wait_for_delivery: bool = False,
22+
compression_type: CompressionType = CompressionType.PLAINTEXT,
2123
):
2224
self.namespace = namespace
2325
self.retry = retry
2426
self.expires = expires
2527
self.processing_deadline_duration = processing_deadline_duration
2628
self.at_most_once = at_most_once
2729
self.wait_for_delivery = wait_for_delivery
30+
self.compression_type = compression_type

0 commit comments

Comments
 (0)