|
7 | 7 |
|
8 | 8 | import argparse |
9 | 9 | import errno |
10 | | -import logging |
11 | 10 | import sys |
12 | 11 | import time |
13 | 12 | from datetime import datetime |
14 | | -from typing import ( |
15 | | - Iterator, |
16 | | - List, |
17 | | - Tuple, |
18 | | -) |
| 13 | +from typing import Final |
19 | 14 |
|
20 | | -import can |
21 | | - |
22 | | -from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config |
23 | | - |
24 | | -USAGE = """ |
25 | | -usage: can_bridge [{general config} --] {can A config} -- {can B config} |
| 15 | +from can.cli import add_bus_arguments, create_bus_from_namespace |
| 16 | +from can.listener import RedirectReader |
| 17 | +from can.notifier import Notifier |
26 | 18 |
|
| 19 | +BRIDGE_DESCRIPTION: Final = """\ |
27 | 20 | Bridge two CAN buses. |
28 | 21 |
|
29 | | -Both can buses will be connected so that messages from bus A will be sent on |
30 | | -bus B and messages on bus B will be sent to bus A. The buses are separated by a `--` |
31 | | -
|
32 | | -positional arguments: |
33 | | - {general config} The configuration for this program excluding |
34 | | - the config for each bus. Can be omitted |
35 | | - {can A config} The configuration for the first bus |
36 | | - {can B config} The configuration for the second bus |
37 | | -
|
38 | | -Example usage: |
39 | | - can_bridge -i socketcan -c can0 -- -i socketcan can1 |
40 | | - can_bridge -vvv -- -i socketcan -c can0 -- -i socketcan can1 |
41 | | -
|
42 | | -Type `can_bridge help_bus` for information about single bus configuration. |
| 22 | +Both can buses will be connected so that messages from bus1 will be sent on |
| 23 | +bus2 and messages from bus2 will be sent to bus1. |
43 | 24 | """ |
44 | | - |
45 | | -LOG = logging.getLogger(__name__) |
46 | | - |
47 | | - |
48 | | -class UserError(Exception): |
49 | | - pass |
50 | | - |
51 | | - |
52 | | -def get_config_list(it: Iterator[str], separator: str, conf: list) -> None: |
53 | | - while True: |
54 | | - el = next(it) |
55 | | - if el == separator: |
56 | | - break |
57 | | - |
58 | | - conf.append(el) |
| 25 | +BUS_1_PREFIX: Final = "bus1" |
| 26 | +BUS_2_PREFIX: Final = "bus2" |
59 | 27 |
|
60 | 28 |
|
61 | | -def split_configurations( |
62 | | - arg_list: List[str], separator: str = "--" |
63 | | -) -> Tuple[list, list, list]: |
64 | | - general = [] |
65 | | - conf_a: List[str] = [] |
66 | | - conf_b: List[str] = [] |
| 29 | +def _parse_bridge_args(args: list[str]) -> argparse.Namespace: |
| 30 | + """Parse command line arguments for bridge script.""" |
67 | 31 |
|
68 | | - found_sep = False |
69 | | - it = iter(arg_list) |
70 | | - try: |
71 | | - get_config_list(it, separator, conf_a) |
72 | | - found_sep = True |
73 | | - get_config_list(it, separator, conf_b) |
| 32 | + parser = argparse.ArgumentParser(description=BRIDGE_DESCRIPTION) |
| 33 | + add_bus_arguments(parser, prefix=BUS_1_PREFIX, group_title="Bus 1 arguments") |
| 34 | + add_bus_arguments(parser, prefix=BUS_2_PREFIX, group_title="Bus 2 arguments") |
74 | 35 |
|
75 | | - # When we reached this point we found two separators so we have |
76 | | - # a general config. We will treate the first config as general |
77 | | - general = conf_a |
78 | | - conf_a = conf_b |
79 | | - get_config_list(it, separator, conf_b) |
80 | | - |
81 | | - # When we reached this point we found three separators so this is |
82 | | - # an error. |
83 | | - raise UserError("To many configurations") |
84 | | - except StopIteration: |
85 | | - LOG.debug("All configurations were split") |
86 | | - if not found_sep: |
87 | | - raise UserError("Missing separator") from None |
| 36 | + # print help message when no arguments were given |
| 37 | + if not args: |
| 38 | + parser.print_help(sys.stderr) |
| 39 | + raise SystemExit(errno.EINVAL) |
88 | 40 |
|
89 | | - return general, conf_a, conf_b |
| 41 | + results, _unknown_args = parser.parse_known_args(args) |
| 42 | + return results |
90 | 43 |
|
91 | 44 |
|
92 | 45 | def main() -> None: |
93 | | - general_parser = argparse.ArgumentParser() |
94 | | - general_parser.add_argument( |
95 | | - "-v", |
96 | | - action="count", |
97 | | - dest="verbosity", |
98 | | - help="""How much information do you want to see at the command line? |
99 | | - You can add several of these e.g., -vv is DEBUG""", |
100 | | - default=2, |
101 | | - ) |
102 | | - |
103 | | - bus_parser = argparse.ArgumentParser(description="Bridge two CAN buses.") |
104 | | - |
105 | | - _create_base_argument_parser(bus_parser) |
106 | | - |
107 | | - parser = argparse.ArgumentParser(description="Bridge two CAN buses.") |
108 | | - parser.add_argument("configs", nargs=argparse.REMAINDER) |
109 | | - |
110 | | - # print help message when no arguments were given |
111 | | - if len(sys.argv) < 2: |
112 | | - print(USAGE, file=sys.stderr) |
113 | | - raise SystemExit(errno.EINVAL) |
114 | | - |
115 | | - args = sys.argv[1:] |
116 | | - try: |
117 | | - general, conf_a, conf_b = split_configurations(args) |
118 | | - except UserError as exc: |
119 | | - if len(args) >= 1: |
120 | | - if args[0] == "-h" or args[0] == "--help" or args[0] == "help": |
121 | | - print(USAGE) |
122 | | - raise SystemExit() from None |
123 | | - elif args[0] == "help_bus": |
124 | | - bus_parser.print_help(sys.stderr) |
125 | | - else: |
126 | | - print(f"Error while processing arguments: {exc}", file=sys.stderr) |
127 | | - raise SystemExit(errno.EINVAL) from exc |
128 | | - |
129 | | - LOG.debug("General configuration: %s", general) |
130 | | - LOG.debug("Bus A configuration: %s", conf_a) |
131 | | - LOG.debug("Bus B configuration: %s", conf_b) |
132 | | - g_results = general_parser.parse_args(general) |
133 | | - verbosity = g_results.verbosity |
134 | | - |
135 | | - a_results, a_unknown_args = bus_parser.parse_known_args(conf_a) |
136 | | - a_additional_config = _parse_additional_config( |
137 | | - [*a_results.extra_args, *a_unknown_args] |
138 | | - ) |
139 | | - a_results.__dict__["verbosity"] = verbosity |
140 | | - |
141 | | - b_results, b_unknown_args = bus_parser.parse_known_args(conf_b) |
142 | | - b_additional_config = _parse_additional_config( |
143 | | - [*b_results.extra_args, *b_unknown_args] |
144 | | - ) |
145 | | - b_results.__dict__["verbosity"] = verbosity |
146 | | - |
147 | | - LOG.debug("General configuration results: %s", g_results) |
148 | | - LOG.debug("Bus A configuration results: %s", a_results) |
149 | | - LOG.debug("Bus A additional configuration results: %s", a_additional_config) |
150 | | - LOG.debug("Bus B configuration results: %s", b_results) |
151 | | - LOG.debug("Bus B additional configuration results: %s", b_additional_config) |
152 | | - with _create_bus(a_results, **a_additional_config) as bus_a: |
153 | | - with _create_bus(b_results, **b_additional_config) as bus_b: |
154 | | - reader_a = can.RedirectReader(bus_b) |
155 | | - reader_b = can.RedirectReader(bus_a) |
156 | | - can.Notifier(bus_a, [reader_a]) |
157 | | - can.Notifier(bus_b, [reader_b]) |
158 | | - print(f"CAN Bridge (Started on {datetime.now()})") |
159 | | - try: |
160 | | - while True: |
161 | | - time.sleep(1) |
162 | | - except KeyboardInterrupt: |
163 | | - pass |
| 46 | + results = _parse_bridge_args(sys.argv[1:]) |
| 47 | + |
| 48 | + with create_bus_from_namespace(results, prefix=BUS_1_PREFIX) as bus1: |
| 49 | + with create_bus_from_namespace(results, prefix=BUS_2_PREFIX) as bus2: |
| 50 | + reader1_to_2 = RedirectReader(bus2) |
| 51 | + reader2_to_1 = RedirectReader(bus1) |
| 52 | + with Notifier(bus1, [reader1_to_2]), Notifier(bus2, [reader2_to_1]): |
| 53 | + print(f"CAN Bridge (Started on {datetime.now()})") |
| 54 | + try: |
| 55 | + while True: |
| 56 | + time.sleep(1) |
| 57 | + except KeyboardInterrupt: |
| 58 | + pass |
164 | 59 |
|
165 | 60 | print(f"CAN Bridge (Stopped on {datetime.now()})") |
166 | 61 |
|
|
0 commit comments