Skip to content

Commit 48f9c27

Browse files
authored
Fix deprecation of asyncio.get_event_loop() (#1235)
* Fix deprecation in test case * Modernize example * Better test_asyncio_notifier() * Format code with black * Attempt to fix typing; improve docs * try to fix mypy failure with type annotation * Better bus handling * Doc fix * Fix type errors in notifier.py * Format code with black * Fix example typing * Fix too long line in doc * Switch name from Listenable to MessageRecipient Co-authored-by: felixdivo <[email protected]>
1 parent f1808ed commit 48f9c27

File tree

5 files changed

+98
-98
lines changed

5 files changed

+98
-98
lines changed

can/interface.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def __new__( # type: ignore # pylint: disable=keyword-arg-before-vararg
103103
# resolve the bus class to use for that interface
104104
cls = _get_class_for_interface(kwargs["interface"])
105105

106-
# remove the 'interface' key so it doesn't get passed to the backend
106+
# remove the "interface" key, so it doesn't get passed to the backend
107107
del kwargs["interface"]
108108

109109
# make sure the bus can handle this config format

can/notifier.py

Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,30 @@
22
This module contains the implementation of :class:`~can.Notifier`.
33
"""
44

5-
from typing import Any, cast, Iterable, List, Optional, Union, Awaitable
5+
import asyncio
6+
import logging
7+
import threading
8+
import time
9+
from typing import Any, Callable, cast, Iterable, List, Optional, Union, Awaitable
610

711
from can.bus import BusABC
812
from can.listener import Listener
913
from can.message import Message
1014

11-
import threading
12-
import logging
13-
import time
14-
import asyncio
15-
1615
logger = logging.getLogger("can.Notifier")
1716

17+
MessageRecipient = Union[Listener, Callable[[Message], None]]
18+
1819

1920
class Notifier:
2021
def __init__(
2122
self,
2223
bus: Union[BusABC, List[BusABC]],
23-
listeners: Iterable[Listener],
24+
listeners: Iterable[MessageRecipient],
2425
timeout: float = 1.0,
2526
loop: Optional[asyncio.AbstractEventLoop] = None,
2627
) -> None:
27-
"""Manages the distribution of :class:`can.Message` instances to listeners.
28+
"""Manages the distribution of :class:`~can.Message` instances to listeners.
2829
2930
Supports multiple buses and listeners.
3031
@@ -35,11 +36,13 @@ def __init__(
3536
3637
3738
:param bus: A :ref:`bus` or a list of buses to listen to.
38-
:param listeners: An iterable of :class:`~can.Listener`
39-
:param timeout: An optional maximum number of seconds to wait for any message.
40-
:param loop: An :mod:`asyncio` event loop to schedule listeners in.
39+
:param listeners:
40+
An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
41+
and return nothing.
42+
:param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
43+
:param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
4144
"""
42-
self.listeners: List[Listener] = list(listeners)
45+
self.listeners: List[MessageRecipient] = list(listeners)
4346
self.bus = bus
4447
self.timeout = timeout
4548
self._loop = loop
@@ -101,8 +104,8 @@ def stop(self, timeout: float = 5) -> None:
101104
# reader is a file descriptor
102105
self._loop.remove_reader(reader)
103106
for listener in self.listeners:
104-
if hasattr(listener, "stop"):
105-
listener.stop()
107+
# Mypy prefers this over a hasattr(...) check
108+
getattr(listener, "stop", lambda: None)()
106109

107110
def _rx_thread(self, bus: BusABC) -> None:
108111
msg = None
@@ -150,9 +153,12 @@ def _on_error(self, exc: Exception) -> bool:
150153
was_handled = False
151154

152155
for listener in self.listeners:
153-
if hasattr(listener, "on_error"):
156+
on_error = getattr(
157+
listener, "on_error", None
158+
) # Mypy prefers this over hasattr(...)
159+
if on_error is not None:
154160
try:
155-
listener.on_error(exc)
161+
on_error(exc)
156162
except NotImplementedError:
157163
pass
158164
else:

examples/asyncio_demo.py

Lines changed: 38 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -5,57 +5,53 @@
55
"""
66

77
import asyncio
8+
from typing import List
9+
810
import can
11+
from can.notifier import MessageRecipient
912

1013

11-
def print_message(msg):
14+
def print_message(msg: can.Message) -> None:
1215
"""Regular callback function. Can also be a coroutine."""
1316
print(msg)
1417

1518

16-
async def main():
19+
async def main() -> None:
1720
"""The main function that runs in the loop."""
1821

19-
bus = can.Bus("vcan0", bustype="virtual", receive_own_messages=True)
20-
reader = can.AsyncBufferedReader()
21-
logger = can.Logger("logfile.asc")
22-
23-
listeners = [
24-
print_message, # Callback function
25-
reader, # AsyncBufferedReader() listener
26-
logger, # Regular Listener object
27-
]
28-
# Create Notifier with an explicit loop to use for scheduling of callbacks
29-
loop = asyncio.get_event_loop()
30-
notifier = can.Notifier(bus, listeners, loop=loop)
31-
# Start sending first message
32-
bus.send(can.Message(arbitration_id=0))
33-
34-
print("Bouncing 10 messages...")
35-
for _ in range(10):
36-
# Wait for next message from AsyncBufferedReader
37-
msg = await reader.get_message()
38-
# Delay response
39-
await asyncio.sleep(0.5)
40-
msg.arbitration_id += 1
41-
bus.send(msg)
42-
# Wait for last message to arrive
43-
await reader.get_message()
44-
print("Done!")
45-
46-
# Clean-up
47-
notifier.stop()
48-
bus.shutdown()
22+
with can.Bus( # type: ignore
23+
interface="virtual", channel="my_channel_0", receive_own_messages=True
24+
) as bus:
25+
reader = can.AsyncBufferedReader()
26+
logger = can.Logger("logfile.asc")
27+
28+
listeners: List[MessageRecipient] = [
29+
print_message, # Callback function
30+
reader, # AsyncBufferedReader() listener
31+
logger, # Regular Listener object
32+
]
33+
# Create Notifier with an explicit loop to use for scheduling of callbacks
34+
loop = asyncio.get_running_loop()
35+
notifier = can.Notifier(bus, listeners, loop=loop)
36+
# Start sending first message
37+
bus.send(can.Message(arbitration_id=0))
38+
39+
print("Bouncing 10 messages...")
40+
for _ in range(10):
41+
# Wait for next message from AsyncBufferedReader
42+
msg = await reader.get_message()
43+
# Delay response
44+
await asyncio.sleep(0.5)
45+
msg.arbitration_id += 1
46+
bus.send(msg)
47+
48+
# Wait for last message to arrive
49+
await reader.get_message()
50+
print("Done!")
51+
52+
# Clean-up
53+
notifier.stop()
4954

5055

5156
if __name__ == "__main__":
52-
try:
53-
# Get the default event loop
54-
LOOP = asyncio.get_event_loop()
55-
# Run until main coroutine finishes
56-
LOOP.run_until_complete(main())
57-
finally:
58-
LOOP.close()
59-
60-
# or on Python 3.7+ simply
61-
# asyncio.run(main())
57+
asyncio.run(main())

examples/serial_com.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ def receive(bus, stop_event):
4747

4848

4949
def main():
50-
"""Controles the sender and receiver."""
51-
with can.interface.Bus(bustype="serial", channel="/dev/ttyS10") as server:
52-
with can.interface.Bus(bustype="serial", channel="/dev/ttyS11") as client:
50+
"""Controls the sender and receiver."""
51+
with can.interface.Bus(interface="serial", channel="/dev/ttyS10") as server:
52+
with can.interface.Bus(interface="serial", channel="/dev/ttyS11") as client:
5353

5454
tx_msg = can.Message(
5555
arbitration_id=0x01,

test/notifier_test.py

Lines changed: 34 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -9,48 +9,46 @@
99

1010
class NotifierTest(unittest.TestCase):
1111
def test_single_bus(self):
12-
bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
13-
reader = can.BufferedReader()
14-
notifier = can.Notifier(bus, [reader], 0.1)
15-
msg = can.Message()
16-
bus.send(msg)
17-
self.assertIsNotNone(reader.get_message(1))
18-
notifier.stop()
19-
bus.shutdown()
12+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
13+
reader = can.BufferedReader()
14+
notifier = can.Notifier(bus, [reader], 0.1)
15+
msg = can.Message()
16+
bus.send(msg)
17+
self.assertIsNotNone(reader.get_message(1))
18+
notifier.stop()
2019

2120
def test_multiple_bus(self):
22-
bus1 = can.Bus(0, bustype="virtual", receive_own_messages=True)
23-
bus2 = can.Bus(1, bustype="virtual", receive_own_messages=True)
24-
reader = can.BufferedReader()
25-
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
26-
msg = can.Message()
27-
bus1.send(msg)
28-
time.sleep(0.1)
29-
bus2.send(msg)
30-
recv_msg = reader.get_message(1)
31-
self.assertIsNotNone(recv_msg)
32-
self.assertEqual(recv_msg.channel, 0)
33-
recv_msg = reader.get_message(1)
34-
self.assertIsNotNone(recv_msg)
35-
self.assertEqual(recv_msg.channel, 1)
36-
notifier.stop()
37-
bus1.shutdown()
38-
bus2.shutdown()
21+
with can.Bus(0, interface="virtual", receive_own_messages=True) as bus1:
22+
with can.Bus(1, interface="virtual", receive_own_messages=True) as bus2:
23+
reader = can.BufferedReader()
24+
notifier = can.Notifier([bus1, bus2], [reader], 0.1)
25+
msg = can.Message()
26+
bus1.send(msg)
27+
time.sleep(0.1)
28+
bus2.send(msg)
29+
recv_msg = reader.get_message(1)
30+
self.assertIsNotNone(recv_msg)
31+
self.assertEqual(recv_msg.channel, 0)
32+
recv_msg = reader.get_message(1)
33+
self.assertIsNotNone(recv_msg)
34+
self.assertEqual(recv_msg.channel, 1)
35+
notifier.stop()
3936

4037

4138
class AsyncNotifierTest(unittest.TestCase):
4239
def test_asyncio_notifier(self):
43-
loop = asyncio.get_event_loop()
44-
bus = can.Bus("test", bustype="virtual", receive_own_messages=True)
45-
reader = can.AsyncBufferedReader()
46-
notifier = can.Notifier(bus, [reader], 0.1, loop=loop)
47-
msg = can.Message()
48-
bus.send(msg)
49-
future = asyncio.wait_for(reader.get_message(), 1.0)
50-
recv_msg = loop.run_until_complete(future)
51-
self.assertIsNotNone(recv_msg)
52-
notifier.stop()
53-
bus.shutdown()
40+
async def run_it():
41+
with can.Bus("test", interface="virtual", receive_own_messages=True) as bus:
42+
reader = can.AsyncBufferedReader()
43+
notifier = can.Notifier(
44+
bus, [reader], 0.1, loop=asyncio.get_running_loop()
45+
)
46+
bus.send(can.Message())
47+
recv_msg = await asyncio.wait_for(reader.get_message(), 0.5)
48+
self.assertIsNotNone(recv_msg)
49+
notifier.stop()
50+
51+
asyncio.run(run_it())
5452

5553

5654
if __name__ == "__main__":

0 commit comments

Comments
 (0)