Skip to content

Commit 75fdfe4

Browse files
lumagizariiii9003
andauthored
Implement BitTiming/BitTimingFD handling for PCAN Bus (#1514)
* add BitTiming parameter to PcanBus * Move valid CAN/FD clocks to pcan/basic * Add additional PCAN constructor tests * Add tests for BitTiming with PCAN constructor * Unify PCAN constructor code paths for FD with/without timing --------- Co-authored-by: zariiii9003 <[email protected]>
1 parent b16f8aa commit 75fdfe4

File tree

3 files changed

+135
-32
lines changed

3 files changed

+135
-32
lines changed

can/interfaces/pcan/basic.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,16 @@ class TPCANChannelInformation(Structure):
655655
"PCAN_LANBUS16": PCAN_LANBUS16,
656656
}
657657

658+
VALID_PCAN_CAN_CLOCKS = [8_000_000]
659+
660+
VALID_PCAN_FD_CLOCKS = [
661+
20_000_000,
662+
24_000_000,
663+
30_000_000,
664+
40_000_000,
665+
60_000_000,
666+
80_000_000,
667+
]
658668

659669
# ///////////////////////////////////////////////////////////
660670
# PCAN-Basic API function declarations

can/interfaces/pcan/pcan.py

Lines changed: 52 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,21 @@
55
import time
66
from datetime import datetime
77
import platform
8-
from typing import Optional, List, Tuple
8+
from typing import Optional, List, Tuple, Union, Any
99

1010
from packaging import version
1111

1212
from can import (
1313
BusABC,
1414
BusState,
15+
BitTiming,
16+
BitTimingFd,
17+
Message,
1518
CanError,
1619
CanOperationError,
1720
CanInitializationError,
18-
Message,
1921
)
20-
from can.util import len2dlc, dlc2len
21-
22+
from can.util import check_or_adjust_timing_clock, dlc2len, len2dlc
2223
from .basic import (
2324
PCAN_BITRATES,
2425
PCAN_FD_PARAMETER_LIST,
@@ -61,8 +62,11 @@
6162
FEATURE_FD_CAPABLE,
6263
PCAN_DICT_STATUS,
6364
PCAN_BUSOFF_AUTORESET,
65+
TPCANBaudrate,
6466
PCAN_ATTACHED_CHANNELS,
6567
TPCANChannelInformation,
68+
VALID_PCAN_FD_CLOCKS,
69+
VALID_PCAN_CAN_CLOCKS,
6670
)
6771

6872

@@ -112,12 +116,12 @@
112116
class PcanBus(BusABC):
113117
def __init__(
114118
self,
115-
channel="PCAN_USBBUS1",
116-
device_id=None,
117-
state=BusState.ACTIVE,
118-
bitrate=500000,
119-
*args,
120-
**kwargs,
119+
channel: str = "PCAN_USBBUS1",
120+
device_id: Optional[int] = None,
121+
state: BusState = BusState.ACTIVE,
122+
timing: Optional[Union[BitTiming, BitTimingFd]] = None,
123+
bitrate: int = 500000,
124+
**kwargs: Any,
121125
):
122126
"""A PCAN USB interface to CAN.
123127
@@ -142,6 +146,18 @@ def __init__(
142146
BusState of the channel.
143147
Default is ACTIVE
144148
149+
:param timing:
150+
An instance of :class:`~can.BitTiming` or :class:`~can.BitTimingFd`
151+
to specify the bit timing parameters for the PCAN interface. If this parameter
152+
is provided, it takes precedence over all other timing-related parameters.
153+
If this parameter is not provided, the bit timing parameters can be specified
154+
using the `bitrate` parameter for standard CAN or the `fd`, `f_clock`,
155+
`f_clock_mhz`, `nom_brp`, `nom_tseg1`, `nom_tseg2`, `nom_sjw`, `data_brp`,
156+
`data_tseg1`, `data_tseg2`, and `data_sjw` parameters for CAN FD.
157+
Note that the `f_clock` value of the `timing` instance must be 8_000_000
158+
for standard CAN or any of the following values for CAN FD: 20_000_000,
159+
24_000_000, 30_000_000, 40_000_000, 60_000_000, 80_000_000.
160+
145161
:param int bitrate:
146162
Bitrate of channel in bit/s.
147163
Default is 500 kbit/s.
@@ -231,8 +247,7 @@ def __init__(
231247
raise ValueError(err_msg)
232248

233249
self.channel_info = str(channel)
234-
self.fd = kwargs.get("fd", False)
235-
pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K)
250+
self.fd = isinstance(timing, BitTimingFd) if timing else kwargs.get("fd", False)
236251

237252
hwtype = PCAN_TYPE_ISA
238253
ioport = 0x02A0
@@ -245,30 +260,41 @@ def __init__(
245260

246261
self.check_api_version()
247262

248-
if state is BusState.ACTIVE or state is BusState.PASSIVE:
263+
if state in [BusState.ACTIVE, BusState.PASSIVE]:
249264
self.state = state
250265
else:
251266
raise ValueError("BusState must be Active or Passive")
252267

253-
if self.fd:
254-
f_clock_val = kwargs.get("f_clock", None)
255-
if f_clock_val is None:
256-
f_clock = "{}={}".format("f_clock_mhz", kwargs.get("f_clock_mhz", None))
257-
else:
258-
f_clock = "{}={}".format("f_clock", kwargs.get("f_clock", None))
259-
260-
fd_parameters_values = [f_clock] + [
261-
f"{key}={kwargs.get(key, None)}"
262-
for key in PCAN_FD_PARAMETER_LIST
263-
if kwargs.get(key, None) is not None
268+
if isinstance(timing, BitTiming):
269+
timing = check_or_adjust_timing_clock(timing, VALID_PCAN_CAN_CLOCKS)
270+
pcan_bitrate = TPCANBaudrate(timing.btr0 << 8 | timing.btr1)
271+
result = self.m_objPCANBasic.Initialize(
272+
self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt
273+
)
274+
elif self.fd:
275+
if isinstance(timing, BitTimingFd):
276+
timing = check_or_adjust_timing_clock(
277+
timing, sorted(VALID_PCAN_FD_CLOCKS, reverse=True)
278+
)
279+
# We dump the timing parameters into the kwargs because they have equal names
280+
# as the kwargs parameters and this saves us one additional code path
281+
kwargs.update(timing)
282+
283+
clock_param = "f_clock" if "f_clock" in kwargs else "f_clock_mhz"
284+
fd_parameters_values = [
285+
f"{key}={kwargs[key]}"
286+
for key in (clock_param,) + PCAN_FD_PARAMETER_LIST
287+
if key in kwargs
264288
]
265289

266-
self.fd_bitrate = " ,".join(fd_parameters_values).encode("ascii")
290+
self.fd_bitrate = ", ".join(fd_parameters_values).encode("ascii")
267291

268292
result = self.m_objPCANBasic.InitializeFD(
269293
self.m_PcanHandle, self.fd_bitrate
270294
)
295+
271296
else:
297+
pcan_bitrate = PCAN_BITRATES.get(bitrate, PCAN_BAUD_500K)
272298
result = self.m_objPCANBasic.Initialize(
273299
self.m_PcanHandle, pcan_bitrate, hwtype, ioport, interrupt
274300
)
@@ -312,7 +338,7 @@ def __init__(
312338
if result != PCAN_ERROR_OK:
313339
raise PcanCanInitializationError(self._get_formatted_error(result))
314340

315-
super().__init__(channel=channel, state=state, bitrate=bitrate, *args, **kwargs)
341+
super().__init__(channel=channel, state=state, bitrate=bitrate, **kwargs)
316342

317343
def _find_channel_by_dev_id(self, device_id):
318344
"""

test/test_pcan.py

Lines changed: 73 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,18 @@
33
"""
44

55
import ctypes
6-
import platform
76
import unittest
87
from unittest import mock
98
from unittest.mock import Mock, patch
109

11-
1210
import pytest
1311
from parameterized import parameterized
1412

1513
import can
1614
from can.bus import BusState
1715
from can.exceptions import CanInitializationError
18-
from can.interfaces.pcan.basic import *
1916
from can.interfaces.pcan import PcanBus, PcanError
17+
from can.interfaces.pcan.basic import *
2018

2119

2220
class TestPCANBus(unittest.TestCase):
@@ -53,22 +51,52 @@ def _mockGetValue(self, channel, parameter):
5351

5452
def test_bus_creation(self) -> None:
5553
self.bus = can.Bus(interface="pcan")
54+
5655
self.assertIsInstance(self.bus, PcanBus)
5756
self.MockPCANBasic.assert_called_once()
57+
5858
self.mock_pcan.Initialize.assert_called_once()
5959
self.mock_pcan.InitializeFD.assert_not_called()
6060

6161
def test_bus_creation_state_error(self) -> None:
6262
with self.assertRaises(ValueError):
6363
can.Bus(interface="pcan", state=BusState.ERROR)
6464

65-
def test_bus_creation_fd(self) -> None:
66-
self.bus = can.Bus(interface="pcan", fd=True)
65+
@parameterized.expand([("f_clock", 8_000_000), ("f_clock_mhz", 8)])
66+
def test_bus_creation_fd(self, clock_param: str, clock_val: int) -> None:
67+
self.bus = can.Bus(
68+
interface="pcan",
69+
fd=True,
70+
nom_brp=1,
71+
nom_tseg1=129,
72+
nom_tseg2=30,
73+
nom_sjw=1,
74+
data_brp=1,
75+
data_tseg1=9,
76+
data_tseg2=6,
77+
data_sjw=1,
78+
channel="PCAN_USBBUS1",
79+
**{clock_param: clock_val},
80+
)
81+
6782
self.assertIsInstance(self.bus, PcanBus)
6883
self.MockPCANBasic.assert_called_once()
6984
self.mock_pcan.Initialize.assert_not_called()
7085
self.mock_pcan.InitializeFD.assert_called_once()
7186

87+
# Retrieve second argument of first call
88+
bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1]
89+
90+
self.assertTrue(f"{clock_param}={clock_val}".encode("ascii") in bitrate_arg)
91+
self.assertTrue(b"nom_brp=1" in bitrate_arg)
92+
self.assertTrue(b"nom_tseg1=129" in bitrate_arg)
93+
self.assertTrue(b"nom_tseg2=30" in bitrate_arg)
94+
self.assertTrue(b"nom_sjw=1" in bitrate_arg)
95+
self.assertTrue(b"data_brp=1" in bitrate_arg)
96+
self.assertTrue(b"data_tseg1=9" in bitrate_arg)
97+
self.assertTrue(b"data_tseg2=6" in bitrate_arg)
98+
self.assertTrue(b"data_sjw=1" in bitrate_arg)
99+
72100
def test_api_version_low(self) -> None:
73101
self.PCAN_API_VERSION_SIM = "1.0"
74102
with self.assertLogs("can.pcan", level="WARNING") as cm:
@@ -333,6 +361,11 @@ def test_state(self, name, bus_state: BusState, expected_parameter) -> None:
333361
(PCAN_USBBUS1, PCAN_LISTEN_ONLY, expected_parameter),
334362
)
335363

364+
def test_state_constructor(self):
365+
for state in [BusState.ACTIVE, BusState.PASSIVE]:
366+
bus = can.Bus(interface="pcan", state=state)
367+
assert bus.state == state
368+
336369
def test_detect_available_configs(self) -> None:
337370
if platform.system() == "Darwin":
338371
self.mock_pcan.GetValue = Mock(
@@ -381,7 +414,8 @@ def get_value_side_effect(handle, param):
381414
self.mock_pcan.GetValue = Mock(side_effect=get_value_side_effect)
382415

383416
if expected_result == "error":
384-
self.assertRaises(ValueError, can.Bus, interface="pcan", device_id=dev_id)
417+
with self.assertRaises(ValueError):
418+
can.Bus(interface="pcan", device_id=dev_id)
385419
else:
386420
self.bus = can.Bus(interface="pcan", device_id=dev_id)
387421
self.assertEqual(expected_result, self.bus.channel_info)
@@ -416,6 +450,39 @@ def test_peak_fd_bus_constructor_regression(self):
416450

417451
can.Bus(**params)
418452

453+
def test_constructor_bit_timing(self):
454+
timing = can.BitTiming.from_registers(f_clock=8_000_000, btr0=0x47, btr1=0x2F)
455+
can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing)
456+
457+
bitrate_arg = self.mock_pcan.Initialize.call_args[0][1]
458+
self.assertEqual(bitrate_arg.value, 0x472F)
459+
460+
def test_constructor_bit_timing_fd(self):
461+
timing = can.BitTimingFd(
462+
f_clock=40_000_000,
463+
nom_brp=1,
464+
nom_tseg1=129,
465+
nom_tseg2=30,
466+
nom_sjw=1,
467+
data_brp=1,
468+
data_tseg1=9,
469+
data_tseg2=6,
470+
data_sjw=1,
471+
)
472+
can.Bus(interface="pcan", channel="PCAN_USBBUS1", timing=timing)
473+
474+
bitrate_arg = self.mock_pcan.InitializeFD.call_args[0][-1]
475+
476+
self.assertTrue(b"f_clock=40000000" in bitrate_arg)
477+
self.assertTrue(b"nom_brp=1" in bitrate_arg)
478+
self.assertTrue(b"nom_tseg1=129" in bitrate_arg)
479+
self.assertTrue(b"nom_tseg2=30" in bitrate_arg)
480+
self.assertTrue(b"nom_sjw=1" in bitrate_arg)
481+
self.assertTrue(b"data_brp=1" in bitrate_arg)
482+
self.assertTrue(b"data_tseg1=9" in bitrate_arg)
483+
self.assertTrue(b"data_tseg2=6" in bitrate_arg)
484+
self.assertTrue(b"data_sjw=1" in bitrate_arg)
485+
419486

420487
if __name__ == "__main__":
421488
unittest.main()

0 commit comments

Comments
 (0)