Skip to content

Commit fa09ca0

Browse files
committed
Improve kafka-python instrumentation examples
1 parent cc86d47 commit fa09ca0

File tree

1 file changed

+9
-0
lines changed
  • instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka

1 file changed

+9
-0
lines changed

instrumentation/opentelemetry-instrumentation-kafka-python/src/opentelemetry/instrumentation/kafka/__init__.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ def consume_hook(span: Span, record: kafka.record.ABCRecord, args, kwargs)
5757
def produce_hook(span, args, kwargs):
5858
if span and span.is_recording():
5959
span.set_attribute("custom_user_attribute_from_produce_hook", "some-value")
60+
6061
def consume_hook(span, record, args, kwargs):
6162
if span and span.is_recording():
6263
span.set_attribute("custom_user_attribute_from_consume_hook", "some-value")
@@ -69,6 +70,14 @@ def consume_hook(span, record, args, kwargs):
6970
producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
7071
producer.send('my-topic', b'raw_bytes')
7172
73+
def process_msg(message):
74+
print(message)
75+
76+
consumer = KafkaConsumer('my-topic', group_id='my-group', bootstrap_servers=['localhost:9092'])
77+
for message in consumer:
78+
# process message
79+
process_msg(message)
80+
7281
API
7382
___
7483
"""

0 commit comments

Comments
 (0)