Skip to content

Commit 7918209

Browse files
bessmanzariiii9003
andauthored
Add auto-modifying cyclic tasks (#703)
* Add auto-modifying cyclic tasks * Make sure self.modifier_callback exists * Don't break socketcan Added modifier_callback arguments where necessary, and changed MRO of sockercan's CyclicSendTask to match that of the fallback. * Remove __init__ from ModifiableCyclicTaskABC This makes several changes to socketcan in previous commits unnecessary. These changes are also removed. * Forgot some brackets... * Reformatting by black * modifier_callback should change one message per send * Forgot to change type hint * fix CI * use mutating callback, adapt SocketcanBus and ixxat, add test * improve docstring * fix ixxat imports --------- Co-authored-by: zariiii9003 <[email protected]> Co-authored-by: zariiii9003 <[email protected]>
1 parent 09213b1 commit 7918209

File tree

8 files changed

+274
-71
lines changed

8 files changed

+274
-71
lines changed

can/broadcastmanager.py

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def stop(self) -> None:
5353
"""
5454

5555

56-
class CyclicSendTaskABC(CyclicTask):
56+
class CyclicSendTaskABC(CyclicTask, abc.ABC):
5757
"""
5858
Message send task with defined period
5959
"""
@@ -114,7 +114,7 @@ def _check_and_convert_messages(
114114
return messages
115115

116116

117-
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC):
117+
class LimitedDurationCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
118118
def __init__(
119119
self,
120120
messages: Union[Sequence[Message], Message],
@@ -136,17 +136,15 @@ def __init__(
136136
self.duration = duration
137137

138138

139-
class RestartableCyclicTaskABC(CyclicSendTaskABC):
139+
class RestartableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
140140
"""Adds support for restarting a stopped cyclic task"""
141141

142142
@abc.abstractmethod
143143
def start(self) -> None:
144144
"""Restart a stopped periodic task."""
145145

146146

147-
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
148-
"""Adds support for modifying a periodic message"""
149-
147+
class ModifiableCyclicTaskABC(CyclicSendTaskABC, abc.ABC):
150148
def _check_modified_messages(self, messages: Tuple[Message, ...]) -> None:
151149
"""Helper function to perform error checking when modifying the data in
152150
the cyclic task.
@@ -190,7 +188,7 @@ def modify_data(self, messages: Union[Sequence[Message], Message]) -> None:
190188
self.messages = messages
191189

192190

193-
class MultiRateCyclicSendTaskABC(CyclicSendTaskABC):
191+
class MultiRateCyclicSendTaskABC(CyclicSendTaskABC, abc.ABC):
194192
"""A Cyclic send task that supports switches send frequency after a set time."""
195193

196194
def __init__(
@@ -218,7 +216,7 @@ def __init__(
218216

219217

220218
class ThreadBasedCyclicSendTask(
221-
ModifiableCyclicTaskABC, LimitedDurationCyclicSendTaskABC, RestartableCyclicTaskABC
219+
LimitedDurationCyclicSendTaskABC, ModifiableCyclicTaskABC, RestartableCyclicTaskABC
222220
):
223221
"""Fallback cyclic send task using daemon thread."""
224222

@@ -230,6 +228,7 @@ def __init__(
230228
period: float,
231229
duration: Optional[float] = None,
232230
on_error: Optional[Callable[[Exception], bool]] = None,
231+
modifier_callback: Optional[Callable[[Message], None]] = None,
233232
) -> None:
234233
"""Transmits `messages` with a `period` seconds for `duration` seconds on a `bus`.
235234
@@ -255,6 +254,7 @@ def __init__(
255254
time.perf_counter() + duration if duration else None
256255
)
257256
self.on_error = on_error
257+
self.modifier_callback = modifier_callback
258258

259259
if USE_WINDOWS_EVENTS:
260260
self.period_ms = int(round(period * 1000, 0))
@@ -301,14 +301,22 @@ def _run(self) -> None:
301301
# Prevent calling bus.send from multiple threads
302302
with self.send_lock:
303303
try:
304+
if self.modifier_callback is not None:
305+
self.modifier_callback(self.messages[msg_index])
304306
self.bus.send(self.messages[msg_index])
305307
except Exception as exc: # pylint: disable=broad-except
306308
log.exception(exc)
307-
if self.on_error:
308-
if not self.on_error(exc):
309-
break
310-
else:
309+
310+
# stop if `on_error` callback was not given
311+
if self.on_error is None:
312+
self.stop()
313+
raise exc
314+
315+
# stop if `on_error` returns False
316+
if not self.on_error(exc):
317+
self.stop()
311318
break
319+
312320
msg_due_time_ns += self.period_ns
313321
if self.end_time is not None and time.perf_counter() >= self.end_time:
314322
break

can/bus.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from abc import ABC, ABCMeta, abstractmethod
99
from enum import Enum, auto
1010
from time import time
11-
from typing import Any, Iterator, List, Optional, Sequence, Tuple, Union, cast
11+
from typing import Any, Callable, Iterator, List, Optional, Sequence, Tuple, Union, cast
1212

1313
import can
1414
import can.typechecking
@@ -195,6 +195,7 @@ def send_periodic(
195195
period: float,
196196
duration: Optional[float] = None,
197197
store_task: bool = True,
198+
modifier_callback: Optional[Callable[[Message], None]] = None,
198199
) -> can.broadcastmanager.CyclicSendTaskABC:
199200
"""Start sending messages at a given period on this bus.
200201
@@ -216,6 +217,10 @@ def send_periodic(
216217
:param store_task:
217218
If True (the default) the task will be attached to this Bus instance.
218219
Disable to instead manage tasks manually.
220+
:param modifier_callback:
221+
Function which should be used to modify each message's data before
222+
sending. The callback modifies the :attr:`~can.Message.data` of the
223+
message and returns ``None``.
219224
:return:
220225
A started task instance. Note the task can be stopped (and depending on
221226
the backend modified) by calling the task's
@@ -230,7 +235,7 @@ def send_periodic(
230235
231236
.. note::
232237
233-
For extremely long running Bus instances with many short lived
238+
For extremely long-running Bus instances with many short-lived
234239
tasks the default api with ``store_task==True`` may not be
235240
appropriate as the stopped tasks are still taking up memory as they
236241
are associated with the Bus instance.
@@ -247,9 +252,8 @@ def send_periodic(
247252
# Create a backend specific task; will be patched to a _SelfRemovingCyclicTask later
248253
task = cast(
249254
_SelfRemovingCyclicTask,
250-
self._send_periodic_internal(msgs, period, duration),
255+
self._send_periodic_internal(msgs, period, duration, modifier_callback),
251256
)
252-
253257
# we wrap the task's stop method to also remove it from the Bus's list of tasks
254258
periodic_tasks = self._periodic_tasks
255259
original_stop_method = task.stop
@@ -275,6 +279,7 @@ def _send_periodic_internal(
275279
msgs: Union[Sequence[Message], Message],
276280
period: float,
277281
duration: Optional[float] = None,
282+
modifier_callback: Optional[Callable[[Message], None]] = None,
278283
) -> can.broadcastmanager.CyclicSendTaskABC:
279284
"""Default implementation of periodic message sending using threading.
280285
@@ -298,7 +303,12 @@ def _send_periodic_internal(
298303
threading.Lock()
299304
)
300305
task = ThreadBasedCyclicSendTask(
301-
self, self._lock_send_periodic, msgs, period, duration
306+
bus=self,
307+
lock=self._lock_send_periodic,
308+
messages=msgs,
309+
period=period,
310+
duration=duration,
311+
modifier_callback=modifier_callback,
302312
)
303313
return task
304314

can/interfaces/ixxat/canlib.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
from typing import Optional
1+
from typing import Callable, Optional, Sequence, Union
22

33
import can.interfaces.ixxat.canlib_vcinpl as vcinpl
44
import can.interfaces.ixxat.canlib_vcinpl2 as vcinpl2
5-
from can import BusABC, Message
6-
from can.bus import BusState
5+
from can import (
6+
BusABC,
7+
BusState,
8+
CyclicSendTaskABC,
9+
Message,
10+
)
711

812

913
class IXXATBus(BusABC):
@@ -145,8 +149,16 @@ def _recv_internal(self, timeout):
145149
def send(self, msg: Message, timeout: Optional[float] = None) -> None:
146150
return self.bus.send(msg, timeout)
147151

148-
def _send_periodic_internal(self, msgs, period, duration=None):
149-
return self.bus._send_periodic_internal(msgs, period, duration)
152+
def _send_periodic_internal(
153+
self,
154+
msgs: Union[Sequence[Message], Message],
155+
period: float,
156+
duration: Optional[float] = None,
157+
modifier_callback: Optional[Callable[[Message], None]] = None,
158+
) -> CyclicSendTaskABC:
159+
return self.bus._send_periodic_internal(
160+
msgs, period, duration, modifier_callback
161+
)
150162

151163
def shutdown(self) -> None:
152164
super().shutdown()

can/interfaces/ixxat/canlib_vcinpl.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,14 +13,18 @@
1313
import functools
1414
import logging
1515
import sys
16-
from typing import Callable, Optional, Tuple
17-
18-
from can import BusABC, CanProtocol, Message
19-
from can.broadcastmanager import (
16+
import warnings
17+
from typing import Callable, Optional, Sequence, Tuple, Union
18+
19+
from can import (
20+
BusABC,
21+
BusState,
22+
CanProtocol,
23+
CyclicSendTaskABC,
2024
LimitedDurationCyclicSendTaskABC,
25+
Message,
2126
RestartableCyclicTaskABC,
2227
)
23-
from can.bus import BusState
2428
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
2529
from can.ctypesutil import HRESULT as ctypes_HRESULT
2630
from can.exceptions import CanInitializationError, CanInterfaceNotImplementedError
@@ -785,17 +789,39 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
785789
# Want to log outgoing messages?
786790
# log.log(self.RECV_LOGGING_LEVEL, "Sent: %s", message)
787791

788-
def _send_periodic_internal(self, msgs, period, duration=None):
792+
def _send_periodic_internal(
793+
self,
794+
msgs: Union[Sequence[Message], Message],
795+
period: float,
796+
duration: Optional[float] = None,
797+
modifier_callback: Optional[Callable[[Message], None]] = None,
798+
) -> CyclicSendTaskABC:
789799
"""Send a message using built-in cyclic transmit list functionality."""
790-
if self._scheduler is None:
791-
self._scheduler = HANDLE()
792-
_canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler)
793-
caps = structures.CANCAPABILITIES()
794-
_canlib.canSchedulerGetCaps(self._scheduler, caps)
795-
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
796-
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
797-
return CyclicSendTask(
798-
self._scheduler, msgs, period, duration, self._scheduler_resolution
800+
if modifier_callback is None:
801+
if self._scheduler is None:
802+
self._scheduler = HANDLE()
803+
_canlib.canSchedulerOpen(
804+
self._device_handle, self.channel, self._scheduler
805+
)
806+
caps = structures.CANCAPABILITIES()
807+
_canlib.canSchedulerGetCaps(self._scheduler, caps)
808+
self._scheduler_resolution = caps.dwClockFreq / caps.dwCmsDivisor
809+
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
810+
return CyclicSendTask(
811+
self._scheduler, msgs, period, duration, self._scheduler_resolution
812+
)
813+
814+
# fallback to thread based cyclic task
815+
warnings.warn(
816+
f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
817+
"when the `modifier_callback` argument is given."
818+
)
819+
return BusABC._send_periodic_internal(
820+
self,
821+
msgs=msgs,
822+
period=period,
823+
duration=duration,
824+
modifier_callback=modifier_callback,
799825
)
800826

801827
def shutdown(self):

can/interfaces/ixxat/canlib_vcinpl2.py

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,15 @@
1313
import functools
1414
import logging
1515
import sys
16-
from typing import Callable, Optional, Tuple
16+
import warnings
17+
from typing import Callable, Optional, Sequence, Tuple, Union
1718

18-
from can import BusABC, CanProtocol, Message
19-
from can.broadcastmanager import (
19+
from can import (
20+
BusABC,
21+
CanProtocol,
22+
CyclicSendTaskABC,
2023
LimitedDurationCyclicSendTaskABC,
24+
Message,
2125
RestartableCyclicTaskABC,
2226
)
2327
from can.ctypesutil import HANDLE, PHANDLE, CLibrary
@@ -931,19 +935,41 @@ def send(self, msg: Message, timeout: Optional[float] = None) -> None:
931935
else:
932936
_canlib.canChannelPostMessage(self._channel_handle, message)
933937

934-
def _send_periodic_internal(self, msgs, period, duration=None):
938+
def _send_periodic_internal(
939+
self,
940+
msgs: Union[Sequence[Message], Message],
941+
period: float,
942+
duration: Optional[float] = None,
943+
modifier_callback: Optional[Callable[[Message], None]] = None,
944+
) -> CyclicSendTaskABC:
935945
"""Send a message using built-in cyclic transmit list functionality."""
936-
if self._scheduler is None:
937-
self._scheduler = HANDLE()
938-
_canlib.canSchedulerOpen(self._device_handle, self.channel, self._scheduler)
939-
caps = structures.CANCAPABILITIES2()
940-
_canlib.canSchedulerGetCaps(self._scheduler, caps)
941-
self._scheduler_resolution = (
942-
caps.dwCmsClkFreq / caps.dwCmsDivisor
943-
) # TODO: confirm
944-
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
945-
return CyclicSendTask(
946-
self._scheduler, msgs, period, duration, self._scheduler_resolution
946+
if modifier_callback is None:
947+
if self._scheduler is None:
948+
self._scheduler = HANDLE()
949+
_canlib.canSchedulerOpen(
950+
self._device_handle, self.channel, self._scheduler
951+
)
952+
caps = structures.CANCAPABILITIES2()
953+
_canlib.canSchedulerGetCaps(self._scheduler, caps)
954+
self._scheduler_resolution = (
955+
caps.dwCmsClkFreq / caps.dwCmsDivisor
956+
) # TODO: confirm
957+
_canlib.canSchedulerActivate(self._scheduler, constants.TRUE)
958+
return CyclicSendTask(
959+
self._scheduler, msgs, period, duration, self._scheduler_resolution
960+
)
961+
962+
# fallback to thread based cyclic task
963+
warnings.warn(
964+
f"{self.__class__.__name__} falls back to a thread-based cyclic task, "
965+
"when the `modifier_callback` argument is given."
966+
)
967+
return BusABC._send_periodic_internal(
968+
self,
969+
msgs=msgs,
970+
period=period,
971+
duration=duration,
972+
modifier_callback=modifier_callback,
947973
)
948974

949975
def shutdown(self):

0 commit comments

Comments
 (0)