Proper Asyncio programming construct #12527
Replies: 7 comments 6 replies
-
The program structure looks OK. I think the line self.lcd_display("Sample Message") should read: self.lcd_display.display("Sample Message") Re async def display(self):
for n in range(10):
self.write(self.buf[1000*n: 1000*(n + 1)]) # Write out 1/10 of the buffer then
asyncio.sleep_ms(0) # Let other tasks run Whether to make a function synchronous or asynchronous depends on what the code must do in the context of the application as a whole. As a rule of thumb if something blocks for < 1ms it's best being synchronous. If it blocks for > 10ms then it's worth considering the effect on other tasks. The reason is that the overhead in task switching is about 300μs on most platforms. As for Lastly you don't need to trap the keyboard interrupt. This is in some tutorial samples to show how this can be done should you wish. |
Beta Was this translation helpful? Give feedback.
-
Hi Peter, thank you very much for helping out beginners like me. I especially like this nugget wisdom of yours below as I struggle to think when should I make it a synchronous or an asynchronous call
I hope you will be patient even more as I have some additional doubts. When I scanned something on my RFID reader and it detected a card I was hoping to validate it on my API server.
My initial work is to add an async code that will use I am second-guessing myself if this is the most python-asyncio way of doing this.
Hoping for your guidance. Thanks! |
Beta Was this translation helpful? Give feedback.
-
I have no experience of RFID so I'd appreciate some picture of the physical design of the system. Is the RFID reader a physical device attached to your MicroPython host? If so, how is it connected? What is the resource which you are accessing via HTTP? Is it on a LAN or on the internet? Please give me some picture of what the system aims to achieve. Re your code, if class RFIDReader:
async def get_card_info(self, card_id):
url = f"http://my-api-host:3000/cards/{card_id}"
r = urequests.get(url)
# MORE CODE
return card_details # Need this line
async def scan_cards(self):
while True:
print("Waiting for cards")
## CARD DETECTED -> VALIDATE WITH API
if stat == self.reader.OK:
(stat, uid) = self.reader.SelectTagSN()
card_details = await self.get_card_info(uid)
await asyncio.sleep_ms(100) I'm not sure what is your query about the method call, but the method of retrieving Until I have some grasp of the physical design it's hard to comment in detail but |
Beta Was this translation helpful? Give feedback.
-
Thank you for clarifying the design. I don't know of an async HTTP library for MicroPython, but this is not something I've investigated in detail. However the blocking of Re keypads they normally are wired as a crosspoint array. To read them you assert each column line in turn and read out any rows that are active. It would be feasible to write an async driver, and it's something I've considered occasionally, but as far as I'm aware nobody has actually done it. My switch and pushbutton drivers assume discrete switches wired either to Vcc or to Gnd. They would not work with a crosspoint array. |
Beta Was this translation helpful? Give feedback.
-
There's an async http client library here: https://github.com/micropython/micropython-lib/tree/master/micropython/uaiohttpclient |
Beta Was this translation helpful? Give feedback.
-
@neliel123 A wet afternoon got the better of me so I wrote this keypad driver. It handles key rollover (where a second key is pressed before the first is released). In principle it can handle any sized keypad array (because of Python's arbitrary precision arithmetic). # keys.py
import uasyncio as asyncio
class keypad:
def __init__(self, rowpins, colpins, callback): # Tuples of pins. Rows are OUT, cols are IN
self.rowpins = rowpins
self.colpins = colpins
self.nbuttons = len(rowpins) * len(colpins)
self.callback = callback
self.cur = 0
self.prev = 0
for opin in self.rowpins: # Initialise output pins
opin(0)
asyncio.create_task(self.scan())
async def scan(self):
while True:
await asyncio.sleep_ms(0)
self.cur = 0
for opin in self.rowpins:
opin(1) # Assert output
for ipin in self.colpins:
self.cur <<= 1
self.cur |= ipin()
opin(0)
if self.cur == self.prev: # No change
continue # No change
pressed = self.cur & ~self.prev
self.prev = self.cur
if not pressed: # Button release
continue
# A key was pressed
for v in range(self.nbuttons): # Find button index
if pressed & 1:
break
pressed >>= 1
self.callback(v)
await asyncio.sleep_ms(50) # Wait out bounce
# test script. Pins based on Rpi Pico.
from machine import Pin
rowpins = [Pin(p, Pin.OUT) for p in range(10, 14)]
colpins = [Pin(p, Pin.IN, Pin.PULL_DOWN) for p in range(16, 20)]
def cb(v):
print(v) # Print scan code.
async def main():
kp = keypad(rowpins, colpins, cb)
await asyncio.sleep(60)
asyncio.run(main()) It is rather crude, running a callback on a keypress but you might want to experiment. I plan to improve the API replacing the callback with a design based on an async iterator, which I'll post in my async library. Comments welcome. [EDIT] |
Beta Was this translation helpful? Give feedback.
-
For the benefit of anyone wanting to use this driver the following demonstrates its use. Pin definitions are for a Pico/Pico W: import asyncio
from primitives import Keyboard
from machine import Pin
rowpins = [Pin(p, Pin.OPEN_DRAIN) for p in range(10, 14)]
colpins = [Pin(p, Pin.IN, Pin.PULL_UP) for p in range(16, 20)]
async def main():
kp = Keyboard(rowpins, colpins)
async for scan_code in kp:
print(scan_code)
if not scan_code:
break # Quit on key with code 0
asyncio.run(main()) |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I am trying to understand and make use of Asyncio in my MicroPython project and I have been reading the awesome tutorial by Peter Hinch about this subject to the best of my ability. I have not used Asyncio so I am just learning through various available resources.
So I have an RFID reader with an LCD and some other buttons. This is my first attempt at making my own program that will run in a loop.
My doubts are:
await asyncio.sleep(1)
?I am a newbie so please enlighten me, especially on the asyncio part. I am learning things as I do it so I would like some guidance from experts.
Thanks in advance.
Beta Was this translation helpful? Give feedback.
All reactions