Skip to content

Commit 3a45440

Browse files
examples fixes
1 parent 36e512e commit 3a45440

File tree

5 files changed

+82
-36
lines changed

5 files changed

+82
-36
lines changed

examples/aiokafka/apache.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def create_producer(client_id):
2121
await producer.start()
2222
return producer
2323

24-
async def send_messages_to_topics(producer, topics, producer_name, num_messages=50):
24+
async def send_messages_to_topics(producer, topics, producer_name, num_messages=5):
2525
"""Send random JSON messages to specified Kafka topics"""
2626

2727
successful = 0
@@ -36,7 +36,7 @@ async def send_messages_to_topics(producer, topics, producer_name, num_messages=
3636

3737
# Send message to each topic
3838
for topic in topics:
39-
result = await producer.send_and_wait(topic, message)
39+
result = await producer.send(topic, message)
4040

4141
successful += 1
4242

@@ -55,30 +55,29 @@ async def main():
5555
try:
5656
# Create two separate producers
5757
producer1 = await create_producer('aiokafka-producer-1')
58-
producer2 = await create_producer('aiokafka-producer-2')
58+
# producer2 = await create_producer('aiokafka-producer-2')
5959

6060
# First producer sends to test-topic and test-topic-1
6161
topics1 = ['test-topic', 'test-topic-1']
6262
await send_messages_to_topics(producer1, topics1, 'aiokafka-producer-1')
6363

6464
# Second producer sends to test-topic-2 and test-topic-3
6565
topics2 = ['test-topic-2', 'test-topic-3']
66-
await send_messages_to_topics(producer2, topics2, 'aiokafka-producer-2')
67-
66+
# await send_messages_to_topics(producer2, topics2, 'aiokafka-producer-2')
67+
# Sleep for 10 minutes at the end
68+
print("Sleeping for 10 minutes...")
69+
await asyncio.sleep(600)
70+
print("Sleep completed")
6871
except Exception as e:
6972
print(f"Error: {e}")
7073
finally:
7174
if producer1:
7275
await producer1.stop()
7376
print("Producer 1 closed")
74-
if producer2:
75-
await producer2.stop()
76-
print("Producer 2 closed")
77+
# if producer2:
78+
# await producer2.stop()
79+
# print("Producer 2 closed")
7780

78-
# Sleep for 10 minutes at the end
79-
print("Sleeping for 10 minutes...")
80-
await asyncio.sleep(600)
81-
print("Sleep completed")
8281

8382
if __name__ == "__main__":
8483
# Run the async main function

examples/confluent_kafka/aiven.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def create_producer(client_id):
4242
'ssl.ca.location': SSL_CAFILE,
4343
'ssl.certificate.location': SSL_CERTFILE,
4444
'ssl.key.location': SSL_KEYFILE,
45-
'batch.num.messages': BATCH_SIZE,
45+
'batch.size': BATCH_SIZE,
4646
'linger.ms': LINGER_MS
4747
})
4848

examples/confluent_kafka/apache.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,9 +56,8 @@ def send_messages_to_topics(producer, topics, producer_name, num_messages=50):
5656
print(f"Failed to send message {i+1}: {e}")
5757

5858
# Small delay between messages (optional)
59-
time.sleep(0.01)
59+
time.sleep(2)
6060

61-
producer.flush(timeout=30)
6261
print(f"\n{producer_name} Summary: {successful} successful, {failed} failed")
6362

6463
def main():
@@ -67,23 +66,31 @@ def main():
6766
try:
6867
# Create two separate producers
6968
producer1 = create_producer('confluent-kafka-producer-1')
70-
producer2 = create_producer('confluent-kafka-producer-2')
69+
# producer2 = create_producer('confluent-kafka-producer-2')
7170

7271
# First producer sends to test-topic and test-topic-1
7372
topics1 = ['test-topic', 'test-topic-1']
7473
send_messages_to_topics(producer1, topics1, 'confluent-kafka-producer-1')
7574

7675
# Second producer sends to test-topic-2 and test-topic-3
7776
topics2 = ['test-topic-2', 'test-topic-3']
78-
send_messages_to_topics(producer2, topics2, 'confluent-kafka-producer-2')
79-
77+
# send_messages_to_topics(producer2, topics2, 'confluent-kafka-producer-2')
78+
print("Sleeping...")
79+
time.sleep(30)
80+
print("Sleep completed")
8081
except Exception as e:
8182
print(f"Error: {e}")
8283

83-
# Sleep for 10 minutes at the end
84-
print("Sleeping for 10 minutes...")
85-
time.sleep(600)
86-
print("Sleep completed")
84+
# Explicitly set producers to None to force garbage collection
85+
print("Setting producers to None to trigger Superstream cleanup...")
86+
producer1 = None
87+
# producer2 = None
88+
89+
# Force garbage collection to trigger __del__ immediately
90+
import gc
91+
print("Forcing garbage collection...")
92+
gc.collect()
93+
print("Garbage collection completed")
8794

8895
if __name__ == "__main__":
8996
main()

examples/confluent_kafka/confluent.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ def create_producer(client_id):
4040
'sasl.password': SASL_PASSWORD,
4141
'client.id': client_id,
4242
'linger.ms': LINGER_MS,
43-
'batch.num.messages': BATCH_SIZE
43+
'batch.size': BATCH_SIZE
4444
})
4545

4646
def send_messages_to_topics(producer, topics, producer_name, num_messages=50):

examples/kafkapy/apache.py

Lines changed: 53 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,49 @@
66
import time
77
from kafka import KafkaProducer
88
from kafka.errors import KafkaError
9-
from json_generator import generate_random_json
9+
10+
import random
11+
import string
12+
from datetime import datetime
13+
14+
def generate_random_json(min_size_kb=1):
15+
"""Generate a random JSON object of at least min_size_kb size"""
16+
base_data = {
17+
"timestamp": datetime.now().isoformat(),
18+
"event_id": f"evt_{random.randint(100000, 999999)}",
19+
"user_id": f"user_{random.randint(1000, 9999)}",
20+
"session_id": f"session_{random.randint(10000, 99999)}",
21+
"event_type": random.choice(["click", "view", "purchase", "login", "logout"]),
22+
"device_type": random.choice(["mobile", "desktop", "tablet"]),
23+
"os": random.choice(["Windows", "macOS", "Linux", "iOS", "Android"]),
24+
"browser": random.choice(["Chrome", "Firefox", "Safari", "Edge"]),
25+
"country": random.choice(["US", "UK", "DE", "FR", "JP", "BR", "IN"]),
26+
"metrics": {
27+
"load_time": round(random.uniform(0.1, 5.0), 3),
28+
"response_time": round(random.uniform(0.01, 1.0), 3),
29+
"cpu_usage": round(random.uniform(0, 100), 2),
30+
"memory_usage": round(random.uniform(0, 100), 2)
31+
}
32+
}
33+
34+
# Calculate current size
35+
current_json = json.dumps(base_data)
36+
current_size = len(current_json.encode('utf-8'))
37+
target_size = min_size_kb * 1024
38+
39+
# Add padding data if needed to reach target size
40+
if current_size < target_size:
41+
padding_size = target_size - current_size
42+
# Generate random string data for padding
43+
padding_data = {
44+
"additional_data": {
45+
f"field_{i}": ''.join(random.choices(string.ascii_letters + string.digits, k=50))
46+
for i in range(padding_size // 50)
47+
}
48+
}
49+
base_data.update(padding_data)
50+
51+
return base_data
1052

1153
def create_producer(client_id):
1254
"""Create and configure Kafka producer"""
@@ -19,7 +61,7 @@ def create_producer(client_id):
1961
value_serializer=lambda v: json.dumps(v).encode('utf-8'),
2062
)
2163

22-
def send_messages_to_topics(producer, topics, producer_name, num_messages=50):
64+
def send_messages_to_topics(producer, topics, producer_name, num_messages=5):
2365
"""Send random JSON messages to specified Kafka topics"""
2466

2567
successful = 0
@@ -57,30 +99,28 @@ def main():
5799
try:
58100
# Create two separate producers
59101
producer1 = create_producer('kafka-python-producer-1')
60-
producer2 = create_producer('kafka-python-producer-2')
102+
# producer2 = create_producer('kafka-python-producer-2')
61103

62104
# First producer sends to test-topic and test-topic-1
63105
topics1 = ['test-topic', 'test-topic-1']
64106
send_messages_to_topics(producer1, topics1, 'kafka-python-producer-1')
65-
107+
# Sleep for 10 minutes at the end
108+
print("Sleeping for 10 minutes...")
109+
time.sleep(600)
110+
print("Sleep completed")
66111
# Second producer sends to test-topic-2 and test-topic-3
67112
topics2 = ['test-topic-2', 'test-topic-3']
68-
send_messages_to_topics(producer2, topics2, 'kafka-python-producer-2')
113+
# send_messages_to_topics(producer2, topics2, 'kafka-python-producer-2')
69114

70115
except Exception as e:
71116
print(f"Error: {e}")
72117
finally:
73118
if producer1:
74119
producer1.close()
75120
print("Producer 1 closed")
76-
if producer2:
77-
producer2.close()
78-
print("Producer 2 closed")
79-
80-
# Sleep for 10 minutes at the end
81-
print("Sleeping for 10 minutes...")
82-
time.sleep(600)
83-
print("Sleep completed")
121+
# if producer2:
122+
# producer2.close()
123+
# print("Producer 2 closed")
84124

85125
if __name__ == "__main__":
86126
main()

0 commit comments

Comments
 (0)