Skip to content

Commit 5547413

Browse files
zariiii9003pkess
andauthored
Add CAN bridge script (#1961)
* Add basic implementation for can bridge * Implement basic configuration parsing * Add implementation for bridge * Improve exit handling * Add debug logging * Add error handling for wrong arguments * Use stderr * Add custom usage * Add bus configuration info * Code format * Add exception for prints for can_bridge * Add from to exception in exception * Remove assignment to unused variable * Shorten line length * Organize imports * Remove unnecessary else * Add documentation for new script * Add handling for -h and help sub command Necessary for generation of documentation * Add from none to exception * Fix typo busses to bus * Add type annotations * Fix type annotations * Fix type annotations again * Add --help to get help * Add basic print help test * Add basic test file for bridge script * Add very basic test * Add different channels for virtual bus * Add assert for call to exit * Patch correct function * test * fjkdf * once again -.- * Try snakecase * use new api to create cli args for bus1 and bus2 --------- Co-authored-by: Peter Kessen <[email protected]>
1 parent 7bc904e commit 5547413

File tree

5 files changed

+217
-0
lines changed

5 files changed

+217
-0
lines changed

can/bridge.py

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
"""
2+
Creates a bridge between two CAN buses.
3+
4+
This will connect to two CAN buses. Messages received on one
5+
bus will be sent to the other bus and vice versa.
6+
"""
7+
8+
import argparse
9+
import errno
10+
import sys
11+
import time
12+
from datetime import datetime
13+
from typing import Final
14+
15+
from can.cli import add_bus_arguments, create_bus_from_namespace
16+
from can.listener import RedirectReader
17+
from can.notifier import Notifier
18+
19+
BRIDGE_DESCRIPTION: Final = """\
20+
Bridge two CAN buses.
21+
22+
Both can buses will be connected so that messages from bus1 will be sent on
23+
bus2 and messages from bus2 will be sent to bus1.
24+
"""
25+
BUS_1_PREFIX: Final = "bus1"
26+
BUS_2_PREFIX: Final = "bus2"
27+
28+
29+
def _parse_bridge_args(args: list[str]) -> argparse.Namespace:
30+
"""Parse command line arguments for bridge script."""
31+
32+
parser = argparse.ArgumentParser(description=BRIDGE_DESCRIPTION)
33+
add_bus_arguments(parser, prefix=BUS_1_PREFIX, group_title="Bus 1 arguments")
34+
add_bus_arguments(parser, prefix=BUS_2_PREFIX, group_title="Bus 2 arguments")
35+
36+
# print help message when no arguments were given
37+
if not args:
38+
parser.print_help(sys.stderr)
39+
raise SystemExit(errno.EINVAL)
40+
41+
results, _unknown_args = parser.parse_known_args(args)
42+
return results
43+
44+
45+
def main() -> None:
46+
results = _parse_bridge_args(sys.argv[1:])
47+
48+
with (
49+
create_bus_from_namespace(results, prefix=BUS_1_PREFIX) as bus1,
50+
create_bus_from_namespace(results, prefix=BUS_2_PREFIX) as bus2,
51+
):
52+
reader1_to_2 = RedirectReader(bus2)
53+
reader2_to_1 = RedirectReader(bus1)
54+
with Notifier(bus1, [reader1_to_2]), Notifier(bus2, [reader2_to_1]):
55+
print(f"CAN Bridge (Started on {datetime.now()})")
56+
try:
57+
while True:
58+
time.sleep(1)
59+
except KeyboardInterrupt:
60+
pass
61+
62+
print(f"CAN Bridge (Stopped on {datetime.now()})")
63+
64+
65+
if __name__ == "__main__":
66+
main()

doc/scripts.rst

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,15 @@ The full usage page can be seen below:
5757
:shell:
5858

5959

60+
can.bridge
61+
----------
62+
63+
A small application that can be used to connect two can buses:
64+
65+
.. command-output:: python -m can.bridge -h
66+
:shell:
67+
68+
6069
can.logconvert
6170
--------------
6271

pyproject.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ can_logconvert = "can.logconvert:main"
4949
can_logger = "can.logger:main"
5050
can_player = "can.player:main"
5151
can_viewer = "can.viewer:main"
52+
can_bridge = "can.bridge:main"
5253

5354
[project.urls]
5455
homepage = "https://github.com/hardbyte/python-can"
@@ -186,6 +187,7 @@ ignore = [
186187
"can/cli.py" = ["T20"] # flake8-print
187188
"can/logger.py" = ["T20"] # flake8-print
188189
"can/player.py" = ["T20"] # flake8-print
190+
"can/bridge.py" = ["T20"] # flake8-print
189191
"can/viewer.py" = ["T20"] # flake8-print
190192
"examples/*" = ["T20"] # flake8-print
191193

test/test_bridge.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#!/usr/bin/env python
2+
3+
"""
4+
This module tests the functions inside of bridge.py
5+
"""
6+
7+
import random
8+
import string
9+
import sys
10+
import threading
11+
import time
12+
from time import sleep as real_sleep
13+
import unittest.mock
14+
15+
import can
16+
import can.bridge
17+
from can.interfaces import virtual
18+
19+
from .message_helper import ComparingMessagesTestCase
20+
21+
22+
class TestBridgeScriptModule(unittest.TestCase, ComparingMessagesTestCase):
23+
24+
TIMEOUT = 3.0
25+
26+
def __init__(self, *args, **kwargs):
27+
unittest.TestCase.__init__(self, *args, **kwargs)
28+
ComparingMessagesTestCase.__init__(
29+
self,
30+
allowed_timestamp_delta=None,
31+
preserves_channel=False,
32+
)
33+
34+
def setUp(self) -> None:
35+
self.stop_event = threading.Event()
36+
37+
self.channel1 = "".join(random.choices(string.ascii_letters, k=8))
38+
self.channel2 = "".join(random.choices(string.ascii_letters, k=8))
39+
40+
self.cli_args = [
41+
"--bus1-interface",
42+
"virtual",
43+
"--bus1-channel",
44+
self.channel1,
45+
"--bus2-interface",
46+
"virtual",
47+
"--bus2-channel",
48+
self.channel2,
49+
]
50+
51+
self.testmsg = can.Message(
52+
arbitration_id=0xC0FFEE, data=[0, 25, 0, 1, 3, 1, 4, 1], is_extended_id=True
53+
)
54+
55+
def fake_sleep(self, duration):
56+
"""A fake replacement for time.sleep that checks periodically
57+
whether self.stop_event is set, and raises KeyboardInterrupt
58+
if so.
59+
60+
This allows tests to simulate an interrupt (like Ctrl+C)
61+
during long sleeps, in a controlled and responsive way.
62+
"""
63+
interval = 0.05 # Small interval for responsiveness
64+
t_wakeup = time.perf_counter() + duration
65+
while time.perf_counter() < t_wakeup:
66+
if self.stop_event.is_set():
67+
raise KeyboardInterrupt("Simulated interrupt from fake_sleep")
68+
real_sleep(interval)
69+
70+
def test_bridge(self):
71+
with (
72+
unittest.mock.patch("can.bridge.time.sleep", new=self.fake_sleep),
73+
unittest.mock.patch("can.bridge.sys.argv", [sys.argv[0], *self.cli_args]),
74+
):
75+
# start script
76+
thread = threading.Thread(target=can.bridge.main)
77+
thread.start()
78+
79+
# wait until script instantiates virtual buses
80+
t0 = time.perf_counter()
81+
while True:
82+
with virtual.channels_lock:
83+
if (
84+
self.channel1 in virtual.channels
85+
and self.channel2 in virtual.channels
86+
):
87+
break
88+
if time.perf_counter() > t0 + 2.0:
89+
raise TimeoutError("Bridge script did not create virtual buses")
90+
real_sleep(0.2)
91+
92+
# create buses with the same channels as in scripts
93+
with (
94+
can.interfaces.virtual.VirtualBus(self.channel1) as bus1,
95+
can.interfaces.virtual.VirtualBus(self.channel2) as bus2,
96+
):
97+
# send test message to bus1, it should be received on bus2
98+
bus1.send(self.testmsg)
99+
recv_msg = bus2.recv(self.TIMEOUT)
100+
self.assertMessageEqual(self.testmsg, recv_msg)
101+
102+
# assert that both buses are empty
103+
self.assertIsNone(bus1.recv(0))
104+
self.assertIsNone(bus2.recv(0))
105+
106+
# send test message to bus2, it should be received on bus1
107+
bus2.send(self.testmsg)
108+
recv_msg = bus1.recv(self.TIMEOUT)
109+
self.assertMessageEqual(self.testmsg, recv_msg)
110+
111+
# assert that both buses are empty
112+
self.assertIsNone(bus1.recv(0))
113+
self.assertIsNone(bus2.recv(0))
114+
115+
# stop the bridge script
116+
self.stop_event.set()
117+
thread.join()
118+
119+
# assert that the virtual buses were closed
120+
with virtual.channels_lock:
121+
self.assertNotIn(self.channel1, virtual.channels)
122+
self.assertNotIn(self.channel2, virtual.channels)
123+
124+
125+
if __name__ == "__main__":
126+
unittest.main()

test/test_scripts.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -98,6 +98,20 @@ def _import(self):
9898
return module
9999

100100

101+
class TestBridgeScript(CanScriptTest):
102+
def _commands(self):
103+
commands = [
104+
"python -m can.bridge --help",
105+
"can_bridge --help",
106+
]
107+
return commands
108+
109+
def _import(self):
110+
import can.bridge as module
111+
112+
return module
113+
114+
101115
class TestLogconvertScript(CanScriptTest):
102116
def _commands(self):
103117
commands = [

0 commit comments

Comments
 (0)