Skip to content

Commit 7997cb9

Browse files
authored
feat(post-process-forwarder) Allow different types of post process forwarders (#29225)
In feat(post-process-forwarder) Different errors and transaction forwarders #28954, we created different processing strategies for errors and transactions post process forwarders. In this PR, we enable post process forwarder to run with one of the following entities (errors, transactions, all) The all option would be used in scenarios where we don't need separate post process forwarders. Example devserver, single tenant. There is no production change with this code since the default option for entity is "all" which means the current post process forwarder would handle all messages
1 parent 3eb5550 commit 7997cb9

File tree

6 files changed

+35
-1
lines changed

6 files changed

+35
-1
lines changed

src/sentry/eventstream/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,7 @@ def requires_post_process_forwarder(self):
120120

121121
def run_post_process_forwarder(
122122
self,
123+
entity,
123124
consumer_group,
124125
commit_log_topic,
125126
synchronize_commit_group,

src/sentry/eventstream/kafka/backend.py

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@
1010
from sentry.eventstream.kafka.consumer import SynchronizedConsumer
1111
from sentry.eventstream.kafka.postprocessworker import (
1212
_CONCURRENCY_OPTION,
13+
ErrorsPostProcessForwarderWorker,
14+
PostProcessForwarderType,
1315
PostProcessForwarderWorker,
16+
TransactionsPostProcessForwarderWorker,
1417
_sampled_eventstream_timer,
1518
)
1619
from sentry.eventstream.kafka.protocol import (
@@ -151,6 +154,7 @@ def requires_post_process_forwarder(self):
151154

152155
def _build_consumer(
153156
self,
157+
entity,
154158
consumer_group,
155159
commit_log_topic,
156160
synchronize_commit_group,
@@ -169,7 +173,16 @@ def _build_consumer(
169173
)
170174

171175
concurrency = options.get(_CONCURRENCY_OPTION)
172-
worker = PostProcessForwarderWorker(concurrency=concurrency)
176+
logger.info(f"Starting post process forwrader to consume {entity} messages")
177+
if entity == PostProcessForwarderType.TRANSACTIONS:
178+
worker = TransactionsPostProcessForwarderWorker(concurrency=concurrency)
179+
elif entity == PostProcessForwarderType.ERRORS:
180+
worker = ErrorsPostProcessForwarderWorker(concurrency=concurrency)
181+
else:
182+
# Default implementation which processes both errors and transactions
183+
# irrespective of values in the header. This would most likely be the case
184+
# for development environments.
185+
worker = PostProcessForwarderWorker(concurrency=concurrency)
173186

174187
consumer = BatchingKafkaConsumer(
175188
topics=self.topic,
@@ -183,6 +196,7 @@ def _build_consumer(
183196

184197
def run_batched_consumer(
185198
self,
199+
entity,
186200
consumer_group,
187201
commit_log_topic,
188202
synchronize_commit_group,
@@ -191,6 +205,7 @@ def run_batched_consumer(
191205
initial_offset_reset="latest",
192206
):
193207
consumer = self._build_consumer(
208+
entity,
194209
consumer_group,
195210
commit_log_topic,
196211
synchronize_commit_group,
@@ -404,6 +419,7 @@ def _get_task_kwargs_and_dispatch(self, message) -> None:
404419

405420
def run_post_process_forwarder(
406421
self,
422+
entity,
407423
consumer_group,
408424
commit_log_topic,
409425
synchronize_commit_group,
@@ -416,6 +432,7 @@ def run_post_process_forwarder(
416432
if settings.SENTRY_POST_PROCESS_FORWARDER_BATCHING:
417433
logger.info("Starting batching consumer")
418434
self.run_batched_consumer(
435+
entity,
419436
consumer_group,
420437
commit_log_topic,
421438
synchronize_commit_group,

src/sentry/eventstream/kafka/postprocessworker.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import random
33
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
44
from contextlib import contextmanager
5+
from enum import Enum
56
from typing import Any, Generator, Mapping, Optional, Sequence
67

78
from sentry import options
@@ -25,6 +26,12 @@
2526
_TRANSACTION_FORWARDER_HEADER = "transaction_forwarder"
2627

2728

29+
class PostProcessForwarderType(str, Enum):
30+
ERRORS = "errors"
31+
TRANSACTIONS = "transactions"
32+
ALL = "all"
33+
34+
2835
@contextmanager
2936
def _sampled_eventstream_timer(instance: str) -> Generator[None, None, None]:
3037
record_metric = random.random() < 0.1

src/sentry/runner/commands/devserver.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
"sentry",
1414
"run",
1515
"post-process-forwarder",
16+
"--entity=all",
1617
"--loglevel=debug",
1718
"--commit-batch-size=1",
1819
],

src/sentry/runner/commands/run.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -324,6 +324,12 @@ def cron(**options):
324324
type=click.Choice(["earliest", "latest"]),
325325
help="Position in the commit log topic to begin reading from when no prior offset has been recorded.",
326326
)
327+
@click.option(
328+
"--entity",
329+
default="all",
330+
type=click.Choice(["all", "errors", "transactions"]),
331+
help="The type of entity to process (all, errors, transactions).",
332+
)
327333
@log_options()
328334
@configuration
329335
def post_process_forwarder(**options):
@@ -332,6 +338,7 @@ def post_process_forwarder(**options):
332338

333339
try:
334340
eventstream.run_post_process_forwarder(
341+
entity=options["entity"],
335342
consumer_group=options["consumer_group"],
336343
commit_log_topic=options["commit_log_topic"],
337344
synchronize_commit_group=options["synchronize_commit_group"],

tests/sentry/eventstream/kafka/test_consumer.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -691,6 +691,7 @@ def test_post_process_forwarder_batch_consumer(self, dispatch_post_process_group
691691

692692
eventstream = KafkaEventStream()
693693
consumer = eventstream._build_consumer(
694+
entity="all",
694695
consumer_group=consumer_group,
695696
commit_log_topic=self.commit_log_topic,
696697
synchronize_commit_group=synchronize_commit_group,

0 commit comments

Comments
 (0)