Skip to content

Commit afd9f5c

Browse files
committed
Added more support for async
1 parent 3b5f869 commit afd9f5c

File tree

14 files changed

+127
-83
lines changed

14 files changed

+127
-83
lines changed

canopen/emcy.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,15 +23,15 @@ def __init__(self):
2323
#: Only active EMCYs. Will be cleared on Error Reset
2424
self.active: List["EmcyError"] = []
2525
self.callbacks = []
26-
self.emcy_received = threading.Condition() # FIXME Async
26+
self.emcy_received = threading.Condition()
2727
self.aemcy_received = asyncio.Condition()
2828

2929
def on_emcy(self, can_id, data, timestamp):
30-
# NOTE: Callback. Will be called from another thread
30+
# NOTE: Callback. Called from another thread unless async
3131
code, register, data = EMCY_STRUCT.unpack(data)
3232
entry = EmcyError(code, register, data, timestamp)
3333

34-
with self.emcy_received: # FIXME: Blocking
34+
with self.emcy_received: # NOTE: Blocking call
3535
if code & 0xFF00 == 0:
3636
# Error reset
3737
self.active = []
@@ -40,9 +40,8 @@ def on_emcy(self, can_id, data, timestamp):
4040
self.log.append(entry)
4141
self.emcy_received.notify_all()
4242

43-
# NOTE: Will be called from another thread
4443
for callback in self.callbacks:
45-
callback(entry) # FIXME: Assert if coroutine?
44+
callback(entry) # FIXME: Assert if callback is coroutine?
4645

4746
async def aon_emcy(self, can_id, data, timestamp):
4847
code, register, data = EMCY_STRUCT.unpack(data)
@@ -89,9 +88,9 @@ def wait(
8988
"""
9089
end_time = time.time() + timeout
9190
while True:
92-
with self.emcy_received: # FIXME: Blocking
91+
with self.emcy_received: # NOTE: Blocking call
9392
prev_log_size = len(self.log)
94-
self.emcy_received.wait(timeout) # FIXME: Blocking
93+
self.emcy_received.wait(timeout) # NOTE: Blocking call
9594
if len(self.log) == prev_log_size:
9695
# Resumed due to timeout
9796
return None

canopen/lss.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def __init__(self):
9191
self.network: Optional[Network] = None
9292
self._node_id = 0
9393
self._data = None
94-
self.responses = queue.Queue() # FIXME Async
94+
self.responses = queue.Queue()
9595
self.aresponses = asyncio.Queue()
9696

9797
def send_switch_state_global(self, mode):
@@ -264,7 +264,7 @@ def fast_scan(self):
264264
lss_next = 0
265265

266266
if self.__send_fast_scan_message(lss_id[0], lss_bit_check, lss_sub, lss_next):
267-
time.sleep(0.01)
267+
time.sleep(0.01) # NOTE: Blocking call
268268
while lss_sub < 4:
269269
lss_bit_check = 32
270270
while lss_bit_check > 0:
@@ -273,13 +273,13 @@ def fast_scan(self):
273273
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
274274
lss_id[lss_sub] |= 1<<lss_bit_check
275275

276-
time.sleep(0.01)
276+
time.sleep(0.01) # NOTE: Blocking call
277277

278278
lss_next = (lss_sub + 1) & 3
279279
if not self.__send_fast_scan_message(lss_id[lss_sub], lss_bit_check, lss_sub, lss_next):
280280
return False, None
281281

282-
time.sleep(0.01)
282+
time.sleep(0.01) # NOTE: Blocking
283283

284284
# Now the next 32 bits will be scanned
285285
lss_sub += 1
@@ -311,7 +311,7 @@ def __send_lss_address(self, req_cs, number):
311311
response = self.__send_command(message)
312312
# some device needs these delays between messages
313313
# because it can't handle messages arriving with no delay
314-
time.sleep(0.2)
314+
time.sleep(0.2) # NOTE: Blocking call
315315

316316
return response
317317

@@ -385,7 +385,7 @@ def __send_command(self, message):
385385
response = None
386386
if not self.responses.empty():
387387
logger.info("There were unexpected messages in the queue")
388-
self.responses = queue.Queue() # FIXME Async
388+
self.responses = queue.Queue() # FIXME: Recreating the queue. Async too?
389389

390390
self.network.send_message(self.LSS_TX_COBID, message)
391391

@@ -395,16 +395,16 @@ def __send_command(self, message):
395395
# Wait for the slave to respond
396396
# TODO check if the response is LSS response message
397397
try:
398-
response = self.responses.get( # FIXME: Blocking
398+
response = self.responses.get( # NOTE: Blocking call
399399
block=True, timeout=self.RESPONSE_TIMEOUT)
400400
except queue.Empty:
401401
raise LssError("No LSS response received")
402402

403403
return response
404404

405405
def on_message_received(self, can_id, data, timestamp):
406-
# NOTE: Callback. Will be called from another thread
407-
self.responses.put(bytes(data)) # FIXME: Blocking
406+
# NOTE: Callback. Called from another thread
407+
self.responses.put(bytes(data)) # NOTE: Blocking call
408408

409409
async def aon_message_received(self, can_id, data, timestamp):
410410
await self.aresponses.put(bytes(data))

canopen/network.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def __init__(
5959
self.notifier: Optional[Notifier] = None
6060
self.nodes: Dict[int, Union[RemoteNode, LocalNode]] = {}
6161
self.subscribers: Dict[int, List[Callback]] = {}
62-
self.send_lock = threading.Lock() # FIXME Async
62+
self.send_lock = threading.Lock()
6363
self.sync = SyncProducer(self)
6464
self.time = TimeProducer(self)
6565
self.nmt = NmtMaster(0)
@@ -232,9 +232,9 @@ def send_message(self, can_id: int, data: bytes, remote: bool = False) -> None:
232232
arbitration_id=can_id,
233233
data=data,
234234
is_remote_frame=remote)
235-
# NOTE: This lock is ok for async, because ther is only one thread
236-
# calling this function when using async, so it'll never lock.
237-
with self.send_lock: # FIXME: Blocking
235+
# NOTE: Blocking lock. This is probably ok for async, because async
236+
# only use one thread.
237+
with self.send_lock:
238238
self.bus.send(msg)
239239
self.check()
240240

@@ -270,7 +270,7 @@ def notify(self, can_id: int, data: bytearray, timestamp: float) -> None:
270270
:param timestamp:
271271
Timestamp of the message, preferably as a Unix timestamp
272272
"""
273-
# NOTE: Callback. Will be called from another thread
273+
# NOTE: Callback. Called from another thread unless async
274274
callbacks = self.subscribers.get(can_id)
275275
if callbacks is not None:
276276
for callback in callbacks:
@@ -291,6 +291,10 @@ def check(self) -> None:
291291
logger.error("An error has caused receiving of messages to stop")
292292
raise exc
293293

294+
def is_async(self) -> bool:
295+
"""Check if canopen has been connected with async"""
296+
return self.loop is not None
297+
294298
def __getitem__(self, node_id: int) -> Union[RemoteNode, LocalNode]:
295299
return self.nodes[node_id]
296300

@@ -355,6 +359,8 @@ def update(self, data: bytes) -> None:
355359
:param data:
356360
New data to transmit
357361
"""
362+
# NOTE: Called from callback, which is another thread on non-async use.
363+
# Make sure this is thread-safe.
358364
new_data = bytearray(data)
359365
old_data = self.msg.data
360366
self.msg.data = new_data
@@ -377,7 +383,7 @@ def __init__(self, network: Network):
377383
self.network = network
378384

379385
def on_message_received(self, msg):
380-
# NOTE: Callback. Will be called from another thread
386+
# NOTE: Callback. Called from another thread unless async
381387
if msg.is_error_frame or msg.is_remote_frame:
382388
return
383389

@@ -412,11 +418,14 @@ def __init__(self, network: Optional[Network] = None):
412418
self.nodes: List[int] = []
413419

414420
def on_message_received(self, can_id: int):
415-
# NOTE: Callback. Will be called from another thread
421+
# NOTE: Callback. Called from another thread unless async
416422
service = can_id & 0x780
417423
node_id = can_id & 0x7F
418424
if node_id not in self.nodes and node_id != 0 and service in self.SERVICES:
419-
# NOTE: Assume this is thread-safe
425+
# NOTE: In the current CPython implementation append on lists are
426+
# atomic which makes this thread-safe. However, other py
427+
# interpreters might not. It should be considered if a better
428+
# mechanism is needed to protect against race.
420429
self.nodes.append(node_id)
421430

422431
def reset(self):

canopen/nmt.py

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ def __init__(self, node_id: int):
5757
self._state = 0
5858

5959
def on_command(self, can_id, data, timestamp):
60-
# NOTE: Callback. Will be called from another thread
60+
# NOTE: Callback. Called from another thread unless async
6161
cmd, node_id = struct.unpack_from("BB", data)
6262
if node_id in (self.id, 0):
6363
logger.info("Node %d received command %d", self.id, cmd)
@@ -66,7 +66,7 @@ def on_command(self, can_id, data, timestamp):
6666
if new_state != self._state:
6767
logger.info("New NMT state %s, old state %s",
6868
NMT_STATES[new_state], NMT_STATES[self._state])
69-
# NOTE: Assume thread-safe
69+
# FIXME: Is this thread-safe?
7070
self._state = new_state
7171

7272
def send_command(self, code: int):
@@ -125,20 +125,20 @@ def __init__(self, node_id: int):
125125
self._node_guarding_producer = None
126126
#: Timestamp of last heartbeat message
127127
self.timestamp: Optional[float] = None
128-
self.state_update = threading.Condition() # FIXME
128+
self.state_update = threading.Condition()
129129
self.astate_update = asyncio.Condition()
130130
self._callbacks = []
131131

132132
def on_heartbeat(self, can_id, data, timestamp):
133-
# NOTE: Callback. Will be called from another thread
134-
with self.state_update: # FIXME: Blocking
133+
# NOTE: Callback. Called from another thread unless async
134+
with self.state_update: # NOTE: Blocking call
135135
self.timestamp = timestamp
136136
new_state, = struct.unpack_from("B", data)
137137
# Mask out toggle bit
138138
new_state &= 0x7F
139139
logger.debug("Received heartbeat can-id %d, state is %d", can_id, new_state)
140140
for callback in self._callbacks:
141-
callback(new_state) # FIXME: Assert on coroutines?
141+
callback(new_state) # FIXME: Assert if callback is coroutine?
142142
if new_state == 0:
143143
# Boot-up, will go to PRE-OPERATIONAL automatically
144144
self._state = 127
@@ -179,9 +179,9 @@ def send_command(self, code: int):
179179

180180
def wait_for_heartbeat(self, timeout: float = 10):
181181
"""Wait until a heartbeat message is received."""
182-
with self.state_update: # FIXME: Blocking
182+
with self.state_update: # NOTE: Blocking call
183183
self._state_received = None
184-
self.state_update.wait(timeout) # FIXME: Blocking
184+
self.state_update.wait(timeout) # NOTE: Blocking call
185185
if self._state_received is None:
186186
raise NmtError("No boot-up or heartbeat received")
187187
return self.state
@@ -191,9 +191,9 @@ def wait_for_bootup(self, timeout: float = 10) -> None:
191191
end_time = time.time() + timeout
192192
while True:
193193
now = time.time()
194-
with self.state_update: # FIXME: Blocking
194+
with self.state_update: # NOTE: Blocking call
195195
self._state_received = None
196-
self.state_update.wait(end_time - now + 0.1) # FIXME: Blocking
196+
self.state_update.wait(end_time - now + 0.1) # NOTE: Blocking call
197197
if now > end_time:
198198
raise NmtError("Timeout waiting for boot-up message")
199199
if self._state_received == 0:
@@ -235,7 +235,7 @@ def __init__(self, node_id: int, local_node):
235235
self._local_node = local_node
236236

237237
def on_command(self, can_id, data, timestamp):
238-
# NOTE: Callback. Will be called from another thread
238+
# NOTE: Callback. Called from another thread unless async
239239
super(NmtSlave, self).on_command(can_id, data, timestamp)
240240
self.update_heartbeat()
241241

@@ -291,9 +291,9 @@ def stop_heartbeat(self):
291291
self._send_task = None
292292

293293
def update_heartbeat(self):
294-
# NOTE: Called from callback. Might be called from another thread
294+
# NOTE: Called from callback. Called from another thread unless async
295295
if self._send_task is not None:
296-
# FIXME: Check if network.PeriodicMessageTask() is thread-safe
296+
# FIXME: Make this thread-safe
297297
self._send_task.update([self._state])
298298

299299

canopen/node/local.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,21 @@ def associate_network(self, network: Network):
4444
self.rpdo.network = network
4545
self.nmt.network = network
4646
self.emcy.network = network
47-
network.subscribe(self.sdo.rx_cobid, self.sdo.on_request) # FIXME: Async CB
48-
network.subscribe(0, self.nmt.on_command) # FIXME: Async CB
47+
if network.is_async():
48+
network.subscribe(self.sdo.rx_cobid, self.sdo.aon_request)
49+
network.subscribe(0, self.nmt.aon_command)
50+
else:
51+
network.subscribe(self.sdo.rx_cobid, self.sdo.on_request)
52+
network.subscribe(0, self.nmt.on_command)
4953

5054
def remove_network(self):
51-
self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request) # FIXME: Async CB
52-
self.network.unsubscribe(0, self.nmt.on_command) # FIXME: Async CB
55+
network = self.network
56+
if network.is_async():
57+
network.unsubscribe(self.sdo.rx_cobid, self.sdo.aon_request)
58+
network.unsubscribe(0, self.nmt.aon_command)
59+
else:
60+
network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request)
61+
network.unsubscribe(0, self.nmt.on_command)
5362
self.network = None
5463
self.sdo.network = None
5564
self.tpdo.network = None

canopen/node/remote.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ def associate_network(self, network: Network):
6161
self.tpdo.network = network
6262
self.rpdo.network = network
6363
self.nmt.network = network
64-
if network.loop:
64+
if network.is_async():
6565
for sdo in self.sdo_channels:
6666
network.subscribe(sdo.tx_cobid, sdo.aon_response)
6767
network.subscribe(0x700 + self.id, self.nmt.aon_heartbeat)
@@ -74,12 +74,18 @@ def associate_network(self, network: Network):
7474
network.subscribe(0, self.nmt.on_command)
7575

7676
def remove_network(self):
77-
# FIXME: Usubscribe async CB
78-
for sdo in self.sdo_channels:
79-
self.network.unsubscribe(sdo.tx_cobid, sdo.on_response)
80-
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
81-
self.network.unsubscribe(0x80 + self.id, self.emcy.on_emcy)
82-
self.network.unsubscribe(0, self.nmt.on_command)
77+
network = self.network
78+
if network.is_async():
79+
for sdo in self.sdo_channels:
80+
network.unsubscribe(sdo.tx_cobid, sdo.aon_response)
81+
network.unsubscribe(0x700 + self.id, self.nmt.aon_heartbeat)
82+
network.unsubscribe(0x80 + self.id, self.emcy.aon_emcy)
83+
else:
84+
for sdo in self.sdo_channels:
85+
network.unsubscribe(sdo.tx_cobid, sdo.on_response)
86+
network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
87+
network.unsubscribe(0x80 + self.id, self.emcy.on_emcy)
88+
network.unsubscribe(0, self.nmt.on_command)
8389
self.network = None
8490
self.sdo.network = None
8591
self.pdo.network = None

canopen/objectdictionary/eds.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,11 +172,12 @@ def import_from_node(node_id, network):
172172
:param int node_id: Identifier of the node
173173
:param network: network object
174174
"""
175+
# FIXME: Implement async variant
175176
# Create temporary SDO client
176177
sdo_client = SdoClient(0x600 + node_id, 0x580 + node_id, objectdictionary.ObjectDictionary())
177178
sdo_client.network = network
178179
# Subscribe to SDO responses
179-
network.subscribe(0x580 + node_id, sdo_client.on_response) # FIXME: Async CB
180+
network.subscribe(0x580 + node_id, sdo_client.on_response)
180181
# Create file like object for Store EDS variable
181182
try:
182183
eds_fp = sdo_client.open(0x1021, 0, "rt")

canopen/pdo/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -308,7 +308,7 @@ def is_periodic(self) -> bool:
308308
return False
309309

310310
def on_message(self, can_id, data, timestamp):
311-
# NOTE: Callback. Will be called from another thread
311+
# NOTE: Callback. Called from another thread unless async
312312
is_transmitting = self._task is not None
313313
if can_id == self.cob_id and not is_transmitting:
314314
with self.receive_condition: # FIXME: Blocking

canopen/profiles/p402.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -462,10 +462,10 @@ def on_TPDOs_update_callback(self, mapobject):
462462
:param mapobject: The received PDO message.
463463
:type mapobject: canopen.pdo.Map
464464
"""
465-
# NOTE: Callback. Will be called from another thread
465+
# NOTE: Callback. Called from another thread unless async
466466
for obj in mapobject:
467-
# NOTE: Assume thread-safe set without locking
468-
self.tpdo_values[obj.index] = obj.get_raw()
467+
# FIXME: Is this thread-safe?
468+
self.tpdo_values[obj.index] = obj.get_raw() # FIXME: Blocking?
469469

470470
@property
471471
def statusword(self):

0 commit comments

Comments
 (0)