Skip to content

Commit dbaeb87

Browse files
committed
Refactor on async callbacks
1 parent 6aac78c commit dbaeb87

File tree

13 files changed

+1041
-95
lines changed

13 files changed

+1041
-95
lines changed

canopen/emcy.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import struct
33
import logging
44
import threading
5+
import asyncio
56
import time
67
from typing import Callable, List, Optional, TYPE_CHECKING
78

@@ -22,13 +23,15 @@ def __init__(self):
2223
#: Only active EMCYs. Will be cleared on Error Reset
2324
self.active: List["EmcyError"] = []
2425
self.callbacks = []
25-
self.emcy_received = threading.Condition()
26+
self.emcy_received = threading.Condition() # FIXME Async
27+
self.aemcy_received = asyncio.Condition()
2628

2729
def on_emcy(self, can_id, data, timestamp):
30+
# NOTE: Callback. Will be called from another thread
2831
code, register, data = EMCY_STRUCT.unpack(data)
2932
entry = EmcyError(code, register, data, timestamp)
3033

31-
with self.emcy_received:
34+
with self.emcy_received: # FIXME: Blocking
3235
if code & 0xFF00 == 0:
3336
# Error reset
3437
self.active = []
@@ -37,11 +40,31 @@ def on_emcy(self, can_id, data, timestamp):
3740
self.log.append(entry)
3841
self.emcy_received.notify_all()
3942

43+
# NOTE: Will be called from another thread
4044
for callback in self.callbacks:
41-
callback(entry)
45+
callback(entry) # FIXME: Assert if coroutine?
46+
47+
async def aon_emcy(self, can_id, data, timestamp):
48+
code, register, data = EMCY_STRUCT.unpack(data)
49+
entry = EmcyError(code, register, data, timestamp)
50+
51+
async with self.aemcy_received:
52+
if code & 0xFF00 == 0:
53+
# Error reset
54+
self.active = []
55+
else:
56+
self.active.append(entry)
57+
self.log.append(entry)
58+
self.aemcy_received.notify_all()
59+
60+
for callback in self.callbacks:
61+
res = callback(entry)
62+
if res is not None and asyncio.iscoroutine(res):
63+
await res
4264

4365
def add_callback(self, callback: Callable[["EmcyError"], None]):
44-
"""Get notified on EMCY messages from this node.
66+
"""Get notified on EMCY messages from this node. The callback must
67+
be multi-threaded.
4568
4669
:param callback:
4770
Callable which must take one argument of an
@@ -66,9 +89,9 @@ def wait(
6689
"""
6790
end_time = time.time() + timeout
6891
while True:
69-
with self.emcy_received:
92+
with self.emcy_received: # FIXME: Blocking
7093
prev_log_size = len(self.log)
71-
self.emcy_received.wait(timeout)
94+
self.emcy_received.wait(timeout) # FIXME: Blocking
7295
if len(self.log) == prev_log_size:
7396
# Resumed due to timeout
7497
return None

canopen/lss.py

Lines changed: 17 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import logging
44
import time
55
import struct
6+
import asyncio
67
try:
78
import queue
89
except ImportError:
@@ -90,7 +91,8 @@ def __init__(self):
9091
self.network: Optional[Network] = None
9192
self._node_id = 0
9293
self._data = None
93-
self.responses = queue.Queue()
94+
self.responses = queue.Queue() # FIXME Async
95+
self.aresponses = asyncio.Queue()
9496

9597
def send_switch_state_global(self, mode):
9698
"""switch mode to CONFIGURATION_STATE or WAITING_STATE
@@ -247,12 +249,12 @@ def send_identify_non_configured_remote_slave(self):
247249
self.__send_command(message)
248250

249251
def fast_scan(self):
250-
"""This command sends a series of fastscan message
252+
"""This command sends a series of fastscan message
251253
to find unconfigured slave with lowest number of LSS idenities
252254
253255
:return:
254256
True if a slave is found.
255-
False if there is no candidate.
257+
False if there is no candidate.
256258
list is the LSS identities [vendor_id, product_code, revision_number, seerial_number]
257259
:rtype: bool, list
258260
"""
@@ -270,21 +272,21 @@ def fast_scan(self):
270272

271273
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
272274
lss_id[lss_sub] |= 1<<lss_bit_check
273-
275+
274276
time.sleep(0.01)
275-
277+
276278
lss_next = (lss_sub + 1) & 3
277279
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
278280
return False, None
279281

280282
time.sleep(0.01)
281-
283+
282284
# Now the next 32 bits will be scanned
283285
lss_sub += 1
284286

285287
# Now lss_id contains the entire 128 bits scanned
286288
return True, lss_id
287-
289+
288290
return False, None
289291

290292
def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next):
@@ -298,7 +300,7 @@ def __send_fast_scan_message(self, id_number, bit_checker, lss_sub, lss_next):
298300
cs = struct.unpack_from("<B", recv_msg)[0]
299301
if cs == CS_IDENTIFY_SLAVE:
300302
return True
301-
303+
302304
return False
303305

304306
def __send_lss_address(self, req_cs, number):
@@ -383,7 +385,7 @@ def __send_command(self, message):
383385
response = None
384386
if not self.responses.empty():
385387
logger.info("There were unexpected messages in the queue")
386-
self.responses = queue.Queue()
388+
self.responses = queue.Queue() # FIXME Async
387389

388390
self.network.send_message(self.LSS_TX_COBID, message)
389391

@@ -393,15 +395,19 @@ def __send_command(self, message):
393395
# Wait for the slave to respond
394396
# TODO check if the response is LSS response message
395397
try:
396-
response = self.responses.get(
398+
response = self.responses.get( # FIXME: Blocking
397399
block=True, timeout=self.RESPONSE_TIMEOUT)
398400
except queue.Empty:
399401
raise LssError("No LSS response received")
400402

401403
return response
402404

403405
def on_message_received(self, can_id, data, timestamp):
404-
self.responses.put(bytes(data))
406+
# NOTE: Callback. Will be called from another thread
407+
self.responses.put(bytes(data)) # FIXME: Blocking
408+
409+
async def aon_message_received(self, can_id, data, timestamp):
410+
await self.aresponses.put(bytes(data))
405411

406412

407413
class LssError(Exception):

canopen/network.py

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@
66
import logging
77
import threading
88
from typing import Callable, Dict, Iterable, List, Optional, Union, TYPE_CHECKING
9+
import asyncio
910

1011
if TYPE_CHECKING:
1112
from can import BusABC, Notifier
13+
from asyncio import AbstractEventLoop
1214

1315
try:
1416
import can
@@ -36,14 +38,19 @@
3638
class Network(MutableMapping):
3739
"""Representation of one CAN bus containing one or more nodes."""
3840

39-
def __init__(self, bus=None):
41+
def __init__(
42+
self,
43+
bus: Optional[BusABC] = None,
44+
loop: Optional[AbstractEventLoop] = None
45+
):
4046
"""
4147
:param can.BusABC bus:
4248
A python-can bus instance to re-use.
4349
"""
4450
#: A python-can :class:`can.BusABC` instance which is set after
4551
#: :meth:`canopen.Network.connect` is called
4652
self.bus: Optional[BusABC] = bus
53+
self.loop: Optional[asyncio.AbstractEventLoop] = loop
4754
#: A :class:`~canopen.network.NodeScanner` for detecting nodes
4855
self.scanner = NodeScanner(self)
4956
#: List of :class:`can.Listener` objects.
@@ -52,15 +59,19 @@ def __init__(self, bus=None):
5259
self.notifier: Optional[Notifier] = None
5360
self.nodes: Dict[int, Union[RemoteNode, LocalNode]] = {}
5461
self.subscribers: Dict[int, List[Callback]] = {}
55-
self.send_lock = threading.Lock()
62+
self.send_lock = threading.Lock() # FIXME Async
5663
self.sync = SyncProducer(self)
5764
self.time = TimeProducer(self)
5865
self.nmt = NmtMaster(0)
5966
self.nmt.network = self
6067

6168
self.lss = LssMaster()
6269
self.lss.network = self
63-
self.subscribe(self.lss.LSS_RX_COBID, self.lss.on_message_received)
70+
71+
if self.loop:
72+
self.subscribe(self.lss.LSS_RX_COBID, self.lss.aon_message_received)
73+
else:
74+
self.subscribe(self.lss.LSS_RX_COBID, self.lss.on_message_received)
6475

6576
def subscribe(self, can_id: int, callback: Callback) -> None:
6677
"""Listen for messages with a specific CAN ID.
@@ -119,8 +130,9 @@ def connect(self, *args, **kwargs) -> "Network":
119130
kwargs_notifier = {}
120131
if "loop" in kwargs:
121132
kwargs_notifier["loop"] = kwargs["loop"]
133+
self.loop = kwargs["loop"]
122134
del kwargs["loop"]
123-
self.bus = can.interface.Bus(*args, **kwargs)
135+
self.bus = can.Bus(*args, **kwargs)
124136
logger.info("Connected to '%s'", self.bus.channel_info)
125137
self.notifier = can.Notifier(self.bus, self.listeners, 1, **kwargs_notifier)
126138
return self
@@ -220,7 +232,9 @@ def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None:
220232
arbitration_id=can_id,
221233
data=data,
222234
is_remote_frame=remote)
223-
with self.send_lock:
235+
# NOTE: This lock is ok for async, because ther is only one thread
236+
# calling this function when using async, so it'll never lock.
237+
with self.send_lock: # FIXME: Blocking
224238
self.bus.send(msg)
225239
self.check()
226240

@@ -256,10 +270,13 @@ def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
256270
:param timestamp:
257271
Timestamp of the message, preferably as a Unix timestamp
258272
"""
259-
if can_id in self.subscribers:
260-
callbacks = self.subscribers[can_id]
273+
# NOTE: Callback. Will be called from another thread
274+
callbacks = self.subscribers.get(can_id)
275+
if callbacks is not None:
261276
for callback in callbacks:
262-
callback(can_id, data, timestamp)
277+
res = callback(can_id, data, timestamp)
278+
if res is not None and self.loop is not None and asyncio.iscoroutine(res):
279+
self.loop.create_task(res)
263280
self.scanner.on_message_received(can_id)
264281

265282
def check(self) -> None:
@@ -360,6 +377,7 @@ def __init__(self, network: Network):
360377
self.network = network
361378

362379
def on_message_received(self, msg):
380+
# NOTE: Callback. Will be called from another thread
363381
if msg.is_error_frame or msg.is_remote_frame:
364382
return
365383

@@ -394,9 +412,11 @@ def __init__(self, network: Optional[Network] = None):
394412
self.nodes: List[int] = []
395413

396414
def on_message_received(self, can_id: int):
415+
# NOTE: Callback. Will be called from another thread
397416
service = can_id & 0x780
398417
node_id = can_id & 0x7F
399418
if node_id not in self.nodes and node_id != 0 and service in self.SERVICES:
419+
# NOTE: Assume this is thread-safe
400420
self.nodes.append(node_id)
401421

402422
def reset(self):

0 commit comments

Comments
 (0)