diff --git a/.riot/requirements/13e0d21.txt b/.riot/requirements/13e0d21.txt new file mode 100644 index 00000000000..d998a4fab3a --- /dev/null +++ b/.riot/requirements/13e0d21.txt @@ -0,0 +1,23 @@ +# +# This file is autogenerated by pip-compile with Python 3.13 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/13e0d21.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 diff --git a/.riot/requirements/1c72bfb.txt b/.riot/requirements/1c72bfb.txt new file mode 100644 index 00000000000..aa5a1c198ef --- /dev/null +++ b/.riot/requirements/1c72bfb.txt @@ -0,0 +1,24 @@ +# +# This file is autogenerated by pip-compile with Python 3.13 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/1c72bfb.in +# +aiokafka==0.12.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +typing-extensions==4.15.0 diff --git a/.riot/requirements/1ded764.txt b/.riot/requirements/1ded764.txt new file mode 100644 index 00000000000..645d52121b6 --- /dev/null +++ b/.riot/requirements/1ded764.txt @@ -0,0 +1,23 @@ +# +# This file is autogenerated by pip-compile with Python 3.14 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/1ded764.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 
+pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 diff --git a/.riot/requirements/2c5336f.txt b/.riot/requirements/2c5336f.txt new file mode 100644 index 00000000000..27870829a63 --- /dev/null +++ b/.riot/requirements/2c5336f.txt @@ -0,0 +1,27 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/2c5336f.in +# +aiokafka==0.11.0 +async-timeout==4.0.3 +attrs==24.2.0 +coverage[toml]==7.6.1 +exceptiongroup==1.2.2 +hypothesis==6.45.0 +importlib-metadata==8.2.0 +iniconfig==2.0.0 +mock==5.1.0 +opentracing==2.4.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.3.2 +pytest-asyncio==0.23.8 +pytest-cov==5.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +tomli==2.0.1 +typing-extensions==4.12.2 +zipp==3.19.2 diff --git a/.riot/requirements/329b0ed.txt b/.riot/requirements/329b0ed.txt new file mode 100644 index 00000000000..21aeb755fc9 --- /dev/null +++ b/.riot/requirements/329b0ed.txt @@ -0,0 +1,24 @@ +# +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/329b0ed.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +typing-extensions==4.15.0 diff --git a/.riot/requirements/4532043.txt b/.riot/requirements/4532043.txt new file mode 100644 index 00000000000..24ed3bff903 --- /dev/null +++ b/.riot/requirements/4532043.txt @@ -0,0 +1,23 @@ +# +# This file is autogenerated by pip-compile with Python 3.11 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate 
.riot/requirements/4532043.in +# +aiokafka==0.11.0 +async-timeout==4.0.3 +attrs==24.2.0 +coverage[toml]==7.6.1 +hypothesis==6.45.0 +iniconfig==2.0.0 +mock==5.1.0 +opentracing==2.4.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.3.2 +pytest-asyncio==0.23.8 +pytest-cov==5.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +typing-extensions==4.12.2 diff --git a/.riot/requirements/538f024.txt b/.riot/requirements/538f024.txt new file mode 100644 index 00000000000..660bbec6c79 --- /dev/null +++ b/.riot/requirements/538f024.txt @@ -0,0 +1,29 @@ +# +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/538f024.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +backports-asyncio-runner==1.2.0 +coverage[toml]==7.10.7 +exceptiongroup==1.3.0 +hypothesis==6.45.0 +importlib-metadata==8.7.0 +iniconfig==2.1.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +tomli==2.3.0 +typing-extensions==4.15.0 +zipp==3.23.0 diff --git a/.riot/requirements/65c09d3.txt b/.riot/requirements/65c09d3.txt new file mode 100644 index 00000000000..549e58ce5e2 --- /dev/null +++ b/.riot/requirements/65c09d3.txt @@ -0,0 +1,27 @@ +# +# This file is autogenerated by pip-compile with Python 3.9 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/65c09d3.in +# +aiokafka==0.11.0 +async-timeout==4.0.3 +attrs==24.2.0 +coverage[toml]==7.6.1 +exceptiongroup==1.2.2 +hypothesis==6.45.0 +importlib-metadata==8.2.0 +iniconfig==2.0.0 +mock==5.1.0 +opentracing==2.4.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.3.2 +pytest-asyncio==0.23.8 +pytest-cov==5.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +tomli==2.0.1 +typing-extensions==4.12.2 +zipp==3.19.2 diff --git 
a/.riot/requirements/a0d2343.txt b/.riot/requirements/a0d2343.txt new file mode 100644 index 00000000000..acba8792c10 --- /dev/null +++ b/.riot/requirements/a0d2343.txt @@ -0,0 +1,27 @@ +# +# This file is autogenerated by pip-compile with Python 3.8 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/a0d2343.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.3.0 +coverage[toml]==7.6.1 +exceptiongroup==1.3.0 +hypothesis==6.45.0 +importlib-metadata==8.5.0 +iniconfig==2.1.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.5.0 +pytest==8.3.5 +pytest-asyncio==0.24.0 +pytest-cov==5.0.0 +pytest-mock==3.14.1 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +tomli==2.3.0 +typing-extensions==4.13.2 +zipp==3.20.2 diff --git a/.riot/requirements/c6f2827.txt b/.riot/requirements/c6f2827.txt new file mode 100644 index 00000000000..d9752d780ae --- /dev/null +++ b/.riot/requirements/c6f2827.txt @@ -0,0 +1,27 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/c6f2827.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +backports-asyncio-runner==1.2.0 +coverage[toml]==7.11.0 +exceptiongroup==1.3.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +tomli==2.3.0 +typing-extensions==4.15.0 diff --git a/.riot/requirements/e580d94.txt b/.riot/requirements/e580d94.txt new file mode 100644 index 00000000000..32abb1ef2ab --- /dev/null +++ b/.riot/requirements/e580d94.txt @@ -0,0 +1,23 @@ +# +# This file is autogenerated by pip-compile with Python 3.12 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/e580d94.in +# +aiokafka==0.11.0 +async-timeout==4.0.3 +attrs==24.2.0 
+coverage[toml]==7.6.1 +hypothesis==6.45.0 +iniconfig==2.0.0 +mock==5.1.0 +opentracing==2.4.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.3.2 +pytest-asyncio==0.23.8 +pytest-cov==5.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +typing-extensions==4.12.2 diff --git a/.riot/requirements/fbd9c5b.txt b/.riot/requirements/fbd9c5b.txt new file mode 100644 index 00000000000..c2a541d823b --- /dev/null +++ b/.riot/requirements/fbd9c5b.txt @@ -0,0 +1,24 @@ +# +# This file is autogenerated by pip-compile with Python 3.11 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/fbd9c5b.in +# +aiokafka==0.9.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +typing-extensions==4.15.0 diff --git a/.riot/requirements/fbe6b3d.txt b/.riot/requirements/fbe6b3d.txt new file mode 100644 index 00000000000..142529fb6f2 --- /dev/null +++ b/.riot/requirements/fbe6b3d.txt @@ -0,0 +1,25 @@ +# +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/fbe6b3d.in +# +aiokafka==0.11.0 +async-timeout==4.0.3 +attrs==24.2.0 +coverage[toml]==7.6.1 +exceptiongroup==1.2.2 +hypothesis==6.45.0 +iniconfig==2.0.0 +mock==5.1.0 +opentracing==2.4.0 +packaging==24.1 +pluggy==1.5.0 +pytest==8.3.2 +pytest-asyncio==0.23.8 +pytest-cov==5.0.0 +pytest-mock==3.14.0 +pytest-randomly==3.15.0 +sortedcontainers==2.4.0 +tomli==2.0.1 +typing-extensions==4.12.2 diff --git a/.riot/requirements/fe1d595.txt b/.riot/requirements/fe1d595.txt new file mode 100644 index 00000000000..41677562755 --- /dev/null +++ b/.riot/requirements/fe1d595.txt @@ -0,0 +1,24 @@ +# +# This file is autogenerated by pip-compile 
with Python 3.14 +# by the following command: +# +# pip-compile --allow-unsafe --no-annotate .riot/requirements/fe1d595.in +# +aiokafka==0.12.0 +async-timeout==5.0.1 +attrs==25.4.0 +coverage[toml]==7.11.0 +hypothesis==6.45.0 +iniconfig==2.3.0 +mock==5.2.0 +opentracing==2.4.0 +packaging==25.0 +pluggy==1.6.0 +pygments==2.19.2 +pytest==8.4.2 +pytest-asyncio==1.2.0 +pytest-cov==7.0.0 +pytest-mock==3.15.1 +pytest-randomly==4.0.1 +sortedcontainers==2.4.0 +typing-extensions==4.15.0 diff --git a/ddtrace/_monkey.py b/ddtrace/_monkey.py index f943f465d1b..23491a5fee3 100644 --- a/ddtrace/_monkey.py +++ b/ddtrace/_monkey.py @@ -59,6 +59,7 @@ "kafka": True, "langgraph": True, "litellm": True, + "aiokafka": True, "mongoengine": True, "mysql": True, "mysqldb": True, diff --git a/ddtrace/_trace/trace_handlers.py b/ddtrace/_trace/trace_handlers.py index 75734de1dc1..33a572f08e5 100644 --- a/ddtrace/_trace/trace_handlers.py +++ b/ddtrace/_trace/trace_handlers.py @@ -36,6 +36,12 @@ from ddtrace.ext import net from ddtrace.ext import redis as redisx from ddtrace.ext import websocket +from ddtrace.ext.kafka import MESSAGE_KEY +from ddtrace.ext.kafka import MESSAGE_OFFSET +from ddtrace.ext.kafka import PARTITION +from ddtrace.ext.kafka import RECEIVED_MESSAGE +from ddtrace.ext.kafka import TOMBSTONE +from ddtrace.ext.kafka import TOPIC from ddtrace.internal import core from ddtrace.internal.compat import is_valid_ip from ddtrace.internal.compat import maybe_stringify @@ -1127,6 +1133,78 @@ def _on_asgi_request(ctx: core.ExecutionContext) -> None: scope["datadog"]["request_spans"].append(span) +def _on_aiokafka_send_start(_, send_value, send_key, headers, span, partition): + span.set_tag_str(SPAN_KIND, SpanKind.PRODUCER) + span.set_tag_str(MESSAGE_KEY, str(send_key)) + span.set_tag_str(PARTITION, str(partition)) + span.set_tag_str(TOMBSTONE, str(send_value is None)) + span.set_metric(_SPAN_MEASURED_KEY, 1) + + if config.aiokafka.distributed_tracing_enabled: + # inject headers with 
Datadog tags:
+        tracing_headers = {}
+        HTTPPropagator.inject(span.context, tracing_headers)
+        for key, value in tracing_headers.items():
+            headers.append((key, value.encode("utf-8")))
+
+
+def _on_aiokafka_getone_message(_, span, message, err):
+    span.set_tag_str(RECEIVED_MESSAGE, str(message is not None))
+    span.set_metric(_SPAN_MEASURED_KEY, 1)
+
+    if message is not None:
+        message_key = message.key or ""
+        message_offset = message.offset or -1
+        topic = str(message.topic)
+        span.set_tag_str(TOPIC, topic)
+
+        if isinstance(message_key, str) or isinstance(message_key, bytes):
+            span.set_tag_str(MESSAGE_KEY, message_key)
+
+        span.set_tag_str(TOMBSTONE, str(message.value is None))
+        span.set_tag_str(PARTITION, str(message.partition))
+        span.set_tag_str(MESSAGE_OFFSET, str(message_offset))
+
+    if err is not None:
+        span.set_exc_info(type(err), err, err.__traceback__)
+
+
+def _on_aiokafka_getmany_message(_, span, messages):
+    span.set_tag_str(RECEIVED_MESSAGE, str(bool(messages)))
+    span.set_metric(_SPAN_MEASURED_KEY, 1)
+
+    if messages:  # getmany() returns an empty dict on timeout; avoid StopIteration from next(iter(...))
+        first_topic = next(iter(messages)).topic
+        span.set_tag_str(MESSAGING_DESTINATION_NAME, first_topic)
+
+        topics_partitions = {}
+        for topic_partition in messages.keys():
+            topic = topic_partition.topic
+            partition = topic_partition.partition
+            if topic not in topics_partitions:
+                topics_partitions[topic] = []
+            topics_partitions[topic].append(partition)
+
+        all_topics = list(topics_partitions.keys())
+        span.set_tag(TOPIC, ",".join(all_topics))
+
+        for topic, partitions in topics_partitions.items():
+            partition_list = ",".join(map(str, sorted(partitions)))
+            span.set_tag_str(f"kafka.partitions.{topic}", partition_list)
+
+        for topic_partition, records in messages.items():
+            for record in records:
+                if config.aiokafka.distributed_tracing_enabled and record.headers:
+                    dd_headers = {
+                        key: (val.decode("utf-8", errors="ignore") if isinstance(val, (bytes, bytearray)) else str(val))
+                        for key, val in record.headers
+                        if
val is not None + } + context = HTTPPropagator.extract(dd_headers) + + span.link_span(context) + + def listen(): core.on("wsgi.request.prepare", _on_request_prepare) core.on("wsgi.request.prepared", _on_request_prepared) @@ -1205,6 +1283,9 @@ def listen(): core.on("rq.worker.after.perform.job", _on_end_of_traced_method_in_fork) core.on("rq.queue.enqueue_job", _propagate_context) core.on("molten.router.match", _on_router_match) + core.on("aiokafka.send.start", _on_aiokafka_send_start) + core.on("aiokafka.getone.message", _on_aiokafka_getone_message) + core.on("aiokafka.getmany.message", _on_aiokafka_getmany_message) for context_name in ( # web frameworks @@ -1263,6 +1344,9 @@ def listen(): "azure.servicebus.patched_producer_schedule", "azure.servicebus.patched_producer_send", "psycopg.patched_connect", + "aiokafka.send", + "aiokafka.getone", + "aiokafka.getmany", ): core.on(f"context.started.{context_name}", _start_span) @@ -1295,6 +1379,8 @@ def listen(): "azure.eventhubs.patched_producer_batch", "azure.eventhubs.patched_producer_send", "azure.eventhubs.patched_producer_send_batch", + "aiokafka.getone", + "aiokafka.getmany", ): core.on(f"context.ended.{name}", _finish_span) diff --git a/ddtrace/contrib/integration_registry/registry.yaml b/ddtrace/contrib/integration_registry/registry.yaml index 4a467241940..b2cf73a8a36 100644 --- a/ddtrace/contrib/integration_registry/registry.yaml +++ b/ddtrace/contrib/integration_registry/registry.yaml @@ -33,6 +33,16 @@ integrations: min: 1.5.1 max: 1.6.0 +- integration_name: aiokafka + is_external_package: true + is_tested: true + dependency_names: + - aiokafka + tested_versions_by_dependency: + aiokafka: + min: 0.9.0 + max: 0.12.0 + - integration_name: aiomysql is_external_package: true is_tested: true diff --git a/ddtrace/contrib/internal/aiokafka/__init__.py b/ddtrace/contrib/internal/aiokafka/__init__.py new file mode 100644 index 00000000000..d353775735d --- /dev/null +++ b/ddtrace/contrib/internal/aiokafka/__init__.py @@ 
-0,0 +1,39 @@ +""" +This integration instruments the ``aiokafka`` +library to trace event streaming. + +Enabling +~~~~~~~~ + +The aiokafka integration is enabled automatically when using +:ref:`ddtrace-run` or :ref:`import ddtrace.auto`. + +Or use :func:`patch() ` to manually enable the integration:: + + from ddtrace import patch + patch(aiokafka=True) + import aiokafka + ... + +Global Configuration +~~~~~~~~~~~~~~~~~~~~ + +.. py:data:: ddtrace.config.aiokafka["service"] + + The service name reported by default for your kafka spans. + + This option can also be set with the ``DD_AIOKAFKA_SERVICE`` environment + variable. + + Default: ``"kafka"`` + +.. py:data:: ddtrace.config.aiokafka["distributed_tracing_enabled"] + + Whether to enable distributed tracing between Kafka messages. + + This option can also be set with the ``DD_KAFKA_PROPAGATION_ENABLED`` environment + variable. + + Default: ``"False"`` + +""" diff --git a/ddtrace/contrib/internal/aiokafka/patch.py b/ddtrace/contrib/internal/aiokafka/patch.py new file mode 100644 index 00000000000..5e555f782f6 --- /dev/null +++ b/ddtrace/contrib/internal/aiokafka/patch.py @@ -0,0 +1,201 @@ +import os +from time import time_ns +from typing import Dict + +import aiokafka +from wrapt import wrap_function_wrapper as _w + +from ddtrace import config +from ddtrace.constants import SPAN_KIND +from ddtrace.contrib import trace_utils +from ddtrace.ext import SpanKind +from ddtrace.ext import SpanTypes +from ddtrace.ext.kafka import CONSUME +from ddtrace.ext.kafka import GROUP_ID +from ddtrace.ext.kafka import HOST_LIST +from ddtrace.ext.kafka import PRODUCE +from ddtrace.ext.kafka import SERVICE +from ddtrace.ext.kafka import TOPIC +from ddtrace.internal import core +from ddtrace.internal.constants import COMPONENT +from ddtrace.internal.constants import MESSAGING_DESTINATION_NAME +from ddtrace.internal.constants import MESSAGING_SYSTEM +from ddtrace.internal.schema import schematize_messaging_operation +from 
ddtrace.internal.schema import schematize_service_name +from ddtrace.internal.schema.span_attribute_schema import SpanDirection +from ddtrace.internal.utils import get_argument_value +from ddtrace.internal.utils import set_argument_value +from ddtrace.internal.utils.formats import asbool +from ddtrace.internal.utils.wrappers import unwrap as _u +from ddtrace.propagation.http import HTTPPropagator + + +config._add( + "aiokafka", + dict( + _default_service=schematize_service_name("kafka"), + distributed_tracing_enabled=asbool(os.getenv("DD_KAFKA_PROPAGATION_ENABLED", default=False)), + ), +) + + +def get_version() -> str: + return getattr(aiokafka, "__version__", "") + + +def _supported_versions() -> Dict[str, str]: + return {"aiokafka": ">=0.9.0"} + + +def common_aiokafka_tags(topic, bootstrap_servers): + return { + COMPONENT: config.aiokafka.integration_name, + TOPIC: topic, + MESSAGING_DESTINATION_NAME: topic, + MESSAGING_SYSTEM: SERVICE, + HOST_LIST: bootstrap_servers, + } + + +def common_consume_aiokafka_tags(topic, bootstrap_servers, group_id): + tags = common_aiokafka_tags(topic, bootstrap_servers) + tags.update( + { + SPAN_KIND: SpanKind.CONSUMER, + GROUP_ID: group_id, + } + ) + return tags + + +def parse_send(instance, args, kwargs): + topic = get_argument_value(args, kwargs, 0, "topic") + value = get_argument_value(args, kwargs, 1, "value", True) + key = get_argument_value(args, kwargs, 2, "key", True) or None + partition = get_argument_value(args, kwargs, 3, "partition", True) + headers = get_argument_value(args, kwargs, 5, "headers", True) or [] + servers = instance.client._bootstrap_servers + + return topic, value, headers, partition, key, servers + + +async def traced_send(func, instance, args, kwargs): + topic, value, headers, partition, key, bootstrap_servers = parse_send(instance, args, kwargs) + + with core.context_with_data( + "aiokafka.send", + span_name=schematize_messaging_operation(PRODUCE, provider="kafka", direction=SpanDirection.OUTBOUND), + 
span_type=SpanTypes.WORKER, + service=trace_utils.ext_service(None, config.aiokafka), + tags=common_aiokafka_tags(topic, bootstrap_servers), + ) as ctx: + span = ctx.span + core.dispatch("aiokafka.send.start", (topic, value, key, headers, span, partition)) + args, kwargs = set_argument_value(args, kwargs, 5, "headers", headers, override_unset=True) + + try: + result = await func(*args, **kwargs) + except BaseException as e: + span.set_exc_info(type(e), e, e.__traceback__) + span.finish() + raise e + + def sent_callback(future): + try: + result = future.result() + core.dispatch("aiokafka.send.completed", (result,)) + except Exception as e: + span.set_exc_info(type(e), e, e.__traceback__) + finally: + span.finish() + + result.add_done_callback(sent_callback) + return result + + +async def traced_getone(func, instance, args, kwargs): + # we must get start time now since execute before starting a span in order to get distributed context + # if it exists + start_ns = time_ns() + err = None + message = None + parent_ctx = None + + group_id = instance._group_id + bootstrap_servers = instance._client._bootstrap_servers + + try: + message = await func(*args, **kwargs) + if config.aiokafka.distributed_tracing_enabled and message.headers: + dd_headers = { + key: (val.decode("utf-8", errors="ignore") if isinstance(val, (bytes, bytearray)) else str(val)) + for key, val in message.headers + if val is not None + } + parent_ctx = HTTPPropagator.extract(dd_headers) + except Exception as e: + err = e + raise err + finally: + with core.context_with_data( + "aiokafka.getone", + call_trace=False, + span_name=schematize_messaging_operation(CONSUME, provider="kafka", direction=SpanDirection.INBOUND), + span_type=SpanTypes.WORKER, + service=trace_utils.ext_service(None, config.aiokafka), + distributed_context=parent_ctx, + tags=common_consume_aiokafka_tags(getattr(message, "topic", None), bootstrap_servers, group_id), + ) as ctx: + span = ctx.span + span.start_ns = start_ns + + 
core.dispatch("aiokafka.getone.message", (instance, span, message, err)) + return message + + +async def traced_getmany(func, instance, args, kwargs): + group_id = instance._group_id + bootstrap_servers = instance._client._bootstrap_servers + + with core.context_with_data( + "aiokafka.getmany", + call_trace=False, + span_name=schematize_messaging_operation(CONSUME, provider="kafka", direction=SpanDirection.INBOUND), + span_type=SpanTypes.WORKER, + service=trace_utils.ext_service(None, config.aiokafka), + tags=common_consume_aiokafka_tags(None, bootstrap_servers, group_id), + ) as ctx: + span = ctx.span + messages = await func(*args, **kwargs) + + core.dispatch("aiokafka.getmany.message", (instance, span, messages)) + + return messages + + +async def traced_commit(func, instance, args, kwargs): + core.dispatch("aiokafka.commit.start", (instance, args, kwargs)) + return await func(*args, **kwargs) + + +def patch(): + if getattr(aiokafka, "_datadog_patch", False): + return + aiokafka._datadog_patch = True + + _w("aiokafka", "AIOKafkaProducer.send", traced_send) + _w("aiokafka", "AIOKafkaConsumer.getone", traced_getone) + _w("aiokafka", "AIOKafkaConsumer.getmany", traced_getmany) + _w("aiokafka", "AIOKafkaConsumer.commit", traced_commit) + + +def unpatch(): + if not getattr(aiokafka, "_datadog_patch", False): + return + + aiokafka._datadog_patch = False + + _u(aiokafka.AIOKafkaProducer, "send") + _u(aiokafka.AIOKafkaConsumer, "getone") + _u(aiokafka.AIOKafkaConsumer, "getmany") + _u(aiokafka.AIOKafkaConsumer, "commit") diff --git a/ddtrace/contrib/internal/kafka/patch.py b/ddtrace/contrib/internal/kafka/patch.py index dff5853c599..07a70448498 100644 --- a/ddtrace/contrib/internal/kafka/patch.py +++ b/ddtrace/contrib/internal/kafka/patch.py @@ -270,7 +270,7 @@ def _instrument_message(messages, pin, start_ns, instance, err): cluster_id = _get_cluster_id(instance, str(first_message.topic())) core.set_item("kafka_cluster_id", cluster_id) core.set_item("kafka_topic", 
str(first_message.topic())) - core.dispatch("kafka.consume.start", (instance, first_message, span)) + core.dispatch("kafka.consume.start", (instance, message, span)) span.set_tag_str(MESSAGING_SYSTEM, kafkax.SERVICE) span.set_tag_str(COMPONENT, config.kafka.integration_name) diff --git a/ddtrace/internal/datastreams/__init__.py b/ddtrace/internal/datastreams/__init__.py index 3afdd5ffa81..f7d85550462 100644 --- a/ddtrace/internal/datastreams/__init__.py +++ b/ddtrace/internal/datastreams/__init__.py @@ -3,13 +3,15 @@ from ...internal.utils.importlib import require_modules -required_modules = ["confluent_kafka", "botocore", "kombu"] +required_modules = ["confluent_kafka", "botocore", "kombu", "aiokafka"] _processor = None if config._data_streams_enabled: with require_modules(required_modules) as missing_modules: if "confluent_kafka" not in missing_modules: from . import kafka # noqa:F401 + if "aiokafka" not in missing_modules: + from . import aiokafka # noqa:F401 if "botocore" not in missing_modules: from . import botocore # noqa:F401 if "kombu" not in missing_modules: diff --git a/ddtrace/internal/datastreams/aiokafka.py b/ddtrace/internal/datastreams/aiokafka.py new file mode 100644 index 00000000000..7a0f81b203a --- /dev/null +++ b/ddtrace/internal/datastreams/aiokafka.py @@ -0,0 +1,103 @@ +import time + +from ddtrace import config +from ddtrace.internal import core +from ddtrace.internal.datastreams.processor import DsmPathwayCodec +from ddtrace.internal.datastreams.utils import _calculate_byte_size +from ddtrace.internal.utils import get_argument_value + + +INT_TYPES = (int,) + + +def dsm_aiokafka_send_start(topic, value, key, headers, span, _): + from . 
import data_streams_processor as processor + + payload_size = 0 + payload_size += _calculate_byte_size(value) + payload_size += _calculate_byte_size(key) + try: + header_dict = { + k: ( + v.decode("utf-8", errors="ignore") if isinstance(v, (bytes, bytearray)) else "" if v is None else str(v) + ) + for k, v in headers + } + except Exception: + header_dict = {} + payload_size += _calculate_byte_size(header_dict) + + edge_tags = ["direction:out", "topic:" + topic, "type:kafka"] + ctx = processor().set_checkpoint(edge_tags, payload_size=payload_size, span=span) + + dsm_headers = {} + DsmPathwayCodec.encode(ctx, dsm_headers) + for key, value in dsm_headers.items(): + headers.append((key, value.encode("utf-8"))) + + +def dsm_aiokafka_send_completed(record_metadata): + from . import data_streams_processor as processor + + reported_offset = record_metadata.offset if isinstance(record_metadata.offset, INT_TYPES) else -1 + processor().track_kafka_produce(record_metadata.topic, record_metadata.partition, reported_offset, time.time()) + + +def dsm_aiokafka_message_consume(instance, span, message, _): + from . import data_streams_processor as processor + + headers = { + key: val.decode("utf-8", errors="ignore") if isinstance(val, (bytes, bytearray)) else str(val) + for key, val in (message.headers or []) + if val is not None + } + group = instance._group_id + + payload_size = 0 + payload_size += _calculate_byte_size(message.value) + payload_size += _calculate_byte_size(message.key) + payload_size += _calculate_byte_size(headers) + + ctx = DsmPathwayCodec.decode(headers, processor()) + ctx.set_checkpoint( + ["direction:in", "group:" + group, "topic:" + message.topic, "type:kafka"], payload_size=payload_size, span=span + ) + + if instance._enable_auto_commit: + # it's not exactly true, but if auto commit is enabled, we consider that a message is acknowledged + # when it's read. We add one because the commit offset is the next message to read. 
+        reported_offset = (message.offset + 1) if isinstance(message.offset, INT_TYPES) else -1
+        processor().track_kafka_commit(
+            instance._group_id, message.topic, message.partition, reported_offset, time.time()
+        )
+
+
+def dsm_aiokafka_many_messages_consume(instance, span, messages):
+    if messages is not None:
+        for _, records in messages.items():
+            for record in records:
+                dsm_aiokafka_message_consume(instance, span, record, None)
+
+
+def dsm_aiokafka_message_commit(instance, args, kwargs):
+    from . import data_streams_processor as processor
+
+    offsets = get_argument_value(args, kwargs, 0, "offsets", optional=True)
+    if offsets:
+        for tp, offset in offsets.items():
+            # offset can be either an int or a tuple of (offset, metadata)
+            if isinstance(offset, INT_TYPES):
+                reported_offset = offset
+            elif isinstance(offset, tuple) and len(offset) > 0 and isinstance(offset[0], INT_TYPES):
+                reported_offset = offset[0]
+            else:
+                reported_offset = -1
+            processor().track_kafka_commit(instance._group_id, tp.topic, tp.partition, reported_offset, time.time())
+
+
+if config._data_streams_enabled:
+    core.on("aiokafka.send.start", dsm_aiokafka_send_start)
+    core.on("aiokafka.send.completed", dsm_aiokafka_send_completed)
+    core.on("aiokafka.getone.message", dsm_aiokafka_message_consume)
+    core.on("aiokafka.getmany.message", dsm_aiokafka_many_messages_consume)
+    core.on("aiokafka.commit.start", dsm_aiokafka_message_commit)
diff --git a/ddtrace/settings/_config.py b/ddtrace/settings/_config.py
index 96c66bb5fce..6368845964c 100644
--- a/ddtrace/settings/_config.py
+++ b/ddtrace/settings/_config.py
@@ -105,6 +105,7 @@
     "urllib3",
     "subprocess",
     "kafka",
+    "aiokafka",
     "futures",
     "unittest",
     "falcon",
diff --git a/docs/integrations.rst b/docs/integrations.rst
index c33138a508e..f00fa8f4003 100644
--- a/docs/integrations.rst
+++ b/docs/integrations.rst
@@ -47,6 +47,12 @@ aiohttp-jinja2
 ^^^^^^^^^^^^^^
 .. automodule:: ddtrace.contrib.internal.aiohttp_jinja2
 
+..
_aiokafka: + +aiokafka +^^^^^^^^ +.. automodule:: ddtrace.contrib.internal.aiokafka + .. _aiomysql: aiomysql diff --git a/docs/spelling_wordlist.txt b/docs/spelling_wordlist.txt index 19d3e99b2e8..52c28832f5f 100644 --- a/docs/spelling_wordlist.txt +++ b/docs/spelling_wordlist.txt @@ -4,6 +4,7 @@ adk agentless aiobotocore aiohttp +aiokafka aiomysql aiopg aioredis @@ -203,6 +204,7 @@ namespace NeedsAppKey NSight obfuscator +observability ObjectProxy oce openai diff --git a/releasenotes/notes/aiokafka-798694b0f8232505.yaml b/releasenotes/notes/aiokafka-798694b0f8232505.yaml new file mode 100644 index 00000000000..d2a1a6f6eca --- /dev/null +++ b/releasenotes/notes/aiokafka-798694b0f8232505.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + aiokafka: Introduces automatic instrumentation of the ``aiokafka`` package diff --git a/releasenotes/notes/feat-aiokafka-support-18ea0a4ec66a0827.yaml b/releasenotes/notes/feat-aiokafka-support-18ea0a4ec66a0827.yaml new file mode 100644 index 00000000000..3e68d69e6ac --- /dev/null +++ b/releasenotes/notes/feat-aiokafka-support-18ea0a4ec66a0827.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + aiokafka: Adds instrumentation support for ``aiokafka>=0.9.0``. See the + ``aiokafka`` + documentation for more information. 
diff --git a/releasenotes/notes/fix-kafka-consume-dispatch-48cd5912b1282ab9.yaml b/releasenotes/notes/fix-kafka-consume-dispatch-48cd5912b1282ab9.yaml new file mode 100644 index 00000000000..cc4310e4fae --- /dev/null +++ b/releasenotes/notes/fix-kafka-consume-dispatch-48cd5912b1282ab9.yaml @@ -0,0 +1,4 @@ +--- +fixes: + - | + kafka: This fix resolves an issue where only the first message in a batch was dispatched to Data Streams Monitoring (DSM) when consuming multiple Kafka messages \ No newline at end of file diff --git a/riotfile.py b/riotfile.py index f12379c4b8c..378adc8ef1b 100644 --- a/riotfile.py +++ b/riotfile.py @@ -3172,6 +3172,16 @@ def select_pys(min_version: str = MIN_PYTHON_VERSION, max_version: str = MAX_PYT ), ], ), + Venv( + name="aiokafka", + env={ + "_DD_TRACE_STATS_WRITER_INTERVAL": "1000000000", + "DD_DATA_STREAMS_ENABLED": "true", + }, + command="pytest {cmdargs} tests/contrib/aiokafka/", + pys=select_pys(), + pkgs={"pytest-asyncio": [latest], "pytest-randomly": latest, "aiokafka": ["~=0.9.0", latest]}, + ), Venv( name="aws_lambda", command="pytest --no-ddtrace {cmdargs} tests/contrib/aws_lambda", diff --git a/supported_versions_output.json b/supported_versions_output.json index c1ea2d658ad..2b19d432dba 100644 --- a/supported_versions_output.json +++ b/supported_versions_output.json @@ -27,6 +27,13 @@ "max_tracer_supported": "1.6", "auto-instrumented": true }, + { + "dependency": "aiokafka", + "integration": "aiokafka", + "minimum_tracer_supported": "0.9.0", + "max_tracer_supported": "0.12.0", + "auto-instrumented": true + }, { "dependency": "aiomysql", "integration": "aiomysql", @@ -106,7 +113,7 @@ "minimum_tracer_supported": "5.12.2", "max_tracer_supported": "5.15.0", "pinned": "true", - "auto-instrumented": false + "auto-instrumented": true }, { "dependency": "azure-functions", @@ -489,7 +496,8 @@ "dependency": "openai", "integration": "openai", "minimum_tracer_supported": "1.0.0", - "max_tracer_supported": "1.91.0", + 
"max_tracer_supported": "1.109.1", + "pinned": "true", "auto-instrumented": true }, { @@ -504,7 +512,7 @@ "integration": "protobuf", "minimum_tracer_supported": "5.29.3", "max_tracer_supported": "6.32.0", - "auto-instrumented": false + "auto-instrumented": true }, { "dependency": "psycopg", diff --git a/supported_versions_table.csv b/supported_versions_table.csv index 94af82316ad..8d3d2389114 100644 --- a/supported_versions_table.csv +++ b/supported_versions_table.csv @@ -3,6 +3,7 @@ aiobotocore,aiobotocore,1.0.7,2.24.1,False aiohttp,aiohttp,3.7.4.post0,3.12.15,True aiohttp-jinja2,aiohttp_jinja2,1.5.1,1.6,True aiohttp_jinja2,aiohttp_jinja2,1.5.1,1.6,True +aiokafka,aiokafka,0.9.0,0.12.0,True aiomysql,aiomysql,0.1.1,0.2.0,True aiopg,aiopg *,0.16.0,1.4.0,True algoliasearch,algoliasearch *,2.5.0,2.6.3,True @@ -13,7 +14,7 @@ asyncpg,asyncpg,0.22.0,0.30.0,True avro,avro,1.12.0,1.12.0,True datadog-lambda,aws_lambda,6.105.0,6.105.0,True datadog_lambda,aws_lambda,6.105.0,6.105.0,True -azure-eventhub,azure_eventhubs *,5.12.2,5.15.0,False +azure-eventhub,azure_eventhubs *,5.12.2,5.15.0,True azure-functions,azure_functions *,1.10.1,1.23.0,True azure-servicebus,azure_servicebus *,7.14.2,7.14.2,True boto3,botocore *,1.34.49,1.38.26,True @@ -67,9 +68,9 @@ molten,molten,1.0.2,1.0.2,True mongoengine,mongoengine,0.23.1,0.29.1,True mysql-connector-python,mysql,8.0.5,9.4.0,True mysqlclient,mysqldb,2.2.1,2.2.6,True -openai,openai,1.0.0,1.91.0,True +openai,openai *,1.0.0,1.109.1,True openai-agents,openai_agents,0.0.8,0.0.16,True -protobuf,protobuf,5.29.3,6.32.0,False +protobuf,protobuf,5.29.3,6.32.0,True psycopg,psycopg,3.0.18,3.2.10,True psycopg2-binary,psycopg,2.8.6,2.9.10,True pydantic-ai-slim,pydantic_ai *,0.3.0,0.4.4,True diff --git a/tests/contrib/aiokafka/__init__.py b/tests/contrib/aiokafka/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/contrib/aiokafka/test_aiokafka.py b/tests/contrib/aiokafka/test_aiokafka.py new file mode 100644 index 
00000000000..e3b513df228 --- /dev/null +++ b/tests/contrib/aiokafka/test_aiokafka.py @@ -0,0 +1,176 @@ +from aiokafka.errors import MessageSizeTooLargeError +from aiokafka.structs import TopicPartition +import pytest + +from ddtrace.contrib.internal.aiokafka.patch import patch +from ddtrace.contrib.internal.aiokafka.patch import unpatch +from tests.utils import DummyTracer +from tests.utils import override_config +from tests.utils import override_global_tracer + +from .utils import BOOTSTRAP_SERVERS +from .utils import KEY +from .utils import PAYLOAD +from .utils import consumer_ctx +from .utils import create_topic +from .utils import find_span_by_name +from .utils import has_header +from .utils import producer_ctx + + +@pytest.fixture(autouse=True) +def patch_aiokafka(): + """Automatically patch aiokafka for all tests in this class""" + patch() + yield + unpatch() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("key", [KEY, None]) +@pytest.mark.snapshot() +async def test_send_and_wait_key(key): + topic = await create_topic("send_and_wait_key") + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=key) + + async with consumer_ctx([topic]) as consumer: + await consumer.getone() + await consumer.commit() + + +@pytest.mark.asyncio +@pytest.mark.parametrize("value", [PAYLOAD, None]) +@pytest.mark.snapshot() +async def test_send_and_wait_value(value): + topic = await create_topic("send_and_wait_value") + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=value, key=KEY) + + async with consumer_ctx([topic]) as consumer: + await consumer.getone() + await consumer.commit() + + +@pytest.mark.asyncio +@pytest.mark.snapshot() +async def test_send_and_wait_commit_with_offset(): + topic = await create_topic("send_and_wait_commit_with_offset") + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) 
+ + async with consumer_ctx([topic]) as consumer: + result = await consumer.getone() + await consumer.commit({TopicPartition(result.topic, result.partition): result.offset + 1}) + + assert not has_header(result.headers, "x-datadog-trace-id") + + +@pytest.mark.asyncio +@pytest.mark.snapshot() +async def test_send_commit(): + topic = await create_topic("send_commit") + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + fut = await producer.send(topic, value=PAYLOAD, key=KEY) + await fut + + async with consumer_ctx([topic]) as consumer: + result = await consumer.getone() + await consumer.commit() + + assert not has_header(result.headers, "x-datadog-trace-id") + + +@pytest.mark.asyncio +@pytest.mark.snapshot() +async def test_send_multiple_servers(): + topic = await create_topic("send_multiple_servers") + async with producer_ctx([BOOTSTRAP_SERVERS] * 3) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + +@pytest.mark.asyncio +@pytest.mark.snapshot(ignores=["meta.error.stack"]) +async def test_send_and_wait_failure(): + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + with pytest.raises(MessageSizeTooLargeError): + await producer.send_and_wait("nonexistent_topic", value=b"x" * (10 * 1024 * 1024), key=KEY) + + +@pytest.mark.asyncio +@pytest.mark.snapshot() +async def test_getmany_single_message(): + topic = await create_topic("getmany_single_message") + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic]) as consumer: + await consumer.getmany(timeout_ms=1000) + await consumer.commit() + + +@pytest.mark.asyncio +@pytest.mark.snapshot(ignores=["meta.messaging.destination.name", "meta.kafka.topic"]) +async def test_getmany_multiple_messages_multiple_topics(): + topic = await create_topic("getmany_multiple_messages_multiple_topics") + topic_2 = await create_topic("getmany_multiple_messages_multiple_topics_2") + + async with 
producer_ctx([BOOTSTRAP_SERVERS] * 3) as producer: + await producer.send_and_wait(topic, PAYLOAD) + await producer.send_and_wait(topic, PAYLOAD) + await producer.send_and_wait(topic_2, PAYLOAD) + + async with consumer_ctx([topic, topic_2]) as consumer: + await consumer.getmany(timeout_ms=1000) + await consumer.commit() + + +@pytest.mark.asyncio +@pytest.mark.snapshot(ignores=["meta.tracestate"]) +async def test_send_and_wait_with_distributed_tracing(): + topic = await create_topic("send_and_wait_with_distributed_tracing") + + with override_config("aiokafka", dict(distributed_tracing_enabled=True)): + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait( + topic, value=PAYLOAD, key=KEY, headers=[("some_header", "some_value".encode("utf-8"))] + ) + + async with consumer_ctx([topic]) as consumer: + result = await consumer.getone() + await consumer.commit() + + assert has_header(result.headers, "x-datadog-trace-id") + assert has_header(result.headers, "some_header") + + +@pytest.mark.asyncio +@pytest.mark.snapshot(ignores=["meta._dd.span_links", "meta.messaging.destination.name", "meta.kafka.topic"]) +async def test_getmany_multiple_messages_multiple_topics_with_distributed_tracing(): + tracer = DummyTracer() + + topic = await create_topic("getmany_distributed_tracing") + topic_2 = await create_topic("getmany_distributed_tracing_2") + + with override_global_tracer(tracer): + with override_config("aiokafka", dict(distributed_tracing_enabled=True)): + async with producer_ctx([BOOTSTRAP_SERVERS] * 3) as producer: + await producer.send_and_wait(topic, PAYLOAD) + await producer.send_and_wait(topic, PAYLOAD) + await producer.send_and_wait(topic_2, PAYLOAD) + + async with consumer_ctx([topic, topic_2]) as consumer: + result = await consumer.getmany(timeout_ms=1000) + await consumer.commit() + + headers = next(iter(result.values()))[0].headers + assert has_header(headers, "x-datadog-trace-id") + + spans = tracer.pop() + consumer_span = 
find_span_by_name(spans, "kafka.consume") + + assert consumer_span is not None, "Consumer span not found" + + span_links = consumer_span._links + assert span_links is not None, "Consumer span should have span links" + assert len(span_links) == 3, "Consumer span should have at least one span link" diff --git a/tests/contrib/aiokafka/test_aiokafka_dsm.py b/tests/contrib/aiokafka/test_aiokafka_dsm.py new file mode 100644 index 00000000000..3720542935d --- /dev/null +++ b/tests/contrib/aiokafka/test_aiokafka_dsm.py @@ -0,0 +1,344 @@ +from aiokafka.structs import TopicPartition +import mock +import pytest + +from ddtrace.contrib.internal.aiokafka.patch import patch +from ddtrace.contrib.internal.aiokafka.patch import unpatch +from ddtrace.internal.datastreams.processor import PROPAGATION_KEY_BASE_64 +from ddtrace.internal.datastreams.processor import ConsumerPartitionKey +from ddtrace.internal.datastreams.processor import PartitionKey +from tests.utils import DummyTracer +from tests.utils import override_global_tracer + +from .utils import BOOTSTRAP_SERVERS +from .utils import GROUP_ID +from .utils import KEY +from .utils import PAYLOAD +from .utils import consumer_ctx +from .utils import create_topic +from .utils import producer_ctx + + +@pytest.fixture(autouse=True) +def patch_aiokafka(): + patch() + yield + unpatch() + + +@pytest.fixture +def tracer(): + tracer = DummyTracer() + with override_global_tracer(tracer): + yield tracer + tracer.flush() + + +@pytest.fixture +def dsm_processor(tracer): + processor = tracer.data_streams_processor + with mock.patch("ddtrace.internal.datastreams.data_streams_processor", return_value=processor): + yield processor + # flush buckets for the next test run + processor.periodic() + + +@pytest.mark.asyncio +async def test_data_streams_headers(dsm_processor): + """Test that DSM pathway context headers are injected during send""" + topic = await create_topic("data_streams_headers") + + async with producer_ctx([BOOTSTRAP_SERVERS]) as 
producer: + await producer.send_and_wait(topic, value=PAYLOAD) + + async with consumer_ctx([topic]) as consumer: + result = await consumer.getone() + await consumer.commit() + + assert any(header[0] == PROPAGATION_KEY_BASE_64 for header in result.headers) + assert len(dsm_processor._buckets) >= 1 + + +@pytest.mark.asyncio +async def test_data_streams_pathway_stats(dsm_processor): + topic = await create_topic("data_streams_pathway_stats") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic]) as consumer: + await consumer.getone() + await consumer.commit() + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + pathway_stats = bucket.pathway_stats + + producer_checkpoints = [k for k in pathway_stats.keys() if "direction:out" in k[0]] + assert len(producer_checkpoints) == 1, "Producer checkpoint should exist" + + consumer_checkpoints = [k for k in pathway_stats.keys() if "direction:in" in k[0]] + assert len(consumer_checkpoints) == 1 + + for key in pathway_stats.keys(): + stats = pathway_stats[key] + assert stats.full_pathway_latency.count >= 1 + assert stats.edge_latency.count >= 1 + + +@pytest.mark.asyncio +async def test_data_streams_offset_monitoring_auto_commit(dsm_processor): + topic = await create_topic("data_streams_offset_monitoring_auto_commit") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic], enable_auto_commit=True) as consumer: + message1 = await consumer.getone() + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + 
+ assert len(bucket.latest_produce_offsets) > 0 + produce_offset = bucket.latest_produce_offsets.get(PartitionKey(topic, 0)) + assert produce_offset is not None and produce_offset >= 0 + + commit_offset = bucket.latest_commit_offsets.get(ConsumerPartitionKey(GROUP_ID, topic, 0)) + assert commit_offset is not None + assert commit_offset == message1.offset + 1 + + +@pytest.mark.asyncio +async def test_data_streams_offset_monitoring_manual_commit(dsm_processor): + topic = await create_topic("data_streams_offset_monitoring_manual_commit") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic], enable_auto_commit=False) as consumer: + await consumer.getone() + message2 = await consumer.getone() + await consumer.commit({TopicPartition(message2.topic, message2.partition): message2.offset + 1}) + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + + produce_offset = bucket.latest_produce_offsets.get(PartitionKey(topic, 0)) + assert produce_offset is not None and produce_offset >= 1 + + commit_offset = bucket.latest_commit_offsets.get(ConsumerPartitionKey(GROUP_ID, topic, 0)) + assert commit_offset is not None + assert commit_offset == message2.offset + 1 + + +@pytest.mark.asyncio +async def test_data_streams_offset_monitoring_manual_commit_tuple(dsm_processor): + topic = await create_topic("data_streams_offset_monitoring_manual_commit_tuple") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic], enable_auto_commit=False) as 
consumer: + # Read two messages, commit using tuple (offset, metadata) + await consumer.getone() + message2 = await consumer.getone() + await consumer.commit({TopicPartition(message2.topic, message2.partition): (message2.offset + 1, "meta")}) + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + + # latest commit offset should reflect the value from the tuple + commit_key = ConsumerPartitionKey(GROUP_ID, topic, 0) + commit_offset = bucket.latest_commit_offsets.get(commit_key) + assert commit_offset is not None + assert commit_offset == message2.offset + 1 + + +@pytest.mark.asyncio +async def test_data_streams_getmany(dsm_processor): + topic = await create_topic("data_streams_getmany") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic]) as consumer: + await consumer.getmany(timeout_ms=2000) + await consumer.commit() + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + pathway_stats = bucket.pathway_stats + + consumer_checkpoints = [k for k in pathway_stats.keys() if "direction:in" in k[0]] + assert len(consumer_checkpoints) >= 1 + + +@pytest.mark.asyncio +async def test_data_streams_payload_size_tracking(dsm_processor): + """Test exact payload size calculation including custom headers""" + topic = await create_topic("data_streams_payload_size") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + test_payload = b"test message with some content" + test_key = b"test_key" + custom_headers = [("custom-header", b"custom-value")] + + # Calculate expected size: + # - payload bytes + # - key bytes + # - custom headers (key + 
value) + # - DSM header key (PROPAGATION_KEY_BASE_64) + # - DSM header value (pathway context, ~28 bytes) + expected_payload_size = len(test_payload) + len(test_key) + for h_key, h_val in custom_headers: + expected_payload_size += len(h_key) + len(h_val) + expected_payload_size += len(PROPAGATION_KEY_BASE_64) # DSM header key + expected_payload_size += 28 # DSM header value (pathway context) + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=test_payload, key=test_key, headers=custom_headers) + + async with consumer_ctx([topic]) as consumer: + await consumer.getone() + await consumer.commit() + + buckets = dsm_processor._buckets + assert len(buckets) == 1 + + bucket = list(buckets.values())[0] + pathway_stats = bucket.pathway_stats + + for key, stats in pathway_stats.items(): + assert stats.payload_size.count >= 1 + assert stats.payload_size.sum == expected_payload_size + + +@pytest.mark.asyncio +async def test_data_streams_with_none_values(dsm_processor): + topic = await create_topic("data_streams_none_values") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=None) + + async with consumer_ctx([topic]) as consumer: + message = await consumer.getone() + await consumer.commit() + + buckets = dsm_processor._buckets + assert len(buckets) >= 1 + assert any(header[0] == PROPAGATION_KEY_BASE_64 for header in message.headers) + + +@pytest.mark.asyncio +async def test_data_streams_multiple_topics(dsm_processor): + topic1 = await create_topic("data_streams_multi_topic_1") + topic2 = await create_topic("data_streams_multi_topic_2") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic1, value=PAYLOAD, key=KEY) + await 
producer.send_and_wait(topic2, value=PAYLOAD, key=KEY) + + async with consumer_ctx([topic1, topic2]) as consumer: + await consumer.getmany(timeout_ms=2000) + await consumer.commit() + + buckets = dsm_processor._buckets + assert len(buckets) >= 1 + + bucket = list(buckets.values())[0] + pathway_stats = bucket.pathway_stats + + checkpoint_topics = set() + for key in pathway_stats.keys(): + edge_tags = key[0] + if "topic:" in edge_tags: + for tag in edge_tags.split(","): + if tag.startswith("topic:"): + checkpoint_topics.add(tag.split(":", 1)[1]) + + assert topic1 in checkpoint_topics or topic2 in checkpoint_topics + + +@pytest.mark.asyncio +async def test_data_streams_headers_edge_cases(dsm_processor): + """Non-UTF8 header bytes should decode with errors='ignore'; None-valued headers are ignored.""" + topic = await create_topic("data_streams_headers_edge_cases") + + try: + del dsm_processor._current_context.value + except AttributeError: + pass + + bad_bytes = b"\xff\xfe\xfa" # invalid UTF-8 + headers = [("non-utf8", bad_bytes), ("none-header", None)] + + async with producer_ctx([BOOTSTRAP_SERVERS]) as producer: + await producer.send_and_wait(topic, value=PAYLOAD, key=KEY, headers=headers) + + async with consumer_ctx([topic]) as consumer: + message = await consumer.getone() + await consumer.commit() + + # Sanity: message should carry our headers (Kafka allows None header values) + assert any(k == "non-utf8" for k, _ in (message.headers or [])) + assert any(k == "none-header" for k, _ in (message.headers or [])) + + # DSM should have processed without crashing and created buckets/stats + buckets = dsm_processor._buckets + assert len(buckets) >= 1 + bucket = list(buckets.values())[0] + assert len(bucket.pathway_stats) >= 1 diff --git a/tests/contrib/aiokafka/test_aiokafka_patch.py b/tests/contrib/aiokafka/test_aiokafka_patch.py new file mode 100644 index 00000000000..ee9e22c32ab --- /dev/null +++ b/tests/contrib/aiokafka/test_aiokafka_patch.py @@ -0,0 +1,26 @@ +from 
ddtrace.contrib.internal.aiokafka.patch import get_version +from ddtrace.contrib.internal.aiokafka.patch import patch + + +try: + from ddtrace.contrib.internal.aiokafka.patch import unpatch +except ImportError: + unpatch = None +from tests.contrib.patch import PatchTestCase + + +class TestAIOKafkaPatch(PatchTestCase.Base): + __integration_name__ = "aiokafka" + __module_name__ = "aiokafka" + __patch_func__ = patch + __unpatch_func__ = unpatch + __get_version__ = get_version + + def assert_module_patched(self, aiokafka): + pass + + def assert_not_module_patched(self, aiokafka): + pass + + def assert_not_module_double_patched(self, aiokafka): + pass diff --git a/tests/contrib/aiokafka/utils.py b/tests/contrib/aiokafka/utils.py new file mode 100644 index 00000000000..e6ba37f01b5 --- /dev/null +++ b/tests/contrib/aiokafka/utils.py @@ -0,0 +1,73 @@ +from contextlib import asynccontextmanager +import logging + +import aiokafka +from aiokafka.admin import AIOKafkaAdminClient +from aiokafka.admin import NewTopic + +from tests.contrib.config import KAFKA_CONFIG + + +logger = logging.getLogger(__name__) + +GROUP_ID = "test_group" +BOOTSTRAP_SERVERS = f"127.0.0.1:{KAFKA_CONFIG['port']}" +KEY = "test_key".encode("utf-8") +PAYLOAD = "hueh hueh hueh".encode("utf-8") +ENABLE_AUTO_COMMIT = True + + +def has_header(headers, header_name): + """Check if a header with the given name exists in the headers list.""" + return any(header[0] == header_name for header in headers) + + +def find_span_by_name(spans, span_name): + """Find the first span with the given name in a list of spans.""" + return next((span for span in spans if span.name == span_name), None) + + +async def create_topic(topic_name): + """Create a Kafka topic for testing""" + logger.debug("Creating topic %s", topic_name) + + client = AIOKafkaAdminClient(bootstrap_servers=[BOOTSTRAP_SERVERS]) + await client.start() + + try: + await client.delete_topics([topic_name]) + await client.create_topics([NewTopic(topic_name, 1, 1)]) 
+ except Exception as e: + logger.error("Failed to delete/create topic %s: %s", topic_name, e) + finally: + await client.close() + + return topic_name + + +@asynccontextmanager +async def producer_ctx(bootstrap_servers): + """Context manager for producer - automatically starts and stops""" + producer = aiokafka.AIOKafkaProducer(bootstrap_servers=bootstrap_servers) + await producer.start() + try: + yield producer + finally: + await producer.stop() + + +@asynccontextmanager +async def consumer_ctx(topics, enable_auto_commit=ENABLE_AUTO_COMMIT): + """Context manager for consumer - automatically starts and stops""" + consumer = aiokafka.AIOKafkaConsumer( + bootstrap_servers=[BOOTSTRAP_SERVERS], + auto_offset_reset="earliest", + group_id=GROUP_ID, + enable_auto_commit=enable_auto_commit, + ) + consumer.subscribe(topics) + await consumer.start() + try: + yield consumer + finally: + await consumer.stop() diff --git a/tests/contrib/suitespec.yml b/tests/contrib/suitespec.yml index 830cc5511b3..761d964a0e6 100644 --- a/tests/contrib/suitespec.yml +++ b/tests/contrib/suitespec.yml @@ -6,6 +6,9 @@ components: - ddtrace/contrib/internal/aiohttp_jinja2/* aiopg: - ddtrace/contrib/internal/aiopg/* + aiokafka: + - ddtrace/contrib/internal/aiokafka/* + - ddtrace/ext/kafka.py algoliasearch: - ddtrace/contrib/internal/algoliasearch/* asgi: @@ -246,6 +249,24 @@ suites: services: - mysql snapshot: true + aiokafka: + env: + TEST_KAFKA_HOST: kafka + TEST_KAFKA_PORT: '29092' + parallelism: 2 + paths: + - '@bootstrap' + - '@core' + - '@tracing' + - '@contrib' + - '@aiokafka' + - '@datastreams' + - tests/contrib/aiokafka/* + - tests/snapshots/tests.contrib.aiokafka.* + runner: riot + services: + - kafka + snapshot: true aiopg: paths: - '@bootstrap' diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics.json new file mode 100644 
index 00000000000..5ee3052c206 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics.json @@ -0,0 +1,149 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ce500000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.partitions.getmany_multiple_messages_multiple_topics": "0", + "kafka.partitions.getmany_multiple_messages_multiple_topics_2": "0", + "kafka.received_message": "True", + "kafka.topic": "getmany_multiple_messages_multiple_topics_2,getmany_multiple_messages_multiple_topics", + "language": "python", + "messaging.destination.name": "getmany_multiple_messages_multiple_topics_2", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "13276206259012605249", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 5781292, + "start": 1760967909477776763 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ce200000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_multiple_messages_multiple_topics", + "language": "python", + "messaging.destination.name": "getmany_multiple_messages_multiple_topics", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + 
"messaging.system": "kafka", + "pathway.hash": "5178547398973423610", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 1901084, + "start": 1760967906462330095 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 2, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ce200000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_multiple_messages_multiple_topics", + "language": "python", + "messaging.destination.name": "getmany_multiple_messages_multiple_topics", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "5178547398973423610", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 663459, + "start": 1760967906464408845 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 3, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ce200000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_multiple_messages_multiple_topics_2", + "language": "python", + "messaging.destination.name": "getmany_multiple_messages_multiple_topics_2", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', 
'127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "875021794267563953", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 646250, + "start": 1760967906465196054 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics_with_distributed_tracing.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics_with_distributed_tracing.json new file mode 100644 index 00000000000..dcbfd113ad6 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_multiple_messages_multiple_topics_with_distributed_tracing.json @@ -0,0 +1,150 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ce200000000", + "_dd.span_links": "[{\"trace_id\": \"68f63cdf000000001cc9579226385f5b\", \"span_id\": \"e54b6f614e404016\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68f63cdf00000000\", \"flags\": 1}, {\"trace_id\": \"68f63cdf00000000e03f84a05ca090d9\", \"span_id\": \"e3801c106e04c433\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68f63cdf00000000\", \"flags\": 1}, {\"trace_id\": \"68f63cdf0000000017d9677f07d90bc8\", \"span_id\": \"402ec13cabab97db\", \"tracestate\": \"dd=s:1;t.dm:-0;t.tid:68f63cdf00000000\", \"flags\": 1}]", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.partitions.getmany_distributed_tracing": "0", + "kafka.partitions.getmany_distributed_tracing_2": "0", + "kafka.received_message": "True", + "kafka.topic": "getmany_distributed_tracing_2,getmany_distributed_tracing", + "language": "python", + 
"messaging.destination.name": "getmany_distributed_tracing_2", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "1921471380508156691", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 9448209, + "start": 1760967906300032970 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdf00000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_distributed_tracing", + "language": "python", + "messaging.destination.name": "getmany_distributed_tracing", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "6910789329466797100", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2472542, + "start": 1760967903276925552 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 2, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdf00000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_distributed_tracing", + "language": "python", + "messaging.destination.name": 
"getmany_distributed_tracing", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "6910789329466797100", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 831209, + "start": 1760967903279592677 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 3, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdf00000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_distributed_tracing_2", + "language": "python", + "messaging.destination.name": "getmany_distributed_tracing_2", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "11432892279258327942", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 859375, + "start": 1760967903280579094 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_single_message.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_single_message.json new file mode 100644 index 00000000000..7a741d203cb --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_getmany_single_message.json @@ -0,0 +1,74 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + 
"error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ccf00000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.partitions.getmany_single_message": "0", + "kafka.received_message": "True", + "kafka.topic": "getmany_single_message", + "language": "python", + "messaging.destination.name": "getmany_single_message", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "8337510751329555796", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 4600208, + "start": 1760967887437949045 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ccc00000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "getmany_single_message", + "language": "python", + "messaging.destination.name": "getmany_single_message", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "16470507574917487756", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 0.75, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2989125, + "start": 1760967884417766835 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_commit_with_offset.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_commit_with_offset.json new file mode 100644 index 
00000000000..ac626d2fdd5 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_commit_with_offset.json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd800000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_commit_with_offset", + "language": "python", + "messaging.destination.name": "send_and_wait_commit_with_offset", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "17411224485259863134", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 4602416, + "start": 1760967896864841841 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd500000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_commit_with_offset", + "language": "python", + "messaging.destination.name": "send_and_wait_commit_with_offset", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "15511482528767751360", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, 
+ "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2693750, + "start": 1760967893847947798 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_failure.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_failure.json new file mode 100644 index 00000000000..cd8ed40c49d --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_failure.json @@ -0,0 +1,40 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 1, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f7841500000000", + "component": "aiokafka", + "error.message": "[Error 10] MessageSizeTooLargeError: The message is 10485790 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration", + "error.stack": "Traceback (most recent call last):\n File \"/home/bits/project/ddtrace/contrib/internal/aiokafka/patch.py\", line 96, in traced_send\n result = await func(*args, **kwargs)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n File \"/home/bits/project/.riot/venv_py3138_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_pytest-asyncio_pytest-randomly_aiokafka/lib/python3.13/site-packages/aiokafka/producer/producer.py\", line 517, in send\n key_bytes, value_bytes = self._serialize(topic, key, value)\n ~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^\n File \"/home/bits/project/.riot/venv_py3138_mock_pytest_pytest-mock_coverage_pytest-cov_opentracing_hypothesis6451_pytest-asyncio_pytest-randomly_aiokafka/lib/python3.13/site-packages/aiokafka/producer/producer.py\", line 419, in _serialize\n raise MessageSizeTooLargeError(\n ...<3 lines>...\n )\naiokafka.errors.MessageSizeTooLargeError: [Error 10] 
MessageSizeTooLargeError: The message is 10485790 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration\n", + "error.type": "aiokafka.errors.MessageSizeTooLargeError", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "nonexistent_topic", + "language": "python", + "messaging.destination.name": "nonexistent_topic", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "3702984373303822418", + "runtime-id": "1afe57a306314e9eb3a141f497b6e4ad", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 110108167, + "start": 1761051669828575716 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[None].json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[None].json new file mode 100644 index 00000000000..44f96010851 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[None].json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd500000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_key", + "language": "python", + "messaging.destination.name": "send_and_wait_key", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "16145856079313839398", + 
"runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 6318542, + "start": 1760967893748593756 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd200000000", + "component": "aiokafka", + "kafka.message_key": "None", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_key", + "language": "python", + "messaging.destination.name": "send_and_wait_key", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "11242064880765793915", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2358959, + "start": 1760967890728968046 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[test_key].json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[test_key].json new file mode 100644 index 00000000000..6a3a6fdc174 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_key[test_key].json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ccc00000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": 
"-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_key", + "language": "python", + "messaging.destination.name": "send_and_wait_key", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "13905624306109291454", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 0.6666666666666667, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 9050667, + "start": 1760967884323094460 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cc900000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_key", + "language": "python", + "messaging.destination.name": "send_and_wait_key", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "17633263376796263666", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 0.5, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2945000, + "start": 1760967881306922792 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[None].json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[None].json new file mode 100644 index 00000000000..3d43d3919b8 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[None].json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + 
"service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdf00000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "True", + "kafka.topic": "send_and_wait_value", + "language": "python", + "messaging.destination.name": "send_and_wait_value", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "17189556702821699472", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 6829250, + "start": 1760967903109765052 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdc00000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "True", + "kafka.topic": "send_and_wait_value", + "language": "python", + "messaging.destination.name": "send_and_wait_value", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "12751285087032883523", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2672334, + "start": 1760967900089630217 + }]] diff --git 
a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[hueh_hueh_hueh].json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[hueh_hueh_hueh].json new file mode 100644 index 00000000000..ce41579e142 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_value[hueh_hueh_hueh].json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cdb00000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_value", + "language": "python", + "messaging.destination.name": "send_and_wait_value", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "15018913676636240605", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 6557959, + "start": 1760967899986740842 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd800000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_value", + "language": "python", + "messaging.destination.name": "send_and_wait_value", + 
"messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "12496240666428409309", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2872291, + "start": 1760967896968624633 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_with_distributed_tracing.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_with_distributed_tracing.json new file mode 100644 index 00000000000..2da67e7c810 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_and_wait_with_distributed_tracing.json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cc600000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_with_distributed_tracing", + "language": "python", + "messaging.destination.name": "send_and_wait_with_distributed_tracing", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "8657672220095517366", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 3649667, + "start": 1760967878191772888 + }, + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 2, + "parent_id": 1, + "type": "worker", + "error": 0, + "meta": { + 
"_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cc600000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_and_wait_with_distributed_tracing", + "language": "python", + "messaging.destination.name": "send_and_wait_with_distributed_tracing", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "7218040204031168113", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer", + "tracestate": "dd=p:e8c5a7b4c6d8232c;s:1;t.dm:-0;t.tid:68f63cc600000000" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 5274875, + "start": 1760967881209833333 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_commit.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_commit.json new file mode 100644 index 00000000000..8556118cef0 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_commit.json @@ -0,0 +1,77 @@ +[[ + { + "name": "kafka.consume", + "service": "kafka", + "resource": "kafka.consume", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63cd200000000", + "component": "aiokafka", + "kafka.group_id": "test_group", + "kafka.message_key": "test_key", + "kafka.message_offset": "-1", + "kafka.partition": "0", + "kafka.received_message": "True", + "kafka.tombstone": "False", + "kafka.topic": "send_commit", + "language": "python", + "messaging.destination.name": "send_commit", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": 
"kafka", + "pathway.hash": "13566417131152319004", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "consumer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 6910792, + "start": 1760967890629236213 + }], +[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 1, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ccf00000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", + "kafka.tombstone": "False", + "kafka.topic": "send_commit", + "language": "python", + "messaging.destination.name": "send_commit", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "8923570457051879360", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 1790917, + "start": 1760967887613390711 + }]] diff --git a/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_multiple_servers.json b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_multiple_servers.json new file mode 100644 index 00000000000..a05329c7ed4 --- /dev/null +++ b/tests/snapshots/tests.contrib.aiokafka.test_aiokafka.test_send_multiple_servers.json @@ -0,0 +1,37 @@ +[[ + { + "name": "kafka.produce", + "service": "kafka", + "resource": "kafka.produce", + "trace_id": 0, + "span_id": 1, + "parent_id": 0, + "type": "worker", + "error": 0, + "meta": { + "_dd.base_service": "tests.contrib.aiokafka", + "_dd.p.dm": "-0", + "_dd.p.tid": "68f63ccf00000000", + "component": "aiokafka", + "kafka.message_key": "b'test_key'", + "kafka.partition": "None", 
+ "kafka.tombstone": "False", + "kafka.topic": "send_multiple_servers", + "language": "python", + "messaging.destination.name": "send_multiple_servers", + "messaging.kafka.bootstrap.servers": "['127.0.0.1:29092', '127.0.0.1:29092', '127.0.0.1:29092']", + "messaging.system": "kafka", + "pathway.hash": "2090885717654752650", + "runtime-id": "4551835e79bb48a392bf2f601c73142b", + "span.kind": "producer" + }, + "metrics": { + "_dd.measured": 1, + "_dd.top_level": 1, + "_dd.tracer_kr": 1.0, + "_sampling_priority_v1": 1, + "process_id": 605 + }, + "duration": 2501500, + "start": 1760967887534415836 + }]]