1
1
#!/usr/bin/env python
2
2
3
3
import unittest
4
+ from typing import cast
5
+
6
+ import serial
4
7
5
8
import can
6
9
15
18
16
19
https://realpython.com/pypy-faster-python/#it-doesnt-work-well-with-c-extensions
17
20
"""
18
- TIMEOUT = 0.5 if IS_PYPY else 0.001 # 0.001 is the default set in slcanBus
21
+ TIMEOUT = 0.5 if IS_PYPY else 0.01 # 0.001 is the default set in slcanBus
19
22
20
23
21
24
class slcanTestCase (unittest .TestCase ):
22
25
def setUp (self ):
23
- self .bus = can .Bus (
24
- "loop://" , interface = "slcan" , sleep_after_open = 0 , timeout = TIMEOUT
26
+ self .bus = cast (
27
+ can .interfaces .slcan .slcanBus ,
28
+ can .Bus ("loop://" , interface = "slcan" , sleep_after_open = 0 , timeout = TIMEOUT ),
25
29
)
26
- self .serial = self .bus .serialPortOrig
27
- self .serial .read ( self . serial . in_waiting )
30
+ self .serial = cast ( serial . Serial , self .bus .serialPortOrig )
31
+ self .serial .reset_input_buffer ( )
28
32
29
33
def tearDown (self ):
30
34
self .bus .shutdown ()
@@ -44,8 +48,9 @@ def test_send_extended(self):
44
48
arbitration_id = 0x12ABCDEF , is_extended_id = True , data = [0xAA , 0x55 ]
45
49
)
46
50
self .bus .send (msg )
47
- data = self .serial .read (self .serial .in_waiting )
48
- self .assertEqual (data , b"T12ABCDEF2AA55\r " )
51
+ expected = b"T12ABCDEF2AA55\r "
52
+ data = self .serial .read (len (expected ))
53
+ self .assertEqual (data , expected )
49
54
50
55
def test_recv_standard (self ):
51
56
self .serial .write (b"t4563112233\r " )
@@ -62,8 +67,9 @@ def test_send_standard(self):
62
67
arbitration_id = 0x456 , is_extended_id = False , data = [0x11 , 0x22 , 0x33 ]
63
68
)
64
69
self .bus .send (msg )
65
- data = self .serial .read (self .serial .in_waiting )
66
- self .assertEqual (data , b"t4563112233\r " )
70
+ expected = b"t4563112233\r "
71
+ data = self .serial .read (len (expected ))
72
+ self .assertEqual (data , expected )
67
73
68
74
def test_recv_standard_remote (self ):
69
75
self .serial .write (b"r1238\r " )
@@ -79,8 +85,9 @@ def test_send_standard_remote(self):
79
85
arbitration_id = 0x123 , is_extended_id = False , is_remote_frame = True , dlc = 8
80
86
)
81
87
self .bus .send (msg )
82
- data = self .serial .read (self .serial .in_waiting )
83
- self .assertEqual (data , b"r1238\r " )
88
+ expected = b"r1238\r "
89
+ data = self .serial .read (len (expected ))
90
+ self .assertEqual (data , expected )
84
91
85
92
def test_recv_extended_remote (self ):
86
93
self .serial .write (b"R12ABCDEF6\r " )
@@ -96,8 +103,9 @@ def test_send_extended_remote(self):
96
103
arbitration_id = 0x12ABCDEF , is_extended_id = True , is_remote_frame = True , dlc = 6
97
104
)
98
105
self .bus .send (msg )
99
- data = self .serial .read (self .serial .in_waiting )
100
- self .assertEqual (data , b"R12ABCDEF6\r " )
106
+ expected = b"R12ABCDEF6\r "
107
+ data = self .serial .read (len (expected ))
108
+ self .assertEqual (data , expected )
101
109
102
110
def test_partial_recv (self ):
103
111
self .serial .write (b"T12ABCDEF" )
0 commit comments