Skip to content

Commit 43dc4f7

Browse files
committed
Changes to produce implementations + flake8
1 parent 10a3dd7 commit 43dc4f7

File tree

1 file changed

+76
-17
lines changed

1 file changed

+76
-17
lines changed

examples/asyncio.py

Lines changed: 76 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,15 @@
1212
# See the License for the specific language governing permissions and
1313
# limitations under the License.
1414

15-
# Companion code to the blog post "Integrating Kafka With Python Asyncio Web Applications"
15+
# Companion code to the blog post "Integrating Kafka With Python
16+
# Asyncio Web Applications"
1617
# https://www.confluent.io/blog/[path-to-blog-post]
1718

19+
# Example Siege [https://github.com/JoeDog/siege] test:
20+
# siege -c 400 -r 200 'http://localhost:8000/items1 POST {"name":"testuser"}'
21+
1822
import asyncio
19-
from confluent_kafka import Producer
23+
import confluent_kafka
2024
from confluent_kafka import KafkaException
2125
from fastapi import FastAPI, HTTPException
2226
from pydantic import BaseModel
@@ -26,9 +30,9 @@
2630

2731

2832
class AIOProducer:
29-
def __init__(self, configs, loop):
33+
def __init__(self, configs, loop=None):
3034
self._loop = loop or asyncio.get_event_loop()
31-
self._producer = Producer(configs)
35+
self._producer = confluent_kafka.Producer(configs)
3236
self._cancelled = False
3337
self._poll_thread = Thread(target=self._poll_loop)
3438
self._poll_thread.start()
@@ -41,8 +45,9 @@ def close(self):
4145
self._cancelled = True
4246
self._poll_thread.join()
4347

44-
def async_produce(self, topic, value):
48+
def produce(self, topic, value):
4549
result = self._loop.create_future()
50+
4651
def ack(err, msg):
4752
if err:
4853
self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
@@ -51,60 +56,114 @@ def ack(err, msg):
5156
self._producer.produce(topic, value, on_delivery=ack)
5257
return result
5358

54-
def produce(self, topic, value, ack=None):
59+
def produce2(self, topic, value, on_delivery):
60+
result = self._loop.create_future()
61+
62+
def ack(err, msg):
63+
if err:
64+
self._loop.call_soon_threadsafe(
65+
result.set_exception, KafkaException(err))
66+
else:
67+
self._loop.call_soon_threadsafe(
68+
result.set_result, msg)
69+
if on_delivery:
70+
self._loop.call_soon_threadsafe(
71+
on_delivery, err, msg)
5572
self._producer.produce(topic, value, on_delivery=ack)
73+
return result
74+
75+
76+
class Producer:
77+
def __init__(self, configs):
78+
self._producer = confluent_kafka.Producer(configs)
79+
self._cancelled = False
80+
self._poll_thread = Thread(target=self._poll_loop)
81+
self._poll_thread.start()
82+
83+
def _poll_loop(self):
84+
while not self._cancelled:
85+
self._producer.poll(0.1)
86+
87+
def close(self):
88+
self._cancelled = True
89+
self._poll_thread.join()
90+
91+
def produce(self, topic, value, on_delivery=None):
92+
self._producer.produce(topic, value, on_delivery=on_delivery)
5693

5794

95+
config = {"bootstrap.servers": "localhost:9092"}
96+
5897
app = FastAPI()
5998

99+
60100
class Item(BaseModel):
61101
name: str
62102

103+
104+
aio_producer = None
63105
producer = None
64106

107+
65108
@app.on_event("startup")
66109
async def startup_event():
67-
global producer
68-
producer = AIOProducer(
69-
{"bootstrap.servers": "localhost:9092"},
70-
asyncio.get_event_loop())
110+
global producer, aio_producer
111+
aio_producer = AIOProducer(config, asyncio.get_event_loop())
112+
producer = Producer(config)
113+
71114

72115
@app.on_event("shutdown")
73116
def shutdown_event():
117+
aio_producer.close()
74118
producer.close()
75119

120+
76121
@app.post("/items1")
77122
async def create_item1(item: Item):
78123
try:
79-
result = await producer.async_produce("items", item.name)
80-
return { "timestamp": result.timestamp() }
124+
result = await aio_producer.produce("items", item.name)
125+
return {"timestamp": result.timestamp()}
81126
except KafkaException as ex:
82127
raise HTTPException(status_code=500, detail=ex.args[0].str())
83128

84129
cnt = 0
130+
131+
85132
def ack(err, msg):
86133
global cnt
87134
cnt = cnt + 1
88135

136+
89137
@app.post("/items2")
90138
async def create_item2(item: Item):
91139
try:
92-
result = producer.produce("items", item.name, ack=ack)
93-
return { "timestamp": time() }
140+
aio_producer.produce2("items", item.name, on_delivery=ack)
141+
return {"timestamp": time()}
94142
except KafkaException as ex:
95143
raise HTTPException(status_code=500, detail=ex.args[0].str())
96144

145+
97146
@app.post("/items3")
98147
async def create_item3(item: Item):
99148
try:
100-
result = producer.produce("items", item.name)
101-
return { "timestamp": time() }
149+
producer.produce("items", item.name, on_delivery=ack)
150+
return {"timestamp": time()}
102151
except KafkaException as ex:
103152
raise HTTPException(status_code=500, detail=ex.args[0].str())
104153

154+
105155
@app.post("/items4")
106156
async def create_item4(item: Item):
107-
return { "timestamp": time() }
157+
try:
158+
producer.produce("items", item.name)
159+
return {"timestamp": time()}
160+
except KafkaException as ex:
161+
raise HTTPException(status_code=500, detail=ex.args[0].str())
162+
163+
164+
@app.post("/items5")
165+
async def create_item5(item: Item):
166+
return {"timestamp": time()}
108167

109168

110169
if __name__ == '__main__':

0 commit comments

Comments
 (0)