1
+ from typing import Any , Callable , Iterable
1
2
from heapq import heappush , heappop
2
3
from hal import report , initializeNotifier , setNotifierName , observeUserProgramStarting , updateNotifierAlarm , \
3
4
waitForNotifierAlarm , stopNotifier , tResourceType , tInstances
10
11
_kFramework_Timed = tInstances .kFramework_Timed
11
12
12
13
class _Callback :
13
- def __init__ (self , func , periodUs : int , expirationUs : int ):
14
+ def __init__ (self , func : Callable [[], None ] , periodUs : int , expirationUs : int ):
14
15
self .func = func
15
16
self ._periodUs = periodUs
16
17
self .expirationUs = expirationUs
17
18
18
19
@classmethod
19
20
def makeCallBack (cls ,
20
- func ,
21
+ func : Callable [[], None ] ,
21
22
startTimeUs : int ,
22
23
periodUs : int ,
23
- offsetUs : int ):
24
+ offsetUs : int ) -> "_Callback" :
24
25
25
26
callback = _Callback (
26
- func ,
27
+ func = func ,
27
28
periodUs = periodUs ,
28
29
expirationUs = startTimeUs
29
30
)
@@ -42,42 +43,42 @@ def calcFutureExpirationUs(self, currentTimeUs: int) -> int:
42
43
return self .expirationUs + self ._periodUs + \
43
44
((currentTimeUs - self .expirationUs ) // self ._periodUs ) * self ._periodUs
44
45
45
- def setNextStartTimeUs (self , currentTimeUs : int ):
46
+ def setNextStartTimeUs (self , currentTimeUs : int ) -> None :
46
47
self .expirationUs = self .calcFutureExpirationUs (currentTimeUs )
47
48
48
- def __lt__ (self , other ):
49
+ def __lt__ (self , other ) -> bool :
49
50
return self .expirationUs < other .expirationUs
50
51
51
- def __bool__ (self ):
52
+ def __bool__ (self ) -> bool :
52
53
return True
53
54
54
55
55
56
class _OrderedList :
56
57
def __init__ (self ):
57
- self ._data = []
58
+ self ._data : list [ Any ] = []
58
59
59
- def add (self , item ) :
60
+ def add (self , item : Any ) -> None :
60
61
heappush (self ._data , item )
61
62
62
- def pop (self ):
63
+ def pop (self ) -> Any :
63
64
return heappop (self ._data )
64
65
65
- def peek (self ):
66
+ def peek (self ) -> Any | None :
66
67
if self ._data :
67
68
return self ._data [0 ]
68
69
else :
69
70
return None
70
71
71
- def __len__ (self ):
72
+ def __len__ (self ) -> int :
72
73
return len (self ._data )
73
74
74
- def __iter__ (self ):
75
+ def __iter__ (self ) -> Iterable [ Any ] :
75
76
return iter (sorted (self ._data ))
76
77
77
- def __contains__ (self , item ):
78
+ def __contains__ (self , item ) -> bool :
78
79
return item in self ._data
79
80
80
- def __str__ (self ):
81
+ def __str__ (self ) -> str :
81
82
return str (sorted (self ._data ))
82
83
83
84
@@ -93,13 +94,11 @@ def __init__(self, period: float = 0.020):
93
94
94
95
self ._notifier , status = initializeNotifier ()
95
96
if status != 0 :
96
- message = f"initializeNotifier() returned { status } { self ._notifier } "
97
- #raise RuntimeError(message) # todo
97
+ raise RuntimeError (f"initializeNotifier() returned { self ._notifier } , { status } " )
98
98
99
99
status = setNotifierName (self ._notifier , "TimedRobot" )
100
100
if status != 0 :
101
- message = f"setNotifierName() returned { status } "
102
- #raise RuntimeError(message) # todo
101
+ raise RuntimeError (f"setNotifierName() returned { status } " )
103
102
104
103
report (_kResourceType_Framework , _kFramework_Timed )
105
104
@@ -123,13 +122,11 @@ def startCompetition(self) -> None:
123
122
124
123
status = updateNotifierAlarm (self ._notifier , callback .expirationUs )
125
124
if status != 0 :
126
- message = f"updateNotifierAlarm() returned { status } "
127
- #raise RuntimeError(message) # todo
125
+ raise RuntimeError (f"updateNotifierAlarm() returned { status } " )
128
126
129
127
currentTimeUs , status = waitForNotifierAlarm (self ._notifier )
130
128
if status != 0 :
131
- message = f"waitForNotifierAlarm() returned currentTimeUs={ currentTimeUs } status={ status } "
132
- #raise RuntimeError(message) # todo
129
+ raise RuntimeError (f"waitForNotifierAlarm() returned currentTimeUs={ currentTimeUs } status={ status } " )
133
130
134
131
if currentTimeUs == 0 :
135
132
# when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm
@@ -145,33 +142,22 @@ def startCompetition(self) -> None:
145
142
callback = self ._callbacks .pop ()
146
143
self ._runCallbackAndReschedule (callback , currentTimeUs )
147
144
148
- def _runCallbackAndReschedule (self , callback , currentTimeUs :int ):
145
+ def _runCallbackAndReschedule (self , callback : Callable [[], None ], currentTimeUs : int ) -> None :
149
146
callback .func ()
150
147
callback .setNextStartTimeUs (currentTimeUs )
151
148
self ._callbacks .add (callback )
152
149
153
- def endCompetition (self ):
150
+ def endCompetition (self ) -> None :
154
151
stopNotifier (self ._notifier )
155
152
156
- """
157
- todo this doesn't really translate to python (is it really needed?):
158
-
159
- TimedRobot::~TimedRobot() {
160
- if (m_notifier != HAL_kInvalidHandle) {
161
- int32_t status = 0;
162
- HAL_StopNotifier(m_notifier, &status);
163
- FRC_ReportError(status, "StopNotifier");
164
- }
165
- }
166
- """
167
-
168
- def getLoopStartTime (self ):
169
- return self .loopStartTimeUs / 1e6 # todo units are seconds
153
+ def getLoopStartTime (self ) -> float :
154
+ return self .loopStartTimeUs / 1e6 # units are seconds
170
155
171
156
def addPeriodic (self ,
172
- callback , # todo typehint
173
- period : float , # todo units seconds
174
- offset : float = 0.0 ): # todo units seconds
157
+ callback : Callable [[],None ],
158
+ period : float , # units are seconds
159
+ offset : float = 0.0 # units are seconds
160
+ ) -> None :
175
161
self ._callbacks .add (
176
162
_Callback .makeCallBack (
177
163
callback ,
0 commit comments