Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions canopen/network.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ def unsubscribe(self, can_id, callback=None) -> None:
del self.subscribers[can_id]
else:
self.subscribers[can_id].remove(callback)
# Remove empty list entry
if not self.subscribers[can_id]:
del self.subscribers[can_id]
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant something like this instead, with a single del code-path. I find that easier to reason about:

        if callback is not None:
            self.subscribers[can_id].remove(callback)
        if not self.subscribers[can_id] or callback is None:
            del self.subscribers[can_id]

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, two lines shorter.


def connect(self, *args, **kwargs) -> Network:
"""Connect to CAN bus using python-can.
Expand Down
4 changes: 4 additions & 0 deletions canopen/node/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ def __init__(
self.emcy = EmcyProducer(0x80 + self.id)

def associate_network(self, network: canopen.network.Network):
if self.has_network():
raise RuntimeError("Node is already associated with a network")
self.network = network
self.sdo.network = network
self.tpdo.network = network
Expand All @@ -49,6 +51,8 @@ def associate_network(self, network: canopen.network.Network):
network.subscribe(0, self.nmt.on_command)

def remove_network(self) -> None:
if not self.has_network():
return
self.network.unsubscribe(self.sdo.rx_cobid, self.sdo.on_request)
self.network.unsubscribe(0, self.nmt.on_command)
self.network = canopen.network._UNINITIALIZED_NETWORK
Expand Down
4 changes: 4 additions & 0 deletions canopen/node/remote.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ def __init__(
self.load_configuration()

def associate_network(self, network: canopen.network.Network):
if self.has_network():
raise RuntimeError("Node is already associated with a network")
self.network = network
self.sdo.network = network
self.pdo.network = network
Expand All @@ -64,6 +66,8 @@ def associate_network(self, network: canopen.network.Network):
network.subscribe(0, self.nmt.on_command)

def remove_network(self) -> None:
if not self.has_network():
return
for sdo in self.sdo_channels:
self.network.unsubscribe(sdo.tx_cobid, sdo.on_response)
self.network.unsubscribe(0x700 + self.id, self.nmt.on_heartbeat)
Expand Down
113 changes: 113 additions & 0 deletions test/test_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
"""Unit tests for the RemoteNode and LocalNode classes."""
Copy link
Copy Markdown
Member

@acolomb acolomb Apr 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not really fond of having comments just for the sake of having a comment. None of our test files so far have a module-level docstring. There is no generated API documentation for them. And import unittest says just as much as this comment, while the enumeration of classes is redundant and bound to get out of sync sometime.

In essence, don't feel obliged to write docstrings that add no value. Or try to come up with a really good one that does so.

What IS the common denominator for stuff that will be grouped in this test file? So far it is only network association, what else do you have in mind?

Suggested change
"""Unit tests for the RemoteNode and LocalNode classes."""

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What IS the common denominator for stuff that will be grouped in this test file? So far it is only network association, what else do you have in mind?

Eventually we'll have more unit tests of the methods in these two classes, no?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe, that's why I'm asking if you have any concrete ideas what will be added?

import unittest

import canopen
import canopen.network
import canopen.objectdictionary
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These seem to be not needed.

Suggested change
import canopen.network
import canopen.objectdictionary

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are. Some linters, like pyright, is complaining when they are lacking. The reason is that they are not referenced in the __init__.py exports, so they need to be explicitly imported.

image

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand. How come the code works without them though?



def count_subscribers(network: canopen.Network) -> int:
"""Count the number of subscribers in the network."""
return sum(
len(n) for n in network.subscribers.values()
)


class TestLocalNode(unittest.TestCase):
"""Unit tests for the LocalNode class."""

@classmethod
def setUpClass(cls):
cls.network = canopen.Network()
cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
cls.network.connect(interface="virtual")

cls.node = canopen.LocalNode(2, canopen.objectdictionary.ObjectDictionary())

@classmethod
def tearDownClass(cls):
cls.network.disconnect()

def test_associate_network(self):

# Need to store the number of subscribers before associating because the
# network implementation automatically adds subscribers to the list
n_subscribers = count_subscribers(self.network)

# Associating the network with the local node
self.node.associate_network(self.network)
self.assertIs(self.node.network, self.network)
self.assertIs(self.node.sdo.network, self.network)
self.assertIs(self.node.tpdo.network, self.network)
self.assertIs(self.node.rpdo.network, self.network)
self.assertIs(self.node.nmt.network, self.network)
self.assertIs(self.node.emcy.network, self.network)

# Test that its not possible to associate the network multiple times
with self.assertRaises(RuntimeError) as cm:
self.node.associate_network(self.network)
self.assertIn("already associated with a network", str(cm.exception))

# Test removal of the network. The count of subscribers should
# be the same as before the association
self.node.remove_network()
uninitalized = canopen.network._UNINITIALIZED_NETWORK
self.assertIs(self.node.network, uninitalized)
self.assertIs(self.node.sdo.network, uninitalized)
self.assertIs(self.node.tpdo.network, uninitalized)
self.assertIs(self.node.rpdo.network, uninitalized)
self.assertIs(self.node.nmt.network, uninitalized)
self.assertIs(self.node.emcy.network, uninitalized)
self.assertEqual(count_subscribers(self.network), n_subscribers)

# Test that its possible to deassociate the network multiple times
self.node.remove_network()


class TestRemoteNode(unittest.TestCase):
"""Unittests for the RemoteNode class."""

@classmethod
def setUpClass(cls):
cls.network = canopen.Network()
cls.network.NOTIFIER_SHUTDOWN_TIMEOUT = 0.0
cls.network.connect(interface="virtual")

cls.node = canopen.RemoteNode(2, canopen.objectdictionary.ObjectDictionary())

@classmethod
def tearDownClass(cls):
cls.network.disconnect()

def test_associate_network(self):

# Need to store the number of subscribers before associating because the
# network implementation automatically adds subscribers to the list
n_subscribers = count_subscribers(self.network)

# Associating the network with the local node
self.node.associate_network(self.network)
self.assertIs(self.node.network, self.network)
self.assertIs(self.node.sdo.network, self.network)
self.assertIs(self.node.tpdo.network, self.network)
self.assertIs(self.node.rpdo.network, self.network)
self.assertIs(self.node.nmt.network, self.network)

# Test that its not possible to associate the network multiple times
with self.assertRaises(RuntimeError) as cm:
self.node.associate_network(self.network)
self.assertIn("already associated with a network", str(cm.exception))

# Test removal of the network. The count of subscribers should
# be the same as before the association
self.node.remove_network()
uninitalized = canopen.network._UNINITIALIZED_NETWORK
self.assertIs(self.node.network, uninitalized)
self.assertIs(self.node.sdo.network, uninitalized)
self.assertIs(self.node.tpdo.network, uninitalized)
self.assertIs(self.node.rpdo.network, uninitalized)
self.assertIs(self.node.nmt.network, uninitalized)
self.assertEqual(count_subscribers(self.network), n_subscribers)

# Test that its possible to deassociate the network multiple times
self.node.remove_network()