Skip to content

Commit 45797ec

Browse files
authored
Improve aiokafka instrumentation examples (#3466)
1 parent 5c76d04 commit 45797ec

File tree

1 file changed

+32
-8
lines changed
  • instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka

1 file changed

+32
-8
lines changed

instrumentation/opentelemetry-instrumentation-aiokafka/src/opentelemetry/instrumentation/aiokafka/__init__.py

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,35 @@
2020
2121
.. code:: python
2222
23+
import asyncio
2324
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
2425
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
2526
2627
# Instrument kafka
2728
AIOKafkaInstrumentor().instrument()
2829
2930
# report a span of type producer with the default settings
30-
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
31-
await producer.send('my-topic', b'raw_bytes')
31+
async def produce():
32+
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
33+
await producer.start()
34+
try:
35+
await producer.send_and_wait('my-topic', b'raw_bytes')
36+
finally:
37+
await producer.stop()
3238
3339
# report a span of type consumer with the default settings
34-
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
35-
async for message in consumer:
36-
# process message
40+
async def consume():
41+
consumer = AIOKafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
42+
await consumer.start()
43+
try:
44+
async for message in consumer:
45+
# process message
46+
print(message)
47+
finally:
48+
await consumer.stop()
49+
50+
asyncio.run(produce())
51+
asyncio.run(consume())
3752
3853
The _instrument() method accepts the following keyword args:
3954
tracer_provider (TracerProvider) - an optional tracer provider
@@ -47,12 +62,14 @@ def async_consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
4762
4863
.. code:: python
4964
50-
from opentelemetry.instrumentation.kafka import AIOKafkaInstrumentor
65+
import asyncio
66+
from opentelemetry.instrumentation.aiokafka import AIOKafkaInstrumentor
5167
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
5268
5369
async def async_produce_hook(span, args, kwargs):
5470
if span and span.is_recording():
5571
span.set_attribute("custom_user_attribute_from_async_response_hook", "some-value")
72+
5673
async def async_consume_hook(span, record, args, kwargs):
5774
if span and span.is_recording():
5875
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
@@ -62,8 +79,15 @@ async def async_consume_hook(span, record, args, kwargs):
6279
6380
# Using kafka as normal now will automatically generate spans,
6481
# including user custom attributes added from the hooks
65-
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
66-
await producer.send('my-topic', b'raw_bytes')
82+
async def produce():
83+
producer = AIOKafkaProducer(bootstrap_servers=['localhost:9092'])
84+
await producer.start()
85+
try:
86+
await producer.send_and_wait('my-topic', b'raw_bytes')
87+
finally:
88+
await producer.stop()
89+
90+
asyncio.run(produce())
6791
6892
API
6993
___

0 commit comments

Comments
 (0)