End Specific Function in Micropython #10140
Replies: 3 comments 1 reply
-
Evidently you need a way to cancel the Under |
Beta Was this translation helpful? Give feedback.
-
The I made some examples how you can handle this: # import statements for all examples
import time
try:
import uasyncio as asyncio
# Micropython
except ImportError:
# Python
try:
import asyncio
except ImportError:
raise RuntimeError("asyncio is not supported")
# With a class 1
class ServoCommands:
def __init__(self, servo):
self.servo = servo
def do(self, action, *args, **kwargs):
if action == "a":
self.do_a(*args, **kwargs)
else:
print(f"Action do_{action} does not exist.")
def do_a(self, *args, **kwargs):
print("do_a:", args, kwargs)
servo_commands = ServoCommands(None)
while True:
# in your mainloop you could call this methods:
servo_commands.do("a")
# do_b does not exist, nothing will happen
servo_commands.do("b")
time.sleep(1)
# With a class 2
class ServoCommands:
def __init__(self, servo):
self.servo = servo
def do(self, action, *args, **kwargs):
try:
getattr(self, f"do_{action}")(*args, **kwargs)
except AttributeError:
print(f"Action do_{action} does not exist.")
def do_a(self, *args, **kwargs):
print("do_a:", args, kwargs)
servo_commands = ServoCommands(None)
while True:
# in your mainloop you could call this methods:
servo_commands.do("a")
# do_b does not exist, nothing will happen
servo_commands.do("b")
time.sleep(1)
# With a class 3 (async)
class ServoCommands:
def __init__(self, servo):
self.servo = servo
async def do(self, action, *args, **kwargs):
try:
await getattr(self, f"do_{action}")(*args, **kwargs)
except AttributeError:
print(f"Action do_{action} does not exist.")
async def do_a(self, *args, **kwargs):
print("do_a:", args, kwargs)
# move command 1
# allows to cancel the task between synchronous calls
# time.sleep() will block the whole eventloop
# don't use time.sleep in async functions/methods
await asyncio.sleep(0)
# move command 2
async def do_b(self, *args, **kwargs):
"""
Simulated slow async method
"""
await asyncio.sleep(10) # <- task gets canceled here
# ... synchronous code # <- task could not cancelled during synchronous code
# calling here time.sleep() will block the whole eventloop
#
# await asyncio.sleep(0) # <- or task gets canceled here
# ... synchronous code
async def main():
servo_commands = ServoCommands(None)
while True:
await servo_commands.do("a")
# await waits until servo_commands.do("a") has been finished
# creating a Task
task = asyncio.create_task(servo_commands.do("b"))
# task is not yet executing
await asyncio.sleep(1) # now the task is started by the loop
# cancel the task
task.cancel() # will silently cancel the task
await asyncio.sleep(1)
# Running main
asyncio.run(main())
# With a generator (most people don't know/use this)
def send_servo_command():
while True:
action = yield # <- first gen.send(None) returns None from this yield
# then with the next gen.send("str") the yield returns "str"
# and is assigned to action
if action == "a":
print("Do action a")
elif action == "b":
print("Do action b")
else:
print(f"Unknown command: '{action}'")
servo_gen = send_servo_command()
servo_gen.send(None) # starts the generator and it's mandatory to send as first a None
while True:
# later in your mainloop
servo_gen.send("a")
servo_gen.send("b")
servo_gen.send("c")
time.sleep(1) |
Beta Was this translation helpful? Give feedback.
-
@NahianMugdho There are two issues here, function dispatch and concurrency. @sosi-deadeye has suggested some solutions to function dispatch, but from the point of view of a beginner programmer there is nothing wrong with using The big problem is concurrency: you need to be a able to cancel a running task. The only manageable way to do this is with import uasyncio as asyncio
busy = None # Currently idle
async def forward(time): # Example moves forward for a period then stops
dc_motor.forward(100) # These functions are assumed to return immediately
dc_motor2.forward(100)
await asyncio.sleep_ms(time)
dc_motor.stop()
dc_motor2.stop()
busy = None
async def handle_commands():
while True:
f = await received_data() # also needs to be asynchronous
if busy is not None: # The robot is doing something
busy.cancel() # Stop it
if f == "f":
busy = asyncio.create_task(forward(500)) # Move forward for 500ms
elif f == "b":
# TODO
asyncio.run(handle_commands()) The assumption here is that |
Beta Was this translation helpful? Give feedback.
Uh oh!
There was an error while loading. Please reload this page.
-
I want to temporarily end the task of a specific function by sending a string and again start it by sending another string. Here is my code-
I want to activate the auto() function by sending a string "a". This part works ok. But when I send any other function it doesn't stop. What should I do to stop this specific auto() function by sending a string? The code for the auto() function is below-
Beta Was this translation helpful? Give feedback.
All reactions