Skip to content

Commit ae1cedb

Browse files
committed
Add MultiButton support - so you can await on multiple buttons
1 parent 718ecd5 commit ae1cedb

File tree

4 files changed

+246
-13
lines changed

4 files changed

+246
-13
lines changed

async_button.py

Lines changed: 116 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,11 +31,12 @@
3131
__repo__ = "https://github.com/furbrain/CircuitPython_async_button.git"
3232

3333
import asyncio
34+
from asyncio import Task, Event
3435

3536
from adafruit_ticks import ticks_add, ticks_less, ticks_ms
3637

3738
try:
38-
from typing import Dict, Sequence, Awaitable
39+
from typing import Dict, Sequence, Awaitable, Any, Union
3940
except ImportError:
4041
pass
4142

@@ -100,6 +101,52 @@ async def released(self):
100101
await asyncio.sleep(self.interval)
101102

102103

104+
class TaskWrapper:
105+
"""
106+
Create a task to run coro, then trigger event when finished
107+
Shouldn't really need this but CircuitPython does not have asyncio.wait or Task.result()
108+
"""
109+
110+
def __init__(self, coro: Awaitable, event: Event):
111+
"""
112+
:param Awaitable coro: coroutine to run
113+
:param asyncio.Event event:
114+
"""
115+
self._result = None
116+
self.coro = coro
117+
self.event = event
118+
self.task = asyncio.create_task(self._wait())
119+
120+
async def _wait(self):
121+
self._result = await self.coro
122+
self.event.set()
123+
124+
def done(self):
125+
"""
126+
Check whether task has completed
127+
128+
:return: True if task has completed
129+
"""
130+
return self.task.done()
131+
132+
def result(self):
133+
"""
134+
Get return value from task
135+
136+
:return: Whatever the task returned.
137+
"""
138+
if not self.done():
139+
raise AssertionError("Task has not yet completed")
140+
return self._result
141+
142+
def cancel(self):
143+
"""
144+
Cancel the task
145+
:return:
146+
"""
147+
self.task.cancel()
148+
149+
103150
class Button:
104151
"""
105152
This object will monitor the specified pin for changes and will report
@@ -251,17 +298,12 @@ def _trigger(self, event: int):
251298
evt.set()
252299
evt.clear()
253300

254-
@staticmethod
255-
async def _set_event_when_done(coro: Awaitable, event: asyncio.Event):
256-
await coro
257-
event.set()
258-
259-
async def wait(self, click_types: Sequence[int] = ALL_EVENTS):
301+
async def wait(self, click_types: Union[int, Sequence[int]] = ALL_EVENTS):
260302
"""
261303
Wait for the first of the specified events.
262304
263-
:param List[int] click_types: One or more events to listen for. Default is to listen
264-
for all events
305+
:param (List[int] | int) click_types: List of events to listen for. You can also pass a
306+
single event type in. Default is to listen for all events.
265307
:return: A list of the clicks that actually happened.
266308
267309
:example:
@@ -274,13 +316,19 @@ async def wait(self, click_types: Sequence[int] = ALL_EVENTS):
274316
>>> # do something
275317
276318
"""
277-
evts: Dict[int, asyncio.Task] = {}
319+
if isinstance(click_types, int):
320+
click_types = [click_types]
321+
# shortcut for efficiency: if only one click type awaited, just await that
322+
if len(click_types) == 1:
323+
await self.events[click_types[0]].wait()
324+
return [click_types[0]]
325+
326+
# multiple click types - needs more complex algorithm
327+
evts: Dict[int, TaskWrapper] = {}
278328
one_event_done = asyncio.Event()
279329
for evt_type in click_types:
280330
coro = self.events[evt_type].wait()
281-
evts[evt_type] = asyncio.create_task(
282-
self._set_event_when_done(coro, one_event_done)
283-
)
331+
evts[evt_type] = TaskWrapper(coro, one_event_done)
284332
await one_event_done.wait()
285333
if len(evts) > 1:
286334
await asyncio.sleep(0) # ensure all event types get an opportunity to run
@@ -307,3 +355,58 @@ def deinit(self):
307355
"""
308356
self.monitor_task.cancel()
309357
self.keys.deinit()
358+
359+
360+
class MultiButton:
361+
"""
362+
This class allows you to await the first click from any of two or more buttons
363+
"""
364+
365+
def __init__(self, **kwargs):
366+
"""
367+
368+
:param kwargs: pass each button that you want to be able to listen to with its name
369+
370+
:example:
371+
.. code-block:: python
372+
373+
>>> multi = MultiButton(a = button_a, b=button_b)
374+
>>> button, result = await multi.wait(a=Button.SINGLE, b= (Button.DOUBLE, Button.LONG))
375+
>>> # Long click on button B
376+
>>> print(button, result) # "b", Button.Long
377+
"""
378+
for button in kwargs.values():
379+
if not isinstance(button, Button):
380+
raise TypeError("Must pass in async_button.Button as parameters")
381+
self.buttons: Dict[Any, Button] = kwargs
382+
383+
async def wait(self, **kwargs):
384+
"""
385+
Wait for any specified clicks
386+
387+
:param kwargs: pass by keyword what clicks you want to listen for
388+
:return: button, click type
389+
:example:
390+
.. code-block:: python
391+
392+
>>> multi = MultiButton(a = button_a, b=button_b)
393+
>>> button, result = await multi.wait(a=Button.SINGLE, b= (Button.DOUBLE, Button.LONG))
394+
>>> # Long click on button B
395+
>>> print(button, result) # "b", Button.Long
396+
"""
397+
if len(kwargs) == 1:
398+
button = list(kwargs)[0]
399+
results = await self.buttons[button].wait(kwargs[button])
400+
return button, results[0]
401+
tasks: Dict[Any, Task] = {}
402+
click_happened = asyncio.Event()
403+
for key, value in kwargs.items():
404+
tasks[key] = TaskWrapper(self.buttons[key].wait(value), click_happened)
405+
await click_happened.wait()
406+
result = (None, None)
407+
for key, task in tasks.items():
408+
if task.done():
409+
result = (key, task.result()[0])
410+
else:
411+
task.cancel()
412+
return result

examples/async_multibutton_test.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
# SPDX-FileCopyrightText: 2017 Scott Shawcroft, written for Adafruit Industries
2+
# SPDX-FileCopyrightText: Copyright (c) 2023 Phil Underwood for Underwood Underground
3+
#
4+
# SPDX-License-Identifier: Unlicense
5+
import asyncio
6+
7+
import board
8+
9+
from async_button import Button, MultiButton
10+
11+
CLICK_NAMES = {
12+
Button.SINGLE: "Single click",
13+
Button.DOUBLE: "Double click",
14+
Button.TRIPLE: "Triple click",
15+
Button.LONG: "Long click",
16+
}
17+
18+
19+
async def counter():
20+
i = 0
21+
while True:
22+
print(f"COUNTER: {i}")
23+
await asyncio.sleep(1)
24+
i += 1
25+
26+
27+
async def click_watcher(button: MultiButton):
28+
while True:
29+
button_name, click = await button.wait(a=Button.ANY_CLICK, b=Button.ANY_CLICK)
30+
print(f"{button_name}: {CLICK_NAMES[click]} seen")
31+
32+
33+
async def main():
34+
# note Button must be created in an async environment
35+
button_a = Button(
36+
board.D3,
37+
value_when_pressed=False,
38+
long_click_enable=True,
39+
)
40+
button_b = Button(
41+
board.D4,
42+
value_when_pressed=False,
43+
long_click_enable=True,
44+
)
45+
multibutton = MultiButton(a=button_a, b=button_b)
46+
47+
await asyncio.gather(counter(), click_watcher(multibutton))
48+
49+
50+
asyncio.run(main())

tests/test_async_button.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,12 @@ async def test_single_click(self):
120120
await self.wait_event_with_timeout([async_button.Button.SINGLE])
121121
self.assertAlmostEqual(self.time_count, 0.20, delta=0.1)
122122

123+
async def test_single_click_specified_without_list(self):
124+
self.button = FastButton(self.pin, True)
125+
self.button_timings = [0.10, 0.20]
126+
await self.wait_event_with_timeout(async_button.Button.SINGLE)
127+
self.assertAlmostEqual(self.time_count, 0.20, delta=0.1)
128+
123129
async def test_double_click(self):
124130
self.button = FastButton(self.pin, True)
125131
self.button_timings = [0.10, 0.30, 0.5, 0.7]

tests/test_async_multi_button.py

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
# SPDX-FileCopyrightText: Copyright (c) 2023 Phil Underwood for Underwood Underground
2+
#
3+
# SPDX-License-Identifier: MIT
4+
from functools import partial
5+
from unittest import IsolatedAsyncioTestCase
6+
from unittest.mock import MagicMock, AsyncMock
7+
import sys
8+
import asyncio
9+
10+
sys.modules["countio"] = MagicMock()
11+
12+
import async_button # pylint: disable=wrong-import-position
13+
14+
SINGLE = async_button.Button.SINGLE
15+
DOUBLE = async_button.Button.DOUBLE
16+
LONG = async_button.Button.LONG
17+
18+
19+
async def wait_and_return(delay: float, click_type):
20+
await asyncio.sleep(delay)
21+
if isinstance(click_type, int):
22+
click_type = [click_type]
23+
return click_type
24+
25+
26+
class TestButton(IsolatedAsyncioTestCase):
27+
# pylint: disable=invalid-name, too-many-public-methods
28+
def setUp(self) -> None:
29+
self.button_a = MagicMock(async_button.Button)
30+
self.button_a.wait = AsyncMock()
31+
self.button_b = MagicMock(async_button.Button)
32+
self.button_b.wait = AsyncMock()
33+
self.button_c = MagicMock(async_button.Button)
34+
self.button_c.wait = AsyncMock()
35+
36+
def testInitialise(self):
37+
async_button.MultiButton(a=self.button_a, b=self.button_b, c=self.button_c)
38+
39+
def testInitialiseFailsWithNonButtons(self):
40+
with self.assertRaises(TypeError):
41+
async_button.MultiButton(a=self.button_a, b=self.button_b, c=12)
42+
43+
async def testSimpleCase(self):
44+
multi = async_button.MultiButton(a=self.button_a)
45+
self.button_a.wait.return_value = [SINGLE]
46+
result = await multi.wait(a=SINGLE)
47+
self.assertEqual(("a", SINGLE), result)
48+
self.button_a.wait.assert_awaited_once()
49+
self.button_a.wait.assert_called_with(SINGLE)
50+
51+
async def testTwoButtons(self):
52+
multi = async_button.MultiButton(a=self.button_a, b=self.button_b)
53+
self.button_a.wait = partial(wait_and_return, 0.2)
54+
self.button_b.wait = partial(wait_and_return, 0.1)
55+
result = await multi.wait(a=SINGLE, b=DOUBLE)
56+
self.assertEqual(("b", DOUBLE), result)
57+
58+
async def testThreeButtons(self):
59+
multi = async_button.MultiButton(
60+
a=self.button_a, b=self.button_b, c=self.button_c
61+
)
62+
self.button_a.wait = partial(wait_and_return, 0.2)
63+
self.button_b.wait = partial(wait_and_return, 0.1)
64+
self.button_c.wait = partial(wait_and_return, 0.3)
65+
result = await multi.wait(a=SINGLE, b=DOUBLE, c=LONG)
66+
self.assertEqual(("b", DOUBLE), result)
67+
68+
async def testOneButtonTwoClicks(self):
69+
multi = async_button.MultiButton(a=self.button_a)
70+
self.button_a.wait.return_value = [DOUBLE]
71+
result = await multi.wait(a=[SINGLE, DOUBLE])
72+
self.assertEqual(("a", DOUBLE), result)
73+
self.button_a.wait.assert_awaited_once()
74+
self.button_a.wait.assert_called_with([SINGLE, DOUBLE])

0 commit comments

Comments
 (0)