How to use asyncio with blocking method #9219
-
Is it possible to use ... For example, I tried the code below:

    import uasyncio
    from pyb import LED, UART
    import select

    uart = UART(6, 9600)
    p = select.poll()
    p.register(uart, select.POLLIN)

    async def ioCoroutine(led):
        while True:
            led.toggle()
            await uasyncio.sleep(2)

    async def uartCoroutine(led):
        while True:
            await p.ipoll()
            data = uart.read()
            # deal with uart data...
            led.toggle()
            await uasyncio.sleep(1)

    async def main(led1, led2):
        await uasyncio.gather(
            ioCoroutine(led1),
            uartCoroutine(led2)
        )

    uasyncio.run(main(LED(1), LED(3)))

I wish the blocking method ... I know that I can use ... Similarly, I tried:

    ...
    from pyb import LED, CAN
    ...
    can = CAN( ... )
    can.setfilter(0, ... )
    ...

    async def canCoroutine(led):
        while True:
            data = await can.recv(0, timeout=-1)
            # deal with uart data...
            led.toggle()
            await uasyncio.sleep(1)
    ...

The ... Would we need to add new code to MicroPython to let all these hardware bus blocking reader and ...
Replies: 1 comment 1 reply
-
You need to use `StreamReader` to use the UART asynchronously. Then you can `await uart.read()`. See @peterhinch's excellent docs here: https://github.com/peterhinch/micropython-async/blob/master/v3/docs/TUTORIAL.md#63-using-the-stream-mechanism

Unfortunately StreamReader doesn't quite work with CAN. It does support the relevant ioctl for asyncio to poll it, but does not support `read`. What you could probably do is adapt StreamReader (see https://github.com/micropython/micropython/blob/master/extmod/uasyncio/stream.py ) to make a CANStreamReader. Basically it just needs a new `read` method that calls `recv` instead.
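
For anyone landing here later, a rough sketch of how the two tasks could look. The UART task follows the `StreamReader` advice above. The CAN task is not the CANStreamReader adaptation; it swaps in a simpler polling loop that checks `can.any()` and only calls `recv()` once a frame is waiting. The names `uart_task`, `can_task`, `on_data`, `on_msg`, `nbytes` and `poll_s` are made up for illustration, and `print` is just a stand-in handler. I'm assuming `recv(fifo, timeout=0)` returns straight away when a frame is already pending; check that on your board.

```python
try:
    import uasyncio as asyncio  # MicroPython builds that ship uasyncio
except ImportError:
    import asyncio  # newer MicroPython builds, or CPython for testing


async def uart_task(reader, led, on_data, nbytes=64):
    """Handle UART data; `reader` is a StreamReader wrapping the UART."""
    while True:
        # Suspends only this task until the stream has something to read.
        data = await reader.read(nbytes)
        on_data(data)
        led.toggle()


async def can_task(can, led, on_msg, fifo=0, poll_s=0.01):
    """Handle CAN frames by polling can.any() instead of blocking in recv()."""
    while True:
        if can.any(fifo):
            # A frame is already waiting, so recv() should not block here
            # (assumption: timeout=0 is fine when a frame is pending).
            on_msg(can.recv(fifo, timeout=0))
            led.toggle()
        # Always yield to the scheduler so other tasks keep running.
        await asyncio.sleep(poll_s)


async def main(uart, can, led1, led2):
    """Board-side wiring (MicroPython only); pass in configured peripherals."""
    reader = asyncio.StreamReader(uart)
    await asyncio.gather(
        uart_task(reader, led1, print),  # print is a placeholder handler
        can_task(can, led2, print),
    )
```

On the board you would then call something like `asyncio.run(main(UART(6, 9600), can, LED(1), LED(3)))`, with `can` created and filtered the same way as in the question. The polling interval trades latency against CPU time, so a proper CANStreamReader would still be the nicer solution if you need low latency.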