Skip to content

Commit 10a3dd7

Browse files
committed
Added asyncio producer example
1 parent 2805c3f commit 10a3dd7

File tree

1 file changed

+111
-0
lines changed

1 file changed

+111
-0
lines changed

examples/asyncio.py

Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
# Copyright 2019 Confluent Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
# Companion code to the blog post "Integrating Kafka With Python Asyncio Web Applications"
16+
# https://www.confluent.io/blog/[path-to-blog-post]
17+
18+
import asyncio
19+
from confluent_kafka import Producer
20+
from confluent_kafka import KafkaException
21+
from fastapi import FastAPI, HTTPException
22+
from pydantic import BaseModel
23+
from time import time
24+
from threading import Thread
25+
import uvicorn
26+
27+
28+
class AIOProducer:
29+
def __init__(self, configs, loop):
30+
self._loop = loop or asyncio.get_event_loop()
31+
self._producer = Producer(configs)
32+
self._cancelled = False
33+
self._poll_thread = Thread(target=self._poll_loop)
34+
self._poll_thread.start()
35+
36+
def _poll_loop(self):
37+
while not self._cancelled:
38+
self._producer.poll(0.1)
39+
40+
def close(self):
41+
self._cancelled = True
42+
self._poll_thread.join()
43+
44+
def async_produce(self, topic, value):
45+
result = self._loop.create_future()
46+
def ack(err, msg):
47+
if err:
48+
self._loop.call_soon_threadsafe(result.set_exception, KafkaException(err))
49+
else:
50+
self._loop.call_soon_threadsafe(result.set_result, msg)
51+
self._producer.produce(topic, value, on_delivery=ack)
52+
return result
53+
54+
def produce(self, topic, value, ack=None):
55+
self._producer.produce(topic, value, on_delivery=ack)
56+
57+
58+
app = FastAPI()
59+
60+
class Item(BaseModel):
61+
name: str
62+
63+
producer = None
64+
65+
@app.on_event("startup")
66+
async def startup_event():
67+
global producer
68+
producer = AIOProducer(
69+
{"bootstrap.servers": "localhost:9092"},
70+
asyncio.get_event_loop())
71+
72+
@app.on_event("shutdown")
73+
def shutdown_event():
74+
producer.close()
75+
76+
@app.post("/items1")
77+
async def create_item1(item: Item):
78+
try:
79+
result = await producer.async_produce("items", item.name)
80+
return { "timestamp": result.timestamp() }
81+
except KafkaException as ex:
82+
raise HTTPException(status_code=500, detail=ex.args[0].str())
83+
84+
cnt = 0
85+
def ack(err, msg):
86+
global cnt
87+
cnt = cnt + 1
88+
89+
@app.post("/items2")
90+
async def create_item2(item: Item):
91+
try:
92+
result = producer.produce("items", item.name, ack=ack)
93+
return { "timestamp": time() }
94+
except KafkaException as ex:
95+
raise HTTPException(status_code=500, detail=ex.args[0].str())
96+
97+
@app.post("/items3")
98+
async def create_item3(item: Item):
99+
try:
100+
result = producer.produce("items", item.name)
101+
return { "timestamp": time() }
102+
except KafkaException as ex:
103+
raise HTTPException(status_code=500, detail=ex.args[0].str())
104+
105+
@app.post("/items4")
106+
async def create_item4(item: Item):
107+
return { "timestamp": time() }
108+
109+
110+
if __name__ == '__main__':
111+
uvicorn.run(app, host='0.0.0.0', port=8000)

0 commit comments

Comments
 (0)