|
1 | 1 | # inspired by the NmtMaster code
|
2 | 2 | import logging
|
3 | 3 | import time
|
| 4 | +from typing import Dict |
4 | 5 |
|
5 | 6 | from canopen.node import RemoteNode
|
| 7 | +from canopen.pdo import PdoMap |
6 | 8 | from canopen.sdo import SdoCommunicationError
|
7 | 9 |
|
8 | 10 | logger = logging.getLogger(__name__)
|
@@ -212,8 +214,8 @@ class BaseNode402(RemoteNode):
|
212 | 214 | def __init__(self, node_id, object_dictionary):
|
213 | 215 | super(BaseNode402, self).__init__(node_id, object_dictionary)
|
214 | 216 | self.tpdo_values = {} # { index: value from last received TPDO }
|
215 |
| - self.tpdo_pointers = {} # { index: pdo.PdoMap instance } |
216 |
| - self.rpdo_pointers = {} # { index: pdo.PdoMap instance } |
| 217 | + self.tpdo_pointers: Dict[int, PdoMap] = {} |
| 218 | + self.rpdo_pointers: Dict[int, PdoMap] = {} |
217 | 219 |
|
218 | 220 | def setup_402_state_machine(self, read_pdos=True):
|
219 | 221 | """Configure the state machine by searching for a TPDO that has the StatusWord mapped.
|
@@ -453,11 +455,10 @@ def is_op_mode_supported(self, mode):
|
453 | 455 | bits = OperationMode.SUPPORTED[mode]
|
454 | 456 | return self._op_mode_support & bits == bits
|
455 | 457 |
|
456 |
| - def on_TPDOs_update_callback(self, mapobject): |
| 458 | + def on_TPDOs_update_callback(self, mapobject: PdoMap): |
457 | 459 | """Cache updated values from a TPDO received from this node.
|
458 | 460 |
|
459 | 461 | :param mapobject: The received PDO message.
|
460 |
| - :type mapobject: canopen.pdo.PdoMap |
461 | 462 | """
|
462 | 463 | for obj in mapobject:
|
463 | 464 | self.tpdo_values[obj.index] = obj.raw
|
|
0 commit comments