diff --git a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py index 4b14ace4fb..5af87c6859 100644 --- a/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py @@ -20,6 +20,7 @@ .. code:: python + import asyncio from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor from aiokafka import AIOKafkaProducer, AIOKafkaConsumer @@ -27,13 +28,27 @@ AIOKafkaInstrumentor().instrument() # report a span of type producer with the default settings - producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092']) - await producer.send('my-topic', b'raw_bytes') + async def produce(): + producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092']) + await producer.start() + try: + await producer.send_and_wait('my-topic', b'raw_bytes') + finally: + await producer.stop() # report a span of type consumer with the default settings - consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) - async for message in consumer: - # process message + async def consume(): + consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092']) + await consumer.start() + try: + async for message in consumer: + # process message + print(message) + finally: + await consumer.stop() + + asyncio.run(produce()) + asyncio.run(consume()) The _instrument() method accepts the following keyword args: tracer_provider (TracerProvider) - an optional tracer provider @@ -47,12 +62,14 @@ def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs) .. code:: python - from opentelemetry.instrumentation.kafka import AIOKafkaInstrumentor + import asyncio + from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor from aiokafka import AIOKafkaProducer, AIOKafkaConsumer async def async_produce_hook(span, args, kwargs): if span and span.is_recording(): span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value") + async def async_consume_hook(span, record, args, kwargs): if span and span.is_recording(): span.set_attribute("custom_user_attribute_from_consume_hook", "some-value") @@ -62,8 +79,15 @@ async def async_consume_hook(span, record, args, kwargs): # Using kafka as normal now will automatically generate spans, # including user custom attributes added from the hooks - producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092']) - await producer.send('my-topic', b'raw_bytes') + async def produce(): + producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092']) + await producer.start() + try: + await producer.send_and_wait('my-topic', b'raw_bytes') + finally: + await producer.stop() + + asyncio.run(produce()) API ___