pychanasync is a lightweight python package which brings Go-style channels to
python's asyncio concurrency world. It is an async-channel implementation,
providing a channel shaped tool for channel shaped problems.
pychanasync is implemented entirely around pythons asyncio event loop.
The implementation is lock free ,taking advantage of the single threaded
cooperative concurrency model.
It is designed and implemented to work with coroutines and not threads, providing safe and deterministic communication patterns without blocking the event loop.
Visit documentation site for more details.
- Buffered and unbuffered channel semantics - use either synchronous or buffered communication
- Async iteration over channels - Consume messages from a channel using
async forloops. - Context manager support - close channels and release resources when done with
async with. - Blocking/ awaitable operations -
await chan.push(value)andawait chan.pull()for safe, cooperative communication. - Non-blocking operations -
chan.push_nowait(value)andchan.pull_nowait()for buffered channels when you don’t want to suspend. - Select-like utility - wait on multiple channel operations concurrently, similar to Go’s select statement, in a clean and Pythonic way
pychanasync is available on PyPi
pip install pychanasyncChannels can be both buffered and unbuffered.
unbuffered channels have no internal buffer capacity. What this means is
every producer (push) will block/suspend until there is a ready consumer on
the other end of the channel (pull) and every consumer until there is a
ready producer on the other end of the channel.
from pychanasync import channel
#create unbuffered channel
ch = Channel()
# send
async ch.push("item") #blocks here
# receive
value = async ch.pull()buffered channels have an internal buffer capacity and can hold (N)
number of items at a time. When doing a push into a buffered channel, the
operation will only block when the buffer is full and until there is available
space to send the new item. Other than that the operation completes
and returns quickly.
Below is a buffered channel that can hold 300 items at a time.
from pychanasync import channel
ch = Channel(buffer=300)
# send
async ch.push("item")
# receive
value = async ch.pull()pychanasync supports async iteration, allowing you to consume items from a channel
in a clean way using async for loop.
We can rewrite our consumer above as
async def consumer(ch):
async for msg in ch:
print(f"Received: {msg}")Once the producer closes the channel , the iteration ends .
pychanasync has support for asynchronous context managers for automatic cleanup.
We can rewrite out producer component as
async def producer(channel):
async with channel as ch:
for i in range(3):
await ch.push(f"msg {i}")
print(f"Sent msg {i}")When the async-with block exits , the channel is closed automatically.
The chanselect utility method allows you to start and wait on multiple channel operations simultaneously,
returning the one that completes first.
synthax
chan, value = await chanselect(
(chan_a, chan_a.pull()),
(chan_b, chan_b.pull())
)checkout more features
import asyncio
from pychanasync import Channel
async def producer(ch):
for i in range(3):
await ch.push(f"msg {i}")
print(f"Sent msg {i}")
ch.close() # gracefully close when done
async def consumer(ch):
while True:
try:
msg = await ch.pull()
print(f"Received {msg}")
except ChannelClosed:
break
async def main():
ch = Channel(buffer=2)
await asyncio.gather(producer(ch), consumer(ch))
asyncio.run(main())To contribute or set up the project locally.
find the project source code on github
Clone the project
git clone https://github.com/Gwali-1/PY_CHANNELS_ASYNC
cd PY_CHANNELS_ASYNCInstall dependencies
pipenv install --dev
Running tests From the project root
pipenv run pytestInstalling the package locally From the project root
pip install -e .
