-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
50 lines (37 loc) · 1017 Bytes
/
main.py
File metadata and controls
50 lines (37 loc) · 1017 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import asyncio
import ssl
from faststream import AckPolicy
from faststream.kafka import KafkaBroker, KafkaMessage
from faststream.security import BaseSecurity
ssl_context = ssl.create_default_context(
purpose=ssl.Purpose.SERVER_AUTH,
cafile="secrets/kafka/ca-cert.pem",
)
security = BaseSecurity(
ssl_context=ssl_context,
use_ssl=True,
)
broker = KafkaBroker(
bootstrap_servers="localhost:19092",
acks="all",
enable_idempotence=True,
client_id="Secure Kafka Broker",
security=security,
)
publisher = broker.publisher("")
@broker.subscriber(
"test-topic",
group_id="Secure Kafka Broker",
ack_policy=AckPolicy.MANUAL,
auto_offset_reset="earliest",
)
async def what(message: KafkaMessage):
print("RECEIVED:", message.body)
await message.ack()
async def main():
await broker.start()
await asyncio.sleep(2)
await publisher.publish("kafka hello", topic="test-topic")
await asyncio.sleep(5)
await broker.stop()
asyncio.run(main())