Skip to content

Commit a178f46

Browse files
markstory and andrewshie-sentry
authored and committed
feat(taskworker) Add tracing continuation to taskworker tasks (#81831)
Add `sentry-trace` and `baggage` to task headers. With these headers we can continue traces from within workers, so that spawned tasks are included in the trace they originated from. This screenshot shows a task that was spawned by `OrganizationDetails.get()` included in the trace from the endpoint. Refs #80054. Refs #80254.
1 parent 349f3c6 commit a178f46

File tree

4 files changed

+61
-11
lines changed

4 files changed

+61
-11
lines changed

src/sentry/taskworker/registry.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,11 +57,19 @@ def producer(self) -> KafkaProducer:
5757
return self._producer
5858

5959
def get(self, name: str) -> Task[Any, Any]:
60+
"""
61+
Get a registered task by name
62+
63+
Raises KeyError when an unknown task is provided.
64+
"""
6065
if name not in self._registered_tasks:
6166
raise KeyError(f"No task registered with the name {name}. Check your imports")
6267
return self._registered_tasks[name]
6368

6469
def contains(self, name: str) -> bool:
70+
"""
71+
Check if a task name has been registered
72+
"""
6573
return name in self._registered_tasks
6674

6775
def register(
@@ -80,6 +88,7 @@ def register(
8088
asynchronously via taskworkers.
8189
8290
Parameters
91+
----------
8392
8493
name: str
8594
The name of the task. This is serialized and must be stable across deploys.

src/sentry/taskworker/task.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from uuid import uuid4
88

99
import orjson
10+
import sentry_sdk
1011
from django.conf import settings
1112
from django.utils import timezone
1213
from google.protobuf.timestamp_pb2 import Timestamp
@@ -58,15 +59,31 @@ def retry(self) -> Retry | None:
5859
return self._retry
5960

6061
def __call__(self, *args: P.args, **kwargs: P.kwargs) -> R:
62+
"""
63+
Call the task function immediately.
64+
"""
6165
return self._func(*args, **kwargs)
6266

6367
def delay(self, *args: P.args, **kwargs: P.kwargs) -> None:
68+
"""
69+
Schedule a task to run later with a set of arguments.
70+
71+
The provided parameters will be JSON encoded and stored within
72+
a `TaskActivation` protobuf that is appended to kafka
73+
"""
6474
self.apply_async(*args, **kwargs)
6575

6676
def apply_async(self, *args: P.args, **kwargs: P.kwargs) -> None:
77+
"""
78+
Schedule a task to run later with a set of arguments.
79+
80+
The provided parameters will be JSON encoded and stored within
81+
a `TaskActivation` protobuf that is appended to kafka
82+
"""
6783
if settings.TASK_WORKER_ALWAYS_EAGER:
6884
self._func(*args, **kwargs)
6985
else:
86+
# TODO(taskworker) promote parameters to headers
7087
self._namespace.send_task(self.create_activation(*args, **kwargs))
7188

7289
def create_activation(self, *args: P.args, **kwargs: P.kwargs) -> TaskActivation:
@@ -81,10 +98,16 @@ def create_activation(self, *args: P.args, **kwargs: P.kwargs) -> TaskActivation
8198
if isinstance(expires, datetime.timedelta):
8299
expires = int(expires.total_seconds())
83100

101+
headers = {
102+
"sentry-trace": sentry_sdk.get_traceparent() or "",
103+
"baggage": sentry_sdk.get_baggage() or "",
104+
}
105+
84106
return TaskActivation(
85107
id=uuid4().hex,
86108
namespace=self._namespace.name,
87109
taskname=self.name,
110+
headers=headers,
88111
parameters=orjson.dumps({"args": args, "kwargs": kwargs}).decode("utf8"),
89112
retry_state=self._create_retry_state(),
90113
received_at=received_at,

src/sentry/taskworker/worker.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import grpc
1212
import orjson
13+
import sentry_sdk
1314
from django.conf import settings
1415
from django.core.cache import cache
1516
from sentry_protos.sentry.v1.taskworker_pb2 import (
@@ -30,11 +31,20 @@
3031
mp_context = multiprocessing.get_context("fork")
3132

3233

33-
def _process_activation(
34-
namespace: str, task_name: str, args: list[Any], kwargs: dict[str, Any]
35-
) -> None:
34+
def _process_activation(activation: TaskActivation) -> None:
3635
"""multiprocess worker method"""
37-
taskregistry.get(namespace).get(task_name)(*args, **kwargs)
36+
parameters = orjson.loads(activation.parameters)
37+
args = parameters.get("args", [])
38+
kwargs = parameters.get("kwargs", {})
39+
headers = {k: v for k, v in activation.headers.items()}
40+
41+
transaction = sentry_sdk.continue_trace(
42+
environ_or_headers=headers,
43+
op="task.taskworker",
44+
name=f"{activation.namespace}:{activation.taskname}",
45+
)
46+
with sentry_sdk.start_transaction(transaction):
47+
taskregistry.get(activation.namespace).get(activation.taskname)(*args, **kwargs)
3848

3949

4050
AT_MOST_ONCE_TIMEOUT = 60 * 60 * 24 # 1 day
@@ -187,17 +197,11 @@ def process_task(self, activation: TaskActivation) -> TaskActivation | None:
187197
result = None
188198
execution_start_time = 0.0
189199
try:
190-
task_data_parameters = orjson.loads(activation.parameters)
191200
execution_start_time = time.time()
192201

193202
result = self._pool.apply_async(
194203
func=_process_activation,
195-
args=(
196-
activation.namespace,
197-
activation.taskname,
198-
task_data_parameters["args"],
199-
task_data_parameters["kwargs"],
200-
),
204+
args=(activation,),
201205
)
202206
# Will trigger a TimeoutError if the task execution runs long
203207
result.get(timeout=processing_timeout)

tests/sentry/taskworker/test_task.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import logging
33

44
import pytest
5+
import sentry_sdk
56

67
from sentry.conf.types.kafka_definition import Topic
78
from sentry.taskworker.registry import TaskNamespace
@@ -174,3 +175,16 @@ def with_parameters(one: str, two: int, org_id: int) -> None:
174175
assert params["args"]
175176
assert params["args"] == ["one", 22]
176177
assert params["kwargs"] == {"org_id": 99}
178+
179+
180+
def test_create_activation_tracing(task_namespace: TaskNamespace) -> None:
181+
@task_namespace.register(name="test.parameters")
182+
def with_parameters(one: str, two: int, org_id: int) -> None:
183+
pass
184+
185+
with sentry_sdk.start_transaction(op="test.task"):
186+
activation = with_parameters.create_activation("one", 22, org_id=99)
187+
188+
headers = activation.headers
189+
assert headers["sentry-trace"]
190+
assert "baggage" in headers

0 commit comments

Comments
 (0)