Replies: 6 comments 6 replies
-
|
Hello. I think it would be great to have a taskiq instrumentation, but as I understand taskiq already has a simple integration with otlp out of the box. Since we use mainstream libraries for brokers, you can enable instrumentation for those libraries to get all the traces. |
Beta Was this translation helpful? Give feedback.
-
|
It would be better if logfire could be easily integrated. Thank you guys |
Beta Was this translation helpful? Give feedback.
-
|
Hi, any updates here? I was considering using taskiq for our usecase (currently celery+fastapi). This seems to be a deal breaker for us since we rely heavily on end to end traces for a lot of our monitoring. |
Beta Was this translation helpful? Give feedback.
-
|
Hey! What’s the current roadblock with integrating OpenTelemetry reporting? From my brief look, it seems that simply supplying the trace ID to an executing task would be enough. Regarding the issue brought up by @nickderobertis: from what I know, OpenTelemetry does not allow suspending spans. So having one span last several hours is actually expected. You can always scope down to a more specific subspan when analyzing. Also, if spans are implemented, the loggers should automatically pick up on that. That way, we get two features for the price of one. I chose TaskIQ over Celery because it allows for easy async. However, not having telemetry in a distributed system could be a dealbreaker. I’ll need it for a personal project anyway, so I can start working on the basic instrumentation design if that would be helpful. |
Beta Was this translation helpful? Give feedback.
-
Beta Was this translation helpful? Give feedback.
-
|
I saw this implementation of middleware for Opentelemtry if you need it right now: Details
import typing as tp
from collections.abc import Coroutine
from opentelemetry import context as context_api
from opentelemetry import trace
from opentelemetry.sdk.trace import Span, TracerProvider
from opentelemetry.trace.propagation import _SPAN_KEY
from taskiq import TaskiqMessage, TaskiqMiddleware, TaskiqResult
class OpenTelemetryTaskiqMiddleware(TaskiqMiddleware):
INSTRUMENTATION_NAME = 'taskiq'
TASK_ARGS_LENGTH_LIMIT = 1000
MSG_SPAN_ATTRIBUTE = '__span'
MSG_CTX_TOKEN_ATTRIBUTE = '__otlp_ctx_token' # noqa: S105
def __init__(self, tracer_provider: TracerProvider | None = None) -> None:
if not tracer_provider:
tracer_provider = tp.cast('TracerProvider', trace.get_tracer_provider())
self.tracer_provider = tracer_provider
self.tracer = tracer_provider.get_tracer(self.INSTRUMENTATION_NAME, '0.1.0')
super().__init__()
def __serialize_args_and_kwargs(self, message: TaskiqMessage) -> tuple[str, str]:
task_args_str = str(message.args)
if len(task_args_str) > self.TASK_ARGS_LENGTH_LIMIT:
task_args_str = f'{task_args_str[: self.TASK_ARGS_LENGTH_LIMIT]}...'
task_kwargs_str = str(message.kwargs)
if len(task_kwargs_str) > self.TASK_ARGS_LENGTH_LIMIT:
task_kwargs_str = f'{task_kwargs_str[: self.TASK_ARGS_LENGTH_LIMIT]}...'
return task_args_str, task_kwargs_str
def __serialize_labels(self, message: TaskiqMessage) -> str:
labels_str = str(message.labels)
if len(labels_str) > self.TASK_ARGS_LENGTH_LIMIT:
labels_str = f'{labels_str[: self.TASK_ARGS_LENGTH_LIMIT]}...'
return labels_str
def pre_send(self, message: TaskiqMessage) -> TaskiqMessage:
"""Pre-send hook to attach task name to Taskiq message."""
task_args_str, task_kwargs_str = self.__serialize_args_and_kwargs(message)
span = self.tracer.start_span(
f'Post Task: {message.task_name}',
kind=trace.SpanKind.PRODUCER,
attributes={
'task.name': message.task_name,
'task.id': message.task_id,
'task.args': task_args_str,
'task.kwargs': task_kwargs_str,
'messaging.message.id': message.task_id,
},
)
setattr(message, self.MSG_SPAN_ATTRIBUTE, span)
ctx = span.get_span_context()
service_name = self.tracer_provider.resource.attributes.get('service.name')
message.labels['otlp.caller_trace_id'] = format(ctx.trace_id, '032x')
message.labels['otlp.caller_span_id'] = format(ctx.span_id, '016x')
message.labels['otlp.caller_service_name'] = service_name
return message
def post_send(self, message: TaskiqMessage) -> None:
"""Post-send hook."""
span: Span | None = getattr(message, self.MSG_SPAN_ATTRIBUTE, None)
if not span:
return
labels_str = self.__serialize_labels(message)
span.set_attribute('task.labels', labels_str)
span.set_status(trace.Status(trace.StatusCode.OK))
span.end()
def pre_execute(
self,
message: 'TaskiqMessage',
) -> TaskiqMessage | Coroutine[tp.Any, tp.Any, TaskiqMessage]:
"""Post execute hook."""
producer_span_id = message.labels.get('otlp.caller_span_id')
producer_trace_id = message.labels.get('otlp.caller_trace_id')
producer_context: trace.SpanContext | None = None
if producer_span_id and producer_trace_id:
producer_context = trace.SpanContext(
trace_id=int(producer_trace_id, 16),
span_id=int(producer_span_id, 16),
is_remote=True,
)
span = self.tracer.start_span(
f'Handle: {message.task_name} / {message.task_id}',
kind=trace.SpanKind.CONSUMER,
links=(
[
trace.Link(
producer_context,
{
'kind': 'producer',
'caller.service.name': message.labels.get('otlp.caller_service_name', ''),
'messaging.message.id': message.task_id,
},
),
]
if producer_context
else None
),
attributes={
'messaging.message.id': message.task_id,
'task.name': message.task_name,
'task.id': message.task_id,
'task.args': str(message.args),
'task.kwargs': str(message.kwargs),
'task.labels': str(message.labels),
'caller.otlp.trace_id': message.labels.get('otlp.caller_trace_id', ''),
'caller.otlp.span_id': message.labels.get('otlp.caller_span_id', ''),
'caller.otlp.service.name': message.labels.get('otlp.caller_service_name', ''),
},
)
trace.use_span(span, set_status_on_exception=True)
context_token = context_api.attach(context_api.set_value(_SPAN_KEY, span))
setattr(message, self.MSG_SPAN_ATTRIBUTE, span)
setattr(message, self.MSG_CTX_TOKEN_ATTRIBUTE, context_token)
return message
def post_execute(
self,
message: 'TaskiqMessage',
result: 'TaskiqResult[tp.Any]', # noqa: ARG002
) -> None:
"""Post-execute hook."""
span: Span | None = getattr(message, self.MSG_SPAN_ATTRIBUTE, None)
ctx_token = getattr(message, self.MSG_CTX_TOKEN_ATTRIBUTE, None)
if ctx_token:
context_api.detach(ctx_token)
if span:
span.set_status(trace.Status(trace.StatusCode.OK))
span.end()
def on_error(
self,
message: 'TaskiqMessage',
result: TaskiqResult[tp.Any],
exception: BaseException,
) -> None:
"""On error hook."""
span: Span | None = getattr(message, self.MSG_SPAN_ATTRIBUTE, None)
if span:
span.set_status(trace.Status(trace.StatusCode.ERROR, str(exception)))
span.end()
super().on_error(message, result, exception)Also Opentelemetry instrumentation was implemented in this MR (huge thanks for @soapun btw): #525. So I hope it will be a part of takiq in a future release |
Beta Was this translation helpful? Give feedback.

Uh oh!
There was an error while loading. Please reload this page.
-
Having tracing, metrics and logs supported for OpenTelemetry would be a great addition.
Beta Was this translation helpful? Give feedback.
All reactions