88import threading
99import time
1010from collections .abc import Awaitable , Iterable
11- from typing import Any , Callable , Optional , Union
11+ from contextlib import AbstractContextManager
12+ from types import TracebackType
13+ from typing import (
14+ Any ,
15+ Callable ,
16+ Final ,
17+ NamedTuple ,
18+ Optional ,
19+ Union ,
20+ )
1221
1322from can .bus import BusABC
1423from can .listener import Listener
1928MessageRecipient = Union [Listener , Callable [[Message ], Union [Awaitable [None ], None ]]]
2029
2130
22- class Notifier :
31+ class _BusNotifierPair (NamedTuple ):
32+ bus : "BusABC"
33+ notifier : "Notifier"
34+
35+
36+ class _NotifierRegistry :
37+ """A registry to manage the association between CAN buses and Notifiers.
38+
39+ This class ensures that a bus is not added to multiple active Notifiers.
40+ """
41+
42+ def __init__ (self ) -> None :
43+ """Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
44+ self .pairs : list [_BusNotifierPair ] = []
45+ self .lock = threading .Lock ()
46+
47+ def register (self , bus : BusABC , notifier : "Notifier" ) -> None :
48+ """Register a bus and its associated notifier.
49+
50+ Ensures that a bus is not added to multiple active :class:`~can.Notifier` instances.
51+
52+ :param bus:
53+ The CAN bus to register.
54+ :param notifier:
55+ The :class:`~can.Notifier` instance associated with the bus.
56+ :raises ValueError:
57+ If the bus is already assigned to an active Notifier.
58+ """
59+ with self .lock :
60+ for pair in self .pairs :
61+ if bus is pair .bus and not pair .notifier .stopped :
62+ raise ValueError (
63+ "A bus can not be added to multiple active Notifier instances."
64+ )
65+ self .pairs .append (_BusNotifierPair (bus , notifier ))
66+
67+ def unregister (self , bus : BusABC , notifier : "Notifier" ) -> None :
68+ """Unregister a bus and its associated notifier.
69+
70+ Removes the bus-notifier pair from the registry.
71+
72+ :param bus:
73+ The CAN bus to unregister.
74+ :param notifier:
75+ The :class:`~can.Notifier` instance associated with the bus.
76+ """
77+ with self .lock :
78+ registered_pairs_to_remove : list [_BusNotifierPair ] = []
79+ for pair in self .pairs :
80+ if pair .bus is bus and pair .notifier is notifier :
81+ registered_pairs_to_remove .append (pair )
82+ for pair in registered_pairs_to_remove :
83+ self .pairs .remove (pair )
84+
85+ def find_instances (self , bus : BusABC ) -> tuple ["Notifier" , ...]:
86+ """Find the :class:`~can.Notifier` instances associated with a given CAN bus.
87+
88+ This method searches the registry for the :class:`~can.Notifier`
89+ that is linked to the specified bus. If the bus is found, the
90+ corresponding :class:`~can.Notifier` instances are returned. If the bus is not
91+ found in the registry, an empty tuple is returned.
92+
93+ :param bus:
94+ The CAN bus for which to find the associated :class:`~can.Notifier` .
95+ :return:
96+ A tuple of :class:`~can.Notifier` instances associated with the given bus.
97+ """
98+ instance_list = []
99+ with self .lock :
100+ for pair in self .pairs :
101+ if bus is pair .bus :
102+ instance_list .append (pair .notifier )
103+ return tuple (instance_list )
104+
105+
106+ class Notifier (AbstractContextManager ):
107+
108+ _registry : Final = _NotifierRegistry ()
109+
23110 def __init__ (
24111 self ,
25112 bus : Union [BusABC , list [BusABC ]],
@@ -33,69 +120,89 @@ def __init__(
33120
34121 .. Note::
35122
36- Remember to call ` stop() ` after all messages are received as
123+ Remember to call :meth:`~can.Notifier. stop` after all messages are received as
37124 many listeners carry out flush operations to persist data.
38125
39126
40- :param bus: A :ref:`bus` or a list of buses to listen to.
127+ :param bus:
128+ A :ref:`bus` or a list of buses to consume messages from.
41129 :param listeners:
42130 An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
43131 and return nothing.
44- :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
45- :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
132+ :param timeout:
133+ An optional maximum number of seconds to wait for any :class:`~can.Message`.
134+ :param loop:
135+ An :mod:`asyncio` event loop to schedule the ``listeners`` in.
136+ :raises ValueError:
137+ If a passed in *bus* is already assigned to an active :class:`~can.Notifier`.
46138 """
47139 self .listeners : list [MessageRecipient ] = list (listeners )
48- self .bus = bus
140+ self ._bus_list : list [ BusABC ] = []
49141 self .timeout = timeout
50142 self ._loop = loop
51143
52144 #: Exception raised in thread
53145 self .exception : Optional [Exception ] = None
54146
55- self ._running = True
147+ self ._stopped = False
56148 self ._lock = threading .Lock ()
57149
58150 self ._readers : list [Union [int , threading .Thread ]] = []
59- buses = self . bus if isinstance (self . bus , list ) else [self . bus ]
60- for each_bus in buses :
151+ _bus_list : list [ BusABC ] = bus if isinstance (bus , list ) else [bus ]
152+ for each_bus in _bus_list :
61153 self .add_bus (each_bus )
62154
155+ @property
156+ def bus (self ) -> Union [BusABC , tuple ["BusABC" , ...]]:
157+ """Return the associated bus or a tuple of buses."""
158+ if len (self ._bus_list ) == 1 :
159+ return self ._bus_list [0 ]
160+ return tuple (self ._bus_list )
161+
63162 def add_bus (self , bus : BusABC ) -> None :
64163 """Add a bus for notification.
65164
66165 :param bus:
67166 CAN bus instance.
167+ :raises ValueError:
168+ If the *bus* is already assigned to an active :class:`~can.Notifier`.
68169 """
69- reader : int = - 1
170+ # add bus to notifier registry
171+ Notifier ._registry .register (bus , self )
172+
173+ # add bus to internal bus list
174+ self ._bus_list .append (bus )
175+
176+ file_descriptor : int = - 1
70177 try :
71- reader = bus .fileno ()
178+ file_descriptor = bus .fileno ()
72179 except NotImplementedError :
73180 # Bus doesn't support fileno, we fall back to thread based reader
74181 pass
75182
76- if self ._loop is not None and reader >= 0 :
183+ if self ._loop is not None and file_descriptor >= 0 :
77184 # Use bus file descriptor to watch for messages
78- self ._loop .add_reader (reader , self ._on_message_available , bus )
79- self ._readers .append (reader )
185+ self ._loop .add_reader (file_descriptor , self ._on_message_available , bus )
186+ self ._readers .append (file_descriptor )
80187 else :
81188 reader_thread = threading .Thread (
82189 target = self ._rx_thread ,
83190 args = (bus ,),
84- name = f'can.notifier for bus "{ bus .channel_info } "' ,
191+ name = f'{ self . __class__ . __qualname__ } for bus "{ bus .channel_info } "' ,
85192 )
86193 reader_thread .daemon = True
87194 reader_thread .start ()
88195 self ._readers .append (reader_thread )
89196
90- def stop (self , timeout : float = 5 ) -> None :
197+ def stop (self , timeout : float = 5.0 ) -> None :
91198 """Stop notifying Listeners when new :class:`~can.Message` objects arrive
92199 and call :meth:`~can.Listener.stop` on each Listener.
93200
94201 :param timeout:
95202 Max time in seconds to wait for receive threads to finish.
96203 Should be longer than timeout given at instantiation.
97204 """
98- self ._running = False
205+ self ._stopped = True
99206 end_time = time .time () + timeout
100207 for reader in self ._readers :
101208 if isinstance (reader , threading .Thread ):
@@ -109,6 +216,10 @@ def stop(self, timeout: float = 5) -> None:
109216 if hasattr (listener , "stop" ):
110217 listener .stop ()
111218
219+ # remove bus from registry
220+ for bus in self ._bus_list :
221+ Notifier ._registry .unregister (bus , self )
222+
112223 def _rx_thread (self , bus : BusABC ) -> None :
113224 # determine message handling callable early, not inside while loop
114225 if self ._loop :
@@ -119,7 +230,7 @@ def _rx_thread(self, bus: BusABC) -> None:
119230 else :
120231 handle_message = self ._on_message_received
121232
122- while self ._running :
233+ while not self ._stopped :
123234 try :
124235 if msg := bus .recv (self .timeout ):
125236 with self ._lock :
@@ -184,3 +295,33 @@ def remove_listener(self, listener: MessageRecipient) -> None:
184295 :raises ValueError: if `listener` was never added to this notifier
185296 """
186297 self .listeners .remove (listener )
298+
299+ @property
300+ def stopped (self ) -> bool :
301+ """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
302+ return self ._stopped
303+
304+ @staticmethod
305+ def find_instances (bus : BusABC ) -> tuple ["Notifier" , ...]:
306+ """Find :class:`~can.Notifier` instances associated with a given CAN bus.
307+
308+ This method searches the registry for the :class:`~can.Notifier`
309+ that is linked to the specified bus. If the bus is found, the
310+ corresponding :class:`~can.Notifier` instances are returned. If the bus is not
311+ found in the registry, an empty tuple is returned.
312+
313+ :param bus:
314+ The CAN bus for which to find the associated :class:`~can.Notifier` .
315+ :return:
316+ A tuple of :class:`~can.Notifier` instances associated with the given bus.
317+ """
318+ return Notifier ._registry .find_instances (bus )
319+
320+ def __exit__ (
321+ self ,
322+ exc_type : Optional [type [BaseException ]],
323+ exc_value : Optional [BaseException ],
324+ traceback : Optional [TracebackType ],
325+ ) -> None :
326+ if not self ._stopped :
327+ self .stop ()
0 commit comments