Skip to content

Commit e3c84eb

Browse files
committed
Implement the framwork for sync back-end
* All callbacks are synchronous and same in both sync and async mode * Sync waiting is done with `asyncio.to_thread()` from async
1 parent c1e3659 commit e3c84eb

File tree

11 files changed

+127
-137
lines changed

11 files changed

+127
-137
lines changed

README.rst

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@ The library supports Python 3.8 or newer.
1111
This library is the asyncio port of CANopen. See below for code example.
1212

1313

14+
Branch notes
15+
------------
16+
This branch is work in progress, where the intent is to concept test running
17+
the backend callbacks and unchanged from the sync version. The sync-async
18+
crossing is done via sync waiting via `asyncio.to_thread()` in each class
19+
that needs it.
20+
21+
The goal was to simplify the impact of the async changes. Having an async
22+
backend requires a lot of duplication of code.
23+
24+
1425
Async status
1526
------------
1627

canopen/emcy.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing import Callable, List, Optional
88

99
from canopen.async_guard import ensure_not_async
10+
from canopen.utils import call_callbacks
1011
import canopen.network
1112

1213

@@ -25,7 +26,6 @@ def __init__(self):
2526
self.active: List[EmcyError] = []
2627
self.callbacks = []
2728
self.emcy_received = threading.Condition()
28-
self.aemcy_received = asyncio.Condition()
2929

3030
# @callback # NOTE: called from another thread
3131
@ensure_not_async # NOTE: Safeguard for accidental async use
@@ -43,28 +43,9 @@ def on_emcy(self, can_id, data, timestamp):
4343
self.log.append(entry)
4444
self.emcy_received.notify_all()
4545

46-
for callback in self.callbacks:
47-
# FIXME: Assert if callback is a coroutine?
48-
callback(entry)
49-
50-
# @callback
51-
async def aon_emcy(self, can_id, data, timestamp):
52-
code, register, data = EMCY_STRUCT.unpack(data)
53-
entry = EmcyError(code, register, data, timestamp)
54-
55-
async with self.aemcy_received:
56-
if code & 0xFF00 == 0:
57-
# Error reset
58-
self.active = []
59-
else:
60-
self.active.append(entry)
61-
self.log.append(entry)
62-
self.aemcy_received.notify_all()
63-
64-
for callback in self.callbacks:
65-
res = callback(entry)
66-
if res is not None and asyncio.iscoroutine(res):
67-
await res
46+
# Call all registered callbacks
47+
# FIXME: Add the nework loop to the callback
48+
call_callbacks(self.callbacks, None, entry)
6849

6950
def add_callback(self, callback: Callable[[EmcyError], None]):
7051
"""Get notified on EMCY messages from this node.
@@ -111,13 +92,17 @@ def wait(
11192
# This is the one we're interested in
11293
return emcy
11394

114-
def async_wait(
95+
async def async_wait(
11596
self, emcy_code: Optional[int] = None, timeout: float = 10
11697
) -> EmcyError:
117-
# FIXME: Implement this function
118-
raise NotImplementedError(
119-
"async_wait is not implemented."
120-
)
98+
"""Wait for a new EMCY to arrive.
99+
100+
:param emcy_code: EMCY code to wait for
101+
:param timeout: Max time in seconds to wait
102+
103+
:return: The EMCY exception object or None if timeout
104+
"""
105+
return await asyncio.to_thread(self.wait, emcy_code, timeout)
121106

122107

123108
class EmcyProducer:

canopen/lss.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,6 @@ def __init__(self) -> None:
8989
self._node_id = 0
9090
self._data = None
9191
self.responses = queue.Queue()
92-
self.aresponses = asyncio.Queue()
9392

9493
def send_switch_state_global(self, mode):
9594
"""switch mode to CONFIGURATION_STATE or WAITING_STATE
@@ -416,10 +415,6 @@ def on_message_received(self, can_id, data, timestamp):
416415
# NOTE: Blocking call
417416
self.responses.put(bytes(data))
418417

419-
# @callback
420-
async def aon_message_received(self, can_id, data, timestamp):
421-
await self.aresponses.put(bytes(data))
422-
423418

424419
class LssError(Exception):
425420
"""Some LSS operation failed."""

canopen/network.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,10 +62,7 @@ def __init__(self, bus: Optional[can.BusABC] = None, notifier: Optional[can.Noti
6262
# work. See async_guard.py
6363
set_async_sentinel(self.is_async())
6464

65-
if self.is_async():
66-
self.subscribe(self.lss.LSS_RX_COBID, self.lss.aon_message_received)
67-
else:
68-
self.subscribe(self.lss.LSS_RX_COBID, self.lss.on_message_received)
65+
self.subscribe(self.lss.LSS_RX_COBID, self.lss.on_message_received)
6966

7067
def subscribe(self, can_id: int, callback: Callback) -> None:
7168
"""Listen for messages with a specific CAN ID.
@@ -122,7 +119,7 @@ def connect(self, *args, **kwargs) -> Network:
122119
self.bus = can.Bus(*args, **kwargs)
123120
logger.info("Connected to '%s'", self.bus.channel_info)
124121
if self.notifier is None:
125-
self.notifier = can.Notifier(self.bus, [], self.NOTIFIER_CYCLE, loop=self.loop)
122+
self.notifier = can.Notifier(self.bus, [], self.NOTIFIER_CYCLE)
126123
for listener in self.listeners:
127124
self.notifier.add_listener(listener)
128125
return self
@@ -266,12 +263,10 @@ def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
266263
:param timestamp:
267264
Timestamp of the message, preferably as a Unix timestamp
268265
"""
269-
callbacks = self.subscribers.get(can_id)
270-
if callbacks is not None:
266+
if can_id in self.subscribers:
267+
callbacks = self.subscribers[can_id]
271268
for callback in callbacks:
272-
res = callback(can_id, data, timestamp)
273-
if res is not None and self.loop is not None and asyncio.iscoroutine(res):
274-
self.loop.create_task(res)
269+
callback(can_id, data, timestamp)
275270
self.scanner.on_message_received(can_id)
276271

277272
def check(self) -> None:

canopen/nmt.py

Lines changed: 10 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from typing import Callable, Optional, TYPE_CHECKING
77

88
from canopen.async_guard import ensure_not_async
9+
from canopen.utils import call_callbacks
910
import canopen.network
1011

1112
if TYPE_CHECKING:
@@ -121,22 +122,19 @@ def __init__(self, node_id: int):
121122
#: Timestamp of last heartbeat message
122123
self.timestamp: Optional[float] = None
123124
self.state_update = threading.Condition()
124-
self.astate_update = asyncio.Condition()
125125
self._callbacks = []
126126

127127
# @callback # NOTE: called from another thread
128128
@ensure_not_async # NOTE: Safeguard for accidental async use
129129
def on_heartbeat(self, can_id, data, timestamp):
130+
new_state, = struct.unpack_from("B", data)
131+
# Mask out toggle bit
132+
new_state &= 0x7F
133+
logger.debug("Received heartbeat can-id %d, state is %d", can_id, new_state)
134+
130135
# NOTE: Blocking lock
131136
with self.state_update:
132137
self.timestamp = timestamp
133-
new_state, = struct.unpack_from("B", data)
134-
# Mask out toggle bit
135-
new_state &= 0x7F
136-
logger.debug("Received heartbeat can-id %d, state is %d", can_id, new_state)
137-
for callback in self._callbacks:
138-
# FIXME: Assert if callback is coroutine?
139-
callback(new_state)
140138
if new_state == 0:
141139
# Boot-up, will go to PRE-OPERATIONAL automatically
142140
self._state = 127
@@ -145,25 +143,8 @@ def on_heartbeat(self, can_id, data, timestamp):
145143
self._state_received = new_state
146144
self.state_update.notify_all()
147145

148-
# @callback
149-
async def aon_heartbeat(self, can_id, data, timestamp):
150-
async with self.astate_update:
151-
self.timestamp = timestamp
152-
new_state, = struct.unpack_from("B", data)
153-
# Mask out toggle bit
154-
new_state &= 0x7F
155-
logger.debug("Received heartbeat can-id %d, state is %d", can_id, new_state)
156-
for callback in self._callbacks:
157-
res = callback(new_state)
158-
if res is not None and asyncio.iscoroutine(res):
159-
await res
160-
if new_state == 0:
161-
# Boot-up, will go to PRE-OPERATIONAL automatically
162-
self._state = 127
163-
else:
164-
self._state = new_state
165-
self._state_received = new_state
166-
self.astate_update.notify_all()
146+
# Call all registered callbacks
147+
call_callbacks(self._callbacks, self.network.loop, new_state)
167148

168149
def send_command(self, code: int):
169150
"""Send an NMT command code to the node.
@@ -190,13 +171,7 @@ def wait_for_heartbeat(self, timeout: float = 10):
190171

191172
async def await_for_heartbeat(self, timeout: float = 10):
192173
"""Wait until a heartbeat message is received."""
193-
async with self.astate_update:
194-
self._state_received = None
195-
try:
196-
await asyncio.wait_for(self.astate_update.wait(), timeout=timeout)
197-
except asyncio.TimeoutError:
198-
raise NmtError("No boot-up or heartbeat received")
199-
return self.state
174+
return await asyncio.to_thread(self.wait_for_heartbeat, timeout)
200175

201176
@ensure_not_async # NOTE: Safeguard for accidental async use
202177
def wait_for_bootup(self, timeout: float = 10) -> None:
@@ -216,17 +191,7 @@ def wait_for_bootup(self, timeout: float = 10) -> None:
216191

217192
async def await_for_bootup(self, timeout: float = 10) -> None:
218193
"""Wait until a boot-up message is received."""
219-
async def _wait_for_bootup():
220-
while True:
221-
async with self.astate_update:
222-
self._state_received = None
223-
await self.astate_update.wait()
224-
if self._state_received == 0:
225-
return
226-
try:
227-
await asyncio.wait_for(_wait_for_bootup(), timeout=timeout)
228-
except asyncio.TimeoutError:
229-
raise NmtError("Timeout waiting for boot-up message")
194+
return await asyncio.to_thread(self.wait_for_bootup, timeout)
230195

231196
def add_heartbeat_callback(self, callback: Callable[[int], None]):
232197
"""Add function to be called on heartbeat reception.

canopen/node/remote.py

Lines changed: 4 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -61,25 +61,17 @@ def associate_network(self, network: canopen.network.Network):
6161
self.nmt.network = network
6262
for sdo in self.sdo_channels:
6363
network.subscribe(sdo.tx_cobid, sdo.on_response)
64-
if network.is_async():
65-
network.subscribe(0x700 + self.id, self.nmt.aon_heartbeat)
66-
network.subscribe(0x80 + self.id, self.emcy.aon_emcy)
67-
else:
68-
network.subscribe(0x700 + self.id, self.nmt.on_heartbeat)
69-
network.subscribe(0x80 + self.id, self.emcy.on_emcy)
64+
network.subscribe(0x700 + self.id, self.nmt.on_heartbeat)
65+
network.subscribe(0x80 + self.id, self.emcy.on_emcy)
7066
network.subscribe(0, self.nmt.on_command)
7167

7268
def remove_network(self) -> None:
7369
if not self.has_network():
7470
return
7571
for sdo in self.sdo_channels:
7672
self.network.unsubscribe(sdo.tx_cobid, sdo.on_response)
77-
if self.network.is_async():
78-
self.network.unsubscribe(0x700 + self.id, self.nmt.aon_heartbeat)
79-
self.network.unsubscribe(0x80 + self.id, self.emcy.aon_emcy)
80-
else:
81-
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
82-
self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy)
73+
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
74+
self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy)
8375
self.network.unsubscribe(0, self.nmt.on_command)
8476
self.network = canopen.network._UNINITIALIZED_NETWORK
8577
self.sdo.network = canopen.network._UNINITIALIZED_NETWORK

canopen/pdo/base.py

Lines changed: 5 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from canopen import variable
1414
from canopen.async_guard import ensure_not_async
1515
from canopen.sdo import SdoAbortedError
16+
from canopen.utils import call_callbacks
1617

1718
if TYPE_CHECKING:
1819
from canopen import LocalNode, RemoteNode
@@ -221,7 +222,6 @@ def __init__(self, pdo_node, com_record, map_array):
221222
self.period: Optional[float] = None
222223
self.callbacks = []
223224
self.receive_condition = threading.Condition()
224-
self.areceive_condition = asyncio.Condition()
225225
self.is_received: bool = False
226226
self._task = None
227227

@@ -337,25 +337,9 @@ def on_message(self, can_id, data, timestamp):
337337
self.period = timestamp - self.timestamp
338338
self.timestamp = timestamp
339339
self.receive_condition.notify_all()
340-
for callback in self.callbacks:
341-
# FIXME: Assert on couroutines?
342-
callback(self)
343340

344-
# @callback
345-
async def aon_message(self, can_id, data, timestamp):
346-
is_transmitting = self._task is not None
347-
if can_id == self.cob_id and not is_transmitting:
348-
async with self.areceive_condition:
349-
self.is_received = True
350-
self.data = data
351-
if self.timestamp is not None:
352-
self.period = timestamp - self.timestamp
353-
self.timestamp = timestamp
354-
self.areceive_condition.notify_all()
355-
for callback in self.callbacks:
356-
res = callback(self)
357-
if res is not None and asyncio.iscoroutine(res):
358-
await res
341+
# Call all registered callbacks
342+
call_callbacks(self.callbacks, self.pdo_node.network.loop, self)
359343

360344
def add_callback(self, callback: Callable[[PdoMap], None]) -> None:
361345
"""Add a callback which will be called on receive.
@@ -570,10 +554,7 @@ def subscribe(self) -> None:
570554
"""
571555
if self.enabled:
572556
logger.info("Subscribing to enabled PDO 0x%X on the network", self.cob_id)
573-
if self.pdo_node.network.is_async():
574-
self.pdo_node.network.subscribe(self.cob_id, self.aon_message)
575-
else:
576-
self.pdo_node.network.subscribe(self.cob_id, self.on_message)
557+
self.pdo_node.network.subscribe(self.cob_id, self.on_message)
577558

578559
def clear(self) -> None:
579560
"""Clear all variables from this map."""
@@ -681,14 +662,7 @@ async def await_for_reception(self, timeout: float = 10) -> float:
681662
:param float timeout: Max time to wait in seconds.
682663
:return: Timestamp of message received or None if timeout.
683664
"""
684-
async with self.areceive_condition:
685-
self.is_received = False
686-
try:
687-
await asyncio.wait_for(self.areceive_condition.wait(), timeout=timeout)
688-
# FIXME: Can we assume that self.is_received it set here?
689-
return self.timestamp
690-
except asyncio.TimeoutError:
691-
return None
665+
await asyncio.to_thread(self.wait_for_reception, timeout)
692666

693667

694668
class PdoVariable(variable.Variable):

canopen/sdo/client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def __init__(self, rx_cobid, tx_cobid, od):
4848

4949
# @callback # NOTE: called from another thread
5050
def on_response(self, can_id, data, timestamp):
51-
self.responses.put_nowait(bytes(data))
51+
self.responses.put(bytes(data))
5252

5353
@ensure_not_async # NOTE: Safeguard for accidental async use
5454
def send_request(self, request):

canopen/utils.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
"""Additional utility functions for canopen."""
22

3-
from typing import Optional, Union
3+
import asyncio
4+
from typing import Optional, Union, Iterable, Callable
45

56

67
def pretty_index(index: Optional[Union[int, str]],
@@ -21,3 +22,24 @@ def pretty_index(index: Optional[Union[int, str]],
2122
sub_str = f"{sub!r}"
2223

2324
return ":".join(s for s in (index_str, sub_str) if s)
25+
26+
27+
def call_callbacks(callbacks: Iterable[Callable], loop: asyncio.AbstractEventLoop | None = None, *args, **kwargs) -> bool:
28+
"""Call a list of callbacks with the given arguments.
29+
30+
"""
31+
32+
def dispatch():
33+
for callback in callbacks:
34+
result = callback(*args, **kwargs)
35+
if result is not None and asyncio.iscoroutine(result):
36+
asyncio.create_task(result)
37+
38+
# If the loop is running, call the callbacks from the loop to minimize
39+
# blocking and multithreading issues.
40+
if loop is not None and loop.is_running():
41+
loop.call_soon_threadsafe(dispatch)
42+
return False
43+
else:
44+
dispatch()
45+
return True

0 commit comments

Comments
 (0)