Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,35 @@
.. code:: python
import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
# Instrument kafka
AIOKafkaInstrumentor().instrument()
# report a span of type producer with the default settings
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.send('my-topic', b'raw_bytes')
async def produce():
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.start()
try:
await producer.send_and_wait('my-topic', b'raw_bytes')
finally:
await producer.stop()
# report a span of type consumer with the default settings
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
async for message in consumer:
# process message
async def consume():
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
await consumer.start()
try:
async for message in consumer:
# process message
print(message)
finally:
await consumer.stop()
asyncio.run(produce())
asyncio.run(consume())
The _instrument() method accepts the following keyword args:
tracer_provider (TracerProvider) - an optional tracer provider
Expand All @@ -47,12 +62,14 @@ def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
.. code:: python
from opentelemetry.instrumentation.kafka import AIOKafkaInstrumentor
import asyncio
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
async def async_produce_hook(span, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value")
async def async_consume_hook(span, record, args, kwargs):
if span and span.is_recording():
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
Expand All @@ -62,8 +79,15 @@ async def async_consume_hook(span, record, args, kwargs):
# Using kafka as normal now will automatically generate spans,
# including user custom attributes added from the hooks
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.send('my-topic', b'raw_bytes')
async def produce():
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
await producer.start()
try:
await producer.send_and_wait('my-topic', b'raw_bytes')
finally:
await producer.stop()
asyncio.run(produce())
API
___
Expand Down