Skip to content

Commit 57338ce

Browse files
markstoryandrewshie-sentry
authored andcommitted
fix(taskworker) Make task routing overrides apply at runtime (#87538)
Improve how quickly task production responds to changes in topic routing. Instead of routing configuration only being considered during application startup, we apply routing to each task that is produced. Refs getsentry/taskbroker#264
1 parent 1cefa2a commit 57338ce

File tree

5 files changed

+82
-58
lines changed

5 files changed

+82
-58
lines changed

src/sentry/taskworker/registry.py

Lines changed: 22 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -33,25 +33,18 @@ class TaskNamespace:
3333
def __init__(
3434
self,
3535
name: str,
36-
topic: Topic,
36+
router: TaskRouter,
3737
retry: Retry | None,
3838
expires: int | datetime.timedelta | None = None,
3939
processing_deadline_duration: int = DEFAULT_PROCESSING_DEADLINE,
4040
):
4141
self.name = name
42-
self.topic = topic
42+
self.router = router
4343
self.default_retry = retry
4444
self.default_expires = expires # seconds
4545
self.default_processing_deadline_duration = processing_deadline_duration # seconds
4646
self._registered_tasks: dict[str, Task[Any, Any]] = {}
47-
self._producer: SingletonProducer = SingletonProducer(
48-
self._basic_producer, max_futures=1000
49-
)
50-
51-
def _basic_producer(self) -> KafkaProducer:
52-
cluster_name = get_topic_definition(self.topic)["cluster"]
53-
producer_config = get_kafka_producer_cluster_options(cluster_name)
54-
return KafkaProducer(producer_config)
47+
self._producers: dict[Topic, SingletonProducer] = {}
5548

5649
def get(self, name: str) -> Task[Any, Any]:
5750
"""
@@ -69,6 +62,10 @@ def contains(self, name: str) -> bool:
6962
"""
7063
return name in self._registered_tasks
7164

65+
@property
66+
def topic(self) -> Topic:
67+
return self.router.route_namespace(self.name)
68+
7269
def register(
7370
self,
7471
*,
@@ -131,8 +128,9 @@ def wrapped(func: Callable[P, R]) -> Task[P, R]:
131128
def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False) -> None:
132129
metrics.incr("taskworker.registry.send_task", tags={"namespace": activation.namespace})
133130

134-
produce_future = self._producer.produce(
135-
ArroyoTopic(name=self.topic.value),
131+
topic = self.router.route_namespace(self.name)
132+
produce_future = self._producer(topic).produce(
133+
ArroyoTopic(name=topic.value),
136134
KafkaPayload(key=None, value=activation.SerializeToString(), headers=[]),
137135
)
138136
if wait_for_delivery:
@@ -141,6 +139,17 @@ def send_task(self, activation: TaskActivation, wait_for_delivery: bool = False)
141139
except Exception:
142140
logger.exception("Failed to wait for delivery")
143141

142+
def _producer(self, topic: Topic) -> SingletonProducer:
143+
if topic not in self._producers:
144+
145+
def factory() -> KafkaProducer:
146+
cluster_name = get_topic_definition(topic)["cluster"]
147+
producer_config = get_kafka_producer_cluster_options(cluster_name)
148+
return KafkaProducer(producer_config)
149+
150+
self._producers[topic] = SingletonProducer(factory, max_futures=1000)
151+
return self._producers[topic]
152+
144153

145154
class TaskRegistry:
146155
"""
@@ -191,10 +200,9 @@ def create_namespace(
191200
192201
Namespaces can define default behavior for tasks defined within a namespace.
193202
"""
194-
topic = self._router.route_namespace(name)
195203
namespace = TaskNamespace(
196204
name=name,
197-
topic=topic,
205+
router=self._router,
198206
retry=retry,
199207
expires=expires,
200208
processing_deadline_duration=processing_deadline_duration,

src/sentry/taskworker/router.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ def route_namespace(self, name: str) -> Topic: ...
1111

1212

1313
class DefaultRouter:
14-
"""Simple router used for self-hosted and local development"""
14+
"""Router that uses django settings and options to select topics at runtime"""
1515

1616
def route_namespace(self, name: str) -> Topic:
1717
overrides = options.get("taskworker.route.overrides")

src/sentry/taskworker/worker.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,7 @@ def fetch_task(self) -> TaskActivation | None:
467467
return None
468468

469469
if not activation:
470+
metrics.incr("taskworker.worker.fetch_task.not_found")
470471
logger.debug("taskworker.fetch_task.not_found")
471472

472473
self._gettask_backoff_seconds = min(self._gettask_backoff_seconds + 1, 10)

tests/sentry/taskworker/test_registry.py

Lines changed: 56 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from concurrent.futures import Future
2-
from unittest.mock import patch
2+
from unittest.mock import Mock
33

44
import pytest
55
from django.test.utils import override_settings
@@ -11,13 +11,14 @@
1111
from sentry.conf.types.kafka_definition import Topic
1212
from sentry.taskworker.registry import TaskNamespace, TaskRegistry
1313
from sentry.taskworker.retry import LastAction, Retry
14+
from sentry.taskworker.router import DefaultRouter
1415
from sentry.taskworker.task import Task
1516

1617

1718
def test_namespace_register_task() -> None:
1819
namespace = TaskNamespace(
1920
name="tests",
20-
topic=Topic.TASK_WORKER,
21+
router=DefaultRouter(),
2122
retry=None,
2223
)
2324

@@ -37,7 +38,7 @@ def simple_task():
3738
def test_namespace_register_inherits_default_retry() -> None:
3839
namespace = TaskNamespace(
3940
name="tests",
40-
topic=Topic.TASK_WORKER,
41+
router=DefaultRouter(),
4142
retry=Retry(times=5, on=(RuntimeError,)),
4243
)
4344

@@ -65,7 +66,7 @@ def retry_none_param() -> None:
6566
def test_register_inherits_default_expires_processing_deadline() -> None:
6667
namespace = TaskNamespace(
6768
name="tests",
68-
topic=Topic.TASK_WORKER,
69+
router=DefaultRouter(),
6970
retry=None,
7071
expires=10 * 60,
7172
processing_deadline_duration=5,
@@ -93,7 +94,7 @@ def with_expires() -> None:
9394
def test_namespace_get_unknown() -> None:
9495
namespace = TaskNamespace(
9596
name="tests",
96-
topic=Topic.TASK_WORKER,
97+
router=DefaultRouter(),
9798
retry=None,
9899
)
99100

@@ -102,10 +103,11 @@ def test_namespace_get_unknown() -> None:
102103
assert "No task registered" in str(err)
103104

104105

106+
@pytest.mark.django_db
105107
def test_namespace_send_task_no_retry() -> None:
106108
namespace = TaskNamespace(
107109
name="tests",
108-
topic=Topic.TASK_WORKER,
110+
router=DefaultRouter(),
109111
retry=None,
110112
)
111113

@@ -118,21 +120,24 @@ def simple_task() -> None:
118120
assert activation.retry_state.max_attempts == 1
119121
assert activation.retry_state.on_attempts_exceeded == ON_ATTEMPTS_EXCEEDED_DISCARD
120122

121-
with patch.object(namespace, "_producer") as mock_producer:
122-
namespace.send_task(activation)
123-
assert mock_producer.produce.call_count == 1
123+
mock_producer = Mock()
124+
namespace._producers[Topic.TASK_WORKER] = mock_producer
125+
126+
namespace.send_task(activation)
127+
assert mock_producer.produce.call_count == 1
124128

125-
mock_call = mock_producer.produce.call_args
126-
assert mock_call[0][0].name == "task-worker"
129+
mock_call = mock_producer.produce.call_args
130+
assert mock_call[0][0].name == "task-worker"
127131

128-
proto_message = mock_call[0][1].value
129-
assert proto_message == activation.SerializeToString()
132+
proto_message = mock_call[0][1].value
133+
assert proto_message == activation.SerializeToString()
130134

131135

136+
@pytest.mark.django_db
132137
def test_namespace_send_task_with_retry() -> None:
133138
namespace = TaskNamespace(
134139
name="tests",
135-
topic=Topic.TASK_WORKER,
140+
router=DefaultRouter(),
136141
retry=None,
137142
)
138143

@@ -147,19 +152,22 @@ def simple_task() -> None:
147152
assert activation.retry_state.max_attempts == 3
148153
assert activation.retry_state.on_attempts_exceeded == ON_ATTEMPTS_EXCEEDED_DEADLETTER
149154

150-
with patch.object(namespace, "_producer") as mock_producer:
151-
namespace.send_task(activation)
152-
assert mock_producer.produce.call_count == 1
155+
mock_producer = Mock()
156+
namespace._producers[Topic.TASK_WORKER] = mock_producer
157+
158+
namespace.send_task(activation)
159+
assert mock_producer.produce.call_count == 1
153160

154-
mock_call = mock_producer.produce.call_args
155-
proto_message = mock_call[0][1].value
156-
assert proto_message == activation.SerializeToString()
161+
mock_call = mock_producer.produce.call_args
162+
proto_message = mock_call[0][1].value
163+
assert proto_message == activation.SerializeToString()
157164

158165

166+
@pytest.mark.django_db
159167
def test_namespace_with_retry_send_task() -> None:
160168
namespace = TaskNamespace(
161169
name="tests",
162-
topic=Topic.TASK_WORKER,
170+
router=DefaultRouter(),
163171
retry=Retry(times=3),
164172
)
165173

@@ -172,21 +180,24 @@ def simple_task() -> None:
172180
assert activation.retry_state.max_attempts == 3
173181
assert activation.retry_state.on_attempts_exceeded == ON_ATTEMPTS_EXCEEDED_DEADLETTER
174182

175-
with patch.object(namespace, "_producer") as mock_producer:
176-
namespace.send_task(activation)
177-
assert mock_producer.produce.call_count == 1
183+
mock_producer = Mock()
184+
namespace._producers[Topic.TASK_WORKER] = mock_producer
185+
186+
namespace.send_task(activation)
187+
assert mock_producer.produce.call_count == 1
178188

179-
mock_call = mock_producer.produce.call_args
180-
assert mock_call[0][0].name == "task-worker"
189+
mock_call = mock_producer.produce.call_args
190+
assert mock_call[0][0].name == "task-worker"
181191

182-
proto_message = mock_call[0][1].value
183-
assert proto_message == activation.SerializeToString()
192+
proto_message = mock_call[0][1].value
193+
assert proto_message == activation.SerializeToString()
184194

185195

196+
@pytest.mark.django_db
186197
def test_namespace_with_wait_for_delivery_send_task() -> None:
187198
namespace = TaskNamespace(
188199
name="tests",
189-
topic=Topic.TASK_WORKER,
200+
router=DefaultRouter(),
190201
retry=Retry(times=3),
191202
)
192203

@@ -196,18 +207,20 @@ def simple_task() -> None:
196207

197208
activation = simple_task.create_activation()
198209

199-
with patch.object(namespace, "_producer") as mock_producer:
200-
ret_value: Future[None] = Future()
201-
ret_value.set_result(None)
202-
mock_producer.produce.return_value = ret_value
203-
namespace.send_task(activation, wait_for_delivery=True)
204-
assert mock_producer.produce.call_count == 1
210+
mock_producer = Mock()
211+
namespace._producers[Topic.TASK_WORKER] = mock_producer
212+
213+
ret_value: Future[None] = Future()
214+
ret_value.set_result(None)
215+
mock_producer.produce.return_value = ret_value
216+
namespace.send_task(activation, wait_for_delivery=True)
217+
assert mock_producer.produce.call_count == 1
205218

206-
mock_call = mock_producer.produce.call_args
207-
assert mock_call[0][0].name == "task-worker"
219+
mock_call = mock_producer.produce.call_args
220+
assert mock_call[0][0].name == "task-worker"
208221

209-
proto_message = mock_call[0][1].value
210-
assert proto_message == activation.SerializeToString()
222+
proto_message = mock_call[0][1].value
223+
assert proto_message == activation.SerializeToString()
211224

212225

213226
@pytest.mark.django_db
@@ -217,7 +230,7 @@ def test_registry_get() -> None:
217230

218231
assert isinstance(ns, TaskNamespace)
219232
assert ns.name == "tests"
220-
assert ns.topic
233+
assert ns.router
221234
assert ns == registry.get("tests")
222235

223236
with pytest.raises(KeyError):
@@ -284,4 +297,6 @@ def test_registry_create_namespace_route_setting() -> None:
284297
assert profiling.topic == Topic.PROFILES
285298

286299
with pytest.raises(ValueError):
287-
registry.create_namespace(name="lol")
300+
ns = registry.create_namespace(name="lol")
301+
# Should raise as the name is routed to an invalid topic
302+
ns.topic

tests/sentry/taskworker/test_task.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,9 @@
77
ON_ATTEMPTS_EXCEEDED_DISCARD,
88
)
99

10-
from sentry.conf.types.kafka_definition import Topic
1110
from sentry.taskworker.registry import TaskNamespace
1211
from sentry.taskworker.retry import LastAction, Retry, RetryError
12+
from sentry.taskworker.router import DefaultRouter
1313
from sentry.taskworker.task import Task
1414
from sentry.testutils.helpers.task_runner import TaskRunner
1515
from sentry.utils import json
@@ -21,7 +21,7 @@ def do_things() -> None:
2121

2222
@pytest.fixture
2323
def task_namespace() -> TaskNamespace:
24-
return TaskNamespace(name="tests", topic=Topic.TASK_WORKER, retry=None)
24+
return TaskNamespace(name="tests", router=DefaultRouter(), retry=None)
2525

2626

2727
def test_define_task_defaults(task_namespace: TaskNamespace) -> None:

0 commit comments

Comments
 (0)