-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathconsumer_adv.py
More file actions
22 lines (20 loc) · 819 Bytes
/
consumer_adv.py
File metadata and controls
22 lines (20 loc) · 819 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from kafka import KafkaConsumer, KafkaProducer
import json
from producer import producer
from consumer import consumer
bootstrap_servers = ['localhost:9092']
def transfer_messages(source_topic, target_topic):
"""Consumes messages from a source topic, processes them (if needed),
and produces them to a target topic."""
for msg in consumer:
data = msg.value # Accessing the decoded JSON data
try:
producer.send(target_topic, data)
producer.flush()
print(f"Transferred message from topic '{source_topic}' to topic '{target_topic}'")
except Exception as e:
print(f"Error transferring message: {e}")
if __name__ == '__main__':
source_topic = 'source'
target_topic = 'target'
transfer_messages(source_topic, target_topic)