-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer.py
More file actions
22 lines (19 loc) · 739 Bytes
/
consumer.py
File metadata and controls
22 lines (19 loc) · 739 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from kafka import KafkaConsumer
import json
# Broker address list handed to the consumer below.
bootstrap_servers = ['localhost:9092']

# Module-level consumer for the 'source' topic. Offsets reset to 'earliest',
# and every raw payload is decoded from UTF-8 and parsed as JSON before it is
# exposed as ``msg.value``.
consumer = KafkaConsumer(
    'source',
    bootstrap_servers=bootstrap_servers,
    auto_offset_reset='earliest',
    value_deserializer=lambda raw: json.loads(raw.decode('utf-8')),
)
def consume_messages():
    """Consume messages from the module-level Kafka ``consumer`` and print them.

    Iterates over ``consumer`` and prints the value of every message. The
    consumer's ``value_deserializer`` already runs ``json.loads`` on each
    payload, so ``msg.value`` is normally a decoded Python object; it is parsed
    a second time only when it is still JSON text (a double-encoded payload).

    The loop ends on ``KeyboardInterrupt``/``SystemExit`` or on any other
    exception; both cases are reported on stdout instead of being re-raised.
    The consumer is closed before the function returns.
    """
    try:
        for msg in consumer:
            payload = msg.value
            # json.loads() on an already-decoded dict/list raises TypeError,
            # so only re-parse values that are still textual JSON.
            if isinstance(payload, (str, bytes, bytearray)):
                payload = json.loads(payload)
            print("Received message:", payload)
    except (KeyboardInterrupt, SystemExit):
        print("Consumer stopped. ")
    except Exception as e:
        # Top-level boundary of the script: report the failure and stop.
        print(f"Error consuming message : {e}")
    finally:
        # Always release the consumer once the loop is over, whatever ended it.
        consumer.close()
# Start consuming only when this file is executed as a script, not on import.
if __name__ == '__main__':
    consume_messages()