1
1
from typing import Any , Callable , Iterable
2
2
from heapq import heappush , heappop
3
- from hal import report , initializeNotifier , setNotifierName , observeUserProgramStarting , updateNotifierAlarm , \
4
- waitForNotifierAlarm , stopNotifier , tResourceType , tInstances
3
+ from hal import (
4
+ report ,
5
+ initializeNotifier ,
6
+ setNotifierName ,
7
+ observeUserProgramStarting ,
8
+ updateNotifierAlarm ,
9
+ waitForNotifierAlarm ,
10
+ stopNotifier ,
11
+ tResourceType ,
12
+ tInstances ,
13
+ )
5
14
from wpilib import RobotController
6
15
7
16
from .iterativerobotpy import IterativeRobotPy
10
19
_kResourceType_Framework = tResourceType .kResourceType_Framework
11
20
_kFramework_Timed = tInstances .kFramework_Timed
12
21
22
+
13
23
class _Callback :
14
- def __init__ (self , func : Callable [[],None ], periodUs : int , expirationUs : int ):
24
+ def __init__ (self , func : Callable [[], None ], periodUs : int , expirationUs : int ):
15
25
self .func = func
16
26
self ._periodUs = periodUs
17
27
self .expirationUs = expirationUs
18
28
19
29
@classmethod
20
- def makeCallBack (cls ,
21
- func : Callable [[],None ],
22
- startTimeUs : int ,
23
- periodUs : int ,
24
- offsetUs : int ) -> "_Callback" :
25
-
26
- callback = _Callback (
27
- func = func ,
28
- periodUs = periodUs ,
29
- expirationUs = startTimeUs
30
- )
30
+ def makeCallBack (
31
+ cls , func : Callable [[], None ], startTimeUs : int , periodUs : int , offsetUs : int
32
+ ) -> "_Callback" :
33
+
34
+ callback = _Callback (func = func , periodUs = periodUs , expirationUs = startTimeUs )
31
35
32
36
currentTimeUs = _getFPGATime ()
33
- callback .expirationUs = offsetUs + callback .calcFutureExpirationUs (currentTimeUs )
37
+ callback .expirationUs = offsetUs + callback .calcFutureExpirationUs (
38
+ currentTimeUs
39
+ )
34
40
return callback
35
41
36
42
def calcFutureExpirationUs (self , currentTimeUs : int ) -> int :
@@ -40,8 +46,11 @@ def calcFutureExpirationUs(self, currentTimeUs: int) -> int:
40
46
# callback wouldn't be running otherwise.
41
47
# todo does this math work?
42
48
# todo does the "// periodUs * periodUs" do the correct integer math?
43
- return self .expirationUs + self ._periodUs + \
44
- ((currentTimeUs - self .expirationUs ) // self ._periodUs ) * self ._periodUs
49
+ return (
50
+ self .expirationUs
51
+ + self ._periodUs
52
+ + ((currentTimeUs - self .expirationUs ) // self ._periodUs ) * self ._periodUs
53
+ )
45
54
46
55
def setNextStartTimeUs (self , currentTimeUs : int ) -> None :
47
56
self .expirationUs = self .calcFutureExpirationUs (currentTimeUs )
@@ -63,7 +72,7 @@ def add(self, item: Any) -> None:
63
72
def pop (self ) -> Any :
64
73
return heappop (self ._data )
65
74
66
- def peek (self ) -> Any | None :
75
+ def peek (self ) -> Any | None :
67
76
if self ._data :
68
77
return self ._data [0 ]
69
78
else :
@@ -94,7 +103,9 @@ def __init__(self, period: float = 0.020):
94
103
95
104
self ._notifier , status = initializeNotifier ()
96
105
if status != 0 :
97
- raise RuntimeError (f"initializeNotifier() returned { self ._notifier } , { status } " )
106
+ raise RuntimeError (
107
+ f"initializeNotifier() returned { self ._notifier } , { status } "
108
+ )
98
109
99
110
status = setNotifierName (self ._notifier , "TimedRobot" )
100
111
if status != 0 :
@@ -126,7 +137,9 @@ def startCompetition(self) -> None:
126
137
127
138
currentTimeUs , status = waitForNotifierAlarm (self ._notifier )
128
139
if status != 0 :
129
- raise RuntimeError (f"waitForNotifierAlarm() returned currentTimeUs={ currentTimeUs } status={ status } " )
140
+ raise RuntimeError (
141
+ f"waitForNotifierAlarm() returned currentTimeUs={ currentTimeUs } status={ status } "
142
+ )
130
143
131
144
if currentTimeUs == 0 :
132
145
# when HAL_StopNotifier(self.notifier) is called the above waitForNotifierAlarm
@@ -142,7 +155,9 @@ def startCompetition(self) -> None:
142
155
callback = self ._callbacks .pop ()
143
156
self ._runCallbackAndReschedule (callback , currentTimeUs )
144
157
145
- def _runCallbackAndReschedule (self , callback : Callable [[],None ], currentTimeUs : int ) -> None :
158
+ def _runCallbackAndReschedule (
159
+ self , callback : Callable [[], None ], currentTimeUs : int
160
+ ) -> None :
146
161
callback .func ()
147
162
callback .setNextStartTimeUs (currentTimeUs )
148
163
self ._callbacks .add (callback )
@@ -151,16 +166,16 @@ def endCompetition(self) -> None:
151
166
stopNotifier (self ._notifier )
152
167
153
168
def getLoopStartTime (self ) -> float :
154
- return self .loopStartTimeUs / 1e6 # units are seconds
155
-
156
- def addPeriodic (self ,
157
- callback : Callable [[],None ],
158
- period : float , # units are seconds
159
- offset : float = 0.0 # units are seconds
160
- ) -> None :
169
+ return self .loopStartTimeUs / 1e6 # units are seconds
170
+
171
+ def addPeriodic (
172
+ self ,
173
+ callback : Callable [[], None ],
174
+ period : float , # units are seconds
175
+ offset : float = 0.0 , # units are seconds
176
+ ) -> None :
161
177
self ._callbacks .add (
162
178
_Callback .makeCallBack (
163
- callback ,
164
- self ._startTimeUs , int (period * 1e6 ), int (offset * 1e6 )
179
+ callback , self ._startTimeUs , int (period * 1e6 ), int (offset * 1e6 )
165
180
)
166
181
)
0 commit comments