Skip to content

Commit fe85968

Browse files
authored
Merge pull request #716 from confluentinc/asyncioeg
Added asyncio producer example
2 parents 6f7f2c1 + 71eb7a3 commit fe85968

File tree

1 file changed

+178
-0
lines changed

1 file changed

+178
-0
lines changed

examples/asyncio.py

Lines changed: 178 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,178 @@
1+
# flake8: noqa
2+
# Copyright 2019 Confluent Inc.
3+
#
4+
# Licensed under the Apache License, Version 2.0 (the "License");
5+
# you may not use this file except in compliance with the License.
6+
# You may obtain a copy of the License at
7+
#
8+
# http://www.apache.org/licenses/LICENSE-2.0
9+
#
10+
# Unless required by applicable law or agreed to in writing, software
11+
# distributed under the License is distributed on an "AS IS" BASIS,
12+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
# See the License for the specific language governing permissions and
14+
# limitations under the License.
15+
16+
# Companion code to the blog post "Integrating Kafka With Python
17+
# Asyncio Web Applications"
18+
# https://www.confluent.io/blog/[path-to-blog-post]
19+
20+
# Example Siege [https://github.com/JoeDog/siege] test:
21+
# $ siege -c 400 -r 200 'http://localhost:8000/items1 POST {"name":"testuser"}'
22+
23+
import asyncio
24+
import confluent_kafka
25+
from confluent_kafka import KafkaException
26+
from fastapi import FastAPI, HTTPException
27+
from pydantic import BaseModel
28+
from time import time
29+
from threading import Thread
30+
import uvicorn
31+
32+
33+
class AIOProducer:
    """Kafka producer whose delivery results are exposed as asyncio futures.

    The underlying confluent_kafka.Producer is driven by a dedicated polling
    thread so delivery callbacks fire without blocking the event loop;
    outcomes are marshalled back onto the loop with call_soon_threadsafe.
    """

    def __init__(self, configs, loop=None):
        # NOTE(review): asyncio.get_event_loop() is deprecated outside a
        # running loop on Python 3.10+; prefer passing `loop` explicitly.
        self._loop = loop or asyncio.get_event_loop()
        self._producer = confluent_kafka.Producer(configs)
        self._cancelled = False
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        # Serve delivery callbacks until close() flips the flag.
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        """Stop the polling thread and wait for it to exit."""
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value):
        """
        An awaitable produce method.

        Returns a future resolving to the delivered message, or raising
        KafkaException on delivery failure.
        """
        result = self._loop.create_future()

        def ack(err, msg):
            # Runs on the poll thread; hand the outcome to the loop thread.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(result.set_result, msg)
        self._producer.produce(topic, value, on_delivery=ack)
        return result

    def produce2(self, topic, value, on_delivery=None):
        """
        A produce method in which delivery notifications are made available
        via both the returned future and on_delivery callback (if specified).

        Fix: on_delivery now defaults to None, matching the "if specified"
        contract; previously it was a required argument.
        """
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(
                    result.set_result, msg)
            # Notify the optional user callback for both success and error.
            if on_delivery:
                self._loop.call_soon_threadsafe(
                    on_delivery, err, msg)
        self._producer.produce(topic, value, on_delivery=ack)
        return result
82+
83+
84+
class Producer:
    """Thin wrapper around confluent_kafka.Producer that runs a background
    thread to service delivery callbacks."""

    def __init__(self, configs):
        self._cancelled = False
        self._producer = confluent_kafka.Producer(configs)
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        # Keep servicing delivery callbacks until close() asks us to stop.
        while True:
            if self._cancelled:
                break
            self._producer.poll(0.1)

    def close(self):
        """Signal the poll thread to stop and block until it exits."""
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, value, on_delivery=None):
        # Fire-and-forget; the delivery outcome is reported via on_delivery.
        self._producer.produce(topic, value, on_delivery=on_delivery)
101+
102+
103+
# Kafka client configuration; assumes a broker on localhost:9092.
config = {"bootstrap.servers": "localhost:9092"}

# FastAPI application hosting the example endpoints.
app = FastAPI()
107+
108+
class Item(BaseModel):
    """Request body schema for the /items* endpoints."""
    name: str
110+
111+
112+
# Module-level producers; created on application startup, closed on shutdown.
aio_producer = None
producer = None
114+
115+
116+
@app.on_event("startup")
async def startup_event():
    """Create both global producers once the application starts."""
    global aio_producer, producer
    producer = Producer(config)
    aio_producer = AIOProducer(config)
121+
122+
123+
@app.on_event("shutdown")
def shutdown_event():
    """Stop the producers' poll threads on application shutdown.

    Fix: guard against a partially-failed startup, in which case one or
    both globals may still be None (the old code raised AttributeError).
    """
    if aio_producer is not None:
        aio_producer.close()
    if producer is not None:
        producer.close()
127+
128+
129+
@app.post("/items1")
async def create_item1(item: Item):
    """Produce and await delivery; respond with the broker's timestamp."""
    try:
        msg = await aio_producer.produce("items", item.name)
        return {"timestamp": msg.timestamp()}
    except KafkaException as ex:
        raise HTTPException(status_code=500, detail=ex.args[0].str())
136+
137+
# Count of delivery reports received by the fire-and-forget endpoints.
cnt = 0


def ack(err, msg):
    """Delivery callback: tally the report, whether success or failure."""
    global cnt
    cnt += 1
143+
144+
145+
@app.post("/items2")
async def create_item2(item: Item):
    """Produce without awaiting delivery; `ack` tallies the report later."""
    try:
        aio_producer.produce2("items", item.name, on_delivery=ack)
    except KafkaException as ex:
        raise HTTPException(status_code=500, detail=ex.args[0].str())
    return {"timestamp": time()}
152+
153+
154+
@app.post("/items3")
async def create_item3(item: Item):
    """Fire-and-forget via the plain Producer, with the `ack` callback."""
    try:
        producer.produce("items", item.name, on_delivery=ack)
    except KafkaException as ex:
        raise HTTPException(status_code=500, detail=ex.args[0].str())
    return {"timestamp": time()}
161+
162+
163+
@app.post("/items4")
async def create_item4(item: Item):
    """Fire-and-forget with no delivery callback at all."""
    try:
        producer.produce("items", item.name)
    except KafkaException as ex:
        raise HTTPException(status_code=500, detail=ex.args[0].str())
    return {"timestamp": time()}
170+
171+
172+
@app.post("/items5")
async def create_item5(item: Item):
    """Baseline endpoint that performs no produce at all (for comparison)."""
    return {"timestamp": time()}
175+
176+
177+
if __name__ == '__main__':
    # Serve the example app locally with uvicorn.
    uvicorn.run(app, host='127.0.0.1', port=8000)

0 commit comments

Comments
 (0)