Skip to content

Commit a8cd5c8

Browse files
authored
Merge pull request #1961 from dhalbert/asyncio-1
move Cooperative Multitasking code to repo
2 parents 12b023f + 042d233 commit a8cd5c8

File tree

8 files changed

+357
-0
lines changed

8 files changed

+357
-0
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import asyncio
6+
7+
import board
8+
import keypad
9+
import neopixel
10+
from rainbowio import colorwheel
11+
12+
pixel_pin = board.A0
13+
num_pixels = 24
14+
15+
pixels = neopixel.NeoPixel(pixel_pin, num_pixels, brightness=0.03, auto_write=False)
16+
17+
18+
class Controls:
19+
def __init__(self):
20+
self.reverse = False
21+
self.wait = 0.0
22+
23+
24+
async def rainbow_cycle(controls):
25+
while True:
26+
# Increment by 2 instead of 1 to speed the cycle up a bit.
27+
for j in range(255, -1, -2) if controls.reverse else range(0, 256, 2):
28+
for i in range(num_pixels):
29+
rc_index = (i * 256 // num_pixels) + j
30+
pixels[i] = colorwheel(rc_index & 255)
31+
pixels.show()
32+
await asyncio.sleep(controls.wait)
33+
34+
35+
async def monitor_buttons(reverse_pin, slower_pin, faster_pin, controls):
36+
"""Monitor buttons that reverse direction and change animation speed.
37+
Assume buttons are active low.
38+
"""
39+
with keypad.Keys(
40+
(reverse_pin, slower_pin, faster_pin), value_when_pressed=False, pull=True
41+
) as keys:
42+
while True:
43+
key_event = keys.events.get()
44+
if key_event and key_event.pressed:
45+
key_number = key_event.key_number
46+
if key_number == 0:
47+
controls.reverse = not controls.reverse
48+
elif key_number == 1:
49+
# Lengthen the interval.
50+
controls.wait = controls.wait + 0.001
51+
elif key_number == 2:
52+
# Shorten the interval.
53+
controls.wait = max(0.0, controls.wait - 0.001)
54+
# Let another task run.
55+
await asyncio.sleep(0)
56+
57+
58+
async def main():
59+
controls = Controls()
60+
61+
buttons_task = asyncio.create_task(
62+
monitor_buttons(board.A1, board.A2, board.A3, controls)
63+
)
64+
animation_task = asyncio.create_task(rainbow_cycle(controls))
65+
66+
# This will run forever, because no tasks ever finish.
67+
await asyncio.gather(buttons_task, animation_task)
68+
69+
70+
asyncio.run(main())
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import time
6+
7+
import board
8+
import digitalio
9+
10+
11+
def blink(pin, interval, count):
12+
with digitalio.DigitalInOut(pin) as led:
13+
led.switch_to_output(value=False)
14+
for _ in range(count):
15+
led.value = True
16+
time.sleep(interval)
17+
led.value = False
18+
time.sleep(interval)
19+
20+
21+
def main():
22+
blink(board.D1, 0.25, 10)
23+
print("done")
24+
25+
26+
main()
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
# DOESN'T WORK
6+
import time
7+
8+
import board
9+
import digitalio
10+
11+
12+
def blink(pin, interval, count):
13+
with digitalio.DigitalInOut(pin) as led:
14+
led.switch_to_output(value=False)
15+
for _ in range(count):
16+
led.value = True
17+
time.sleep(interval)
18+
led.value = False
19+
time.sleep(interval)
20+
21+
22+
def main():
23+
blink(board.D1, 0.25, 10)
24+
# DOESN'T WORK
25+
# Second LED blinks only after the first one is finished.
26+
blink(board.D2, 0.1, 20)
27+
28+
29+
main()
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import time
6+
7+
import board
8+
import digitalio
9+
10+
11+
class Blinker:
12+
def __init__(self, led, interval, count):
13+
self.led = led
14+
self.interval = interval
15+
# Count both on and off.
16+
self.count2 = count * 2
17+
self.last_transition = 0
18+
19+
def blink(self):
20+
"""Return False when blinking is finished."""
21+
if self.count2 <= 0:
22+
return False
23+
now = time.monotonic()
24+
if now > self.last_transition + self.interval:
25+
self.led.value = not self.led.value
26+
self.last_transition = now
27+
self.count2 -= 1
28+
return True
29+
30+
31+
def main():
32+
with digitalio.DigitalInOut(board.D1) as led1, digitalio.DigitalInOut(
33+
board.D2
34+
) as led2:
35+
led1.switch_to_output(value=False)
36+
led2.switch_to_output(value=False)
37+
38+
blinker1 = Blinker(led1, 0.25, 10)
39+
blinker2 = Blinker(led2, 0.1, 20)
40+
running1 = True
41+
running2 = True
42+
while running1 or running2:
43+
running1 = blinker1.blink()
44+
running2 = blinker2.blink()
45+
print("done")
46+
47+
48+
main()
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import asyncio
6+
import board
7+
import digitalio
8+
import keypad
9+
10+
11+
class Interval:
12+
"""Simple class to hold an interval value. Use .value to to read or write."""
13+
14+
def __init__(self, initial_interval):
15+
self.value = initial_interval
16+
17+
18+
async def monitor_interval_buttons(pin_slower, pin_faster, interval):
19+
"""Monitor two buttons: one lengthens the interval, the other shortens it.
20+
Change interval.value as appropriate.
21+
"""
22+
# Assume buttons are active low.
23+
with keypad.Keys(
24+
(pin_slower, pin_faster), value_when_pressed=False, pull=True
25+
) as keys:
26+
while True:
27+
key_event = keys.events.get()
28+
if key_event and key_event.pressed:
29+
if key_event.key_number == 0:
30+
# Lengthen the interval.
31+
interval.value += 0.1
32+
else:
33+
# Shorten the interval.
34+
interval.value = max(0.1, interval.value - 0.1)
35+
print("interval is now", interval.value)
36+
# Let another task run.
37+
await asyncio.sleep(0)
38+
39+
40+
async def blink(pin, interval):
41+
"""Blink the given pin forever.
42+
The blinking rate is controlled by the supplied Interval object.
43+
"""
44+
with digitalio.DigitalInOut(pin) as led:
45+
led.switch_to_output()
46+
while True:
47+
led.value = not led.value
48+
await asyncio.sleep(interval.value)
49+
50+
51+
async def main():
52+
# Start blinking 0.5 sec on, 0.5 sec off.
53+
interval = Interval(0.5)
54+
55+
led_task = asyncio.create_task(blink(board.D1, interval))
56+
interval_task = asyncio.create_task(
57+
monitor_interval_buttons(board.D3, board.D4, interval)
58+
)
59+
# This will run forever, because neither task ever exits.
60+
await asyncio.gather(led_task, interval_task)
61+
62+
63+
asyncio.run(main())
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import asyncio
6+
import board
7+
import digitalio
8+
9+
10+
async def blink(pin, interval, count): # Don't forget the async!
11+
with digitalio.DigitalInOut(pin) as led:
12+
led.switch_to_output(value=False)
13+
for _ in range(count):
14+
led.value = True
15+
await asyncio.sleep(interval) # Don't forget the await!
16+
led.value = False
17+
await asyncio.sleep(interval) # Don't forget the await!
18+
19+
20+
async def main(): # Don't forget the async!
21+
led_task = asyncio.create_task(blink(board.D1, 0.25, 10))
22+
await asyncio.gather(led_task) # Don't forget the await!
23+
print("done")
24+
25+
26+
asyncio.run(main())
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import asyncio
6+
import board
7+
import digitalio
8+
import keypad
9+
10+
11+
class Interval:
12+
"""Simple class to hold an interval value. Use .value to to read or write."""
13+
14+
def __init__(self, initial_interval):
15+
self.value = initial_interval
16+
17+
18+
async def monitor_interval_buttons(pin_slower, pin_faster, interval):
19+
"""Monitor two buttons: one lengthens the interval, the other shortens it.
20+
Change interval.value as appropriate.
21+
"""
22+
# Assume buttons are active low.
23+
with keypad.Keys(
24+
(pin_slower, pin_faster), value_when_pressed=False, pull=True
25+
) as keys:
26+
while True:
27+
key_event = keys.events.get()
28+
if key_event and key_event.pressed:
29+
if key_event.key_number == 0:
30+
# Lengthen the interval.
31+
interval.value += 0.1
32+
else:
33+
# Shorten the interval.
34+
interval.value = max(0.1, interval.value - 0.1)
35+
print("interval is now", interval.value)
36+
# Let another task run.
37+
await asyncio.sleep(0)
38+
39+
40+
async def blink(pin, interval):
41+
"""Blink the given pin forever.
42+
The blinking rate is controlled by the supplied Interval object.
43+
"""
44+
with digitalio.DigitalInOut(pin) as led:
45+
led.switch_to_output()
46+
while True:
47+
led.value = not led.value
48+
await asyncio.sleep(interval.value)
49+
50+
51+
async def main():
52+
interval1 = Interval(0.5)
53+
interval2 = Interval(1.0)
54+
55+
led1_task = asyncio.create_task(blink(board.D1, interval1))
56+
led2_task = asyncio.create_task(blink(board.D2, interval2))
57+
interval1_task = asyncio.create_task(
58+
monitor_interval_buttons(board.D3, board.D4, interval1)
59+
)
60+
interval2_task = asyncio.create_task(
61+
monitor_interval_buttons(board.D5, board.D6, interval2)
62+
)
63+
64+
await asyncio.gather(led1_task, led2_task, interval1_task, interval2_task)
65+
66+
67+
asyncio.run(main())
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# SPDX-FileCopyrightText: 2022 Dan Halbert for Adafruit Industries
2+
#
3+
# SPDX-License-Identifier: MIT
4+
5+
import asyncio
6+
import board
7+
import digitalio
8+
9+
10+
async def blink(pin, interval, count):
11+
with digitalio.DigitalInOut(pin) as led:
12+
led.switch_to_output(value=False)
13+
for _ in range(count):
14+
led.value = True
15+
await asyncio.sleep(interval) # Don't forget the "await"!
16+
led.value = False
17+
await asyncio.sleep(interval) # Don't forget the "await"!
18+
19+
20+
async def main():
21+
led1_task = asyncio.create_task(blink(board.D1, 0.25, 10))
22+
led2_task = asyncio.create_task(blink(board.D2, 0.1, 20))
23+
24+
await asyncio.gather(led1_task, led2_task) # Don't forget "await"!
25+
print("done")
26+
27+
28+
asyncio.run(main())

0 commit comments

Comments
 (0)