77import logging
88import threading
99import time
10- from typing import Any , Awaitable , Callable , Iterable , List , Optional , Union
10+ import typing
11+ from contextlib import AbstractContextManager
12+ from types import TracebackType
13+ from typing import Any , Awaitable , Callable , Iterable , List , NamedTuple , Optional , Union
1114
1215from can .bus import BusABC
1316from can .listener import Listener
1821MessageRecipient = Union [Listener , Callable [[Message ], Union [Awaitable [None ], None ]]]
1922
2023
21- class Notifier :
24+ class _BusNotifierPair (NamedTuple ):
25+ bus : "BusABC"
26+ notifier : "Notifier"
27+
28+
29+ class _NotifierRegistry :
30+ """A registry to manage the association between CAN buses and Notifiers.
31+
32+ This class ensures that a bus is not added to multiple active Notifiers.
33+ """
34+
35+ def __init__ (self ) -> None :
36+ """Initialize the registry with an empty list of bus-notifier pairs and a threading lock."""
37+ self .pairs : typing .List [_BusNotifierPair ] = []
38+ self .lock = threading .Lock ()
39+
40+ def register (self , bus : BusABC , notifier : "Notifier" ) -> None :
41+ """Register a bus and its associated notifier.
42+
43+ Ensures that a bus is not added to multiple active Notifier instances.
44+
45+ :param bus:
46+ The CAN bus to register.
47+ :param notifier:
48+ The Notifier instance associated with the bus.
49+ :raises ValueError:
50+ If the bus is already assigned to an active Notifier.
51+ """
52+ with self .lock :
53+ for pair in self .pairs :
54+ if bus is pair .bus and not pair .notifier .stopped :
55+ raise ValueError (
56+ "A bus can not be added to multiple active Notifier instances."
57+ )
58+ self .pairs .append (_BusNotifierPair (bus , notifier ))
59+
60+ def unregister (self , bus : BusABC , notifier : "Notifier" ) -> None :
61+ """Unregister a bus and its associated notifier.
62+
63+ Removes the bus-notifier pair from the registry.
64+
65+ :param bus:
66+ The CAN bus to unregister.
67+ :param notifier:
68+ The Notifier instance associated with the bus.
69+ """
70+ with self .lock :
71+ registered_pairs_to_remove : typing .List [_BusNotifierPair ] = []
72+ for pair in self .pairs :
73+ if pair .bus is bus and pair .notifier is notifier :
74+ registered_pairs_to_remove .append (pair )
75+ for pair in registered_pairs_to_remove :
76+ self .pairs .remove (pair )
77+
78+
79+ class Notifier (AbstractContextManager ):
80+
81+ _registry : typing .Final = _NotifierRegistry ()
82+
2283 def __init__ (
2384 self ,
2485 bus : Union [BusABC , List [BusABC ]],
@@ -32,16 +93,21 @@ def __init__(
3293
3394 .. Note::
3495
35- Remember to call ` stop() ` after all messages are received as
96+ Remember to call :meth:`~can.Notifier. stop` after all messages are received as
3697 many listeners carry out flush operations to persist data.
3798
3899
39- :param bus: A :ref:`bus` or a list of buses to listen to.
100+ :param bus:
101+ A :ref:`bus` or a list of buses to listen to.
40102 :param listeners:
41103 An iterable of :class:`~can.Listener` or callables that receive a :class:`~can.Message`
42104 and return nothing.
43- :param timeout: An optional maximum number of seconds to wait for any :class:`~can.Message`.
44- :param loop: An :mod:`asyncio` event loop to schedule the ``listeners`` in.
105+ :param timeout:
106+ An optional maximum number of seconds to wait for any :class:`~can.Message`.
107+ :param loop:
108+ An :mod:`asyncio` event loop to schedule the ``listeners`` in.
109+ :raises ValueError:
110+ If the *bus* is already assigned to an active :class:`~can.Notifier`.
45111 """
46112 self .listeners : List [MessageRecipient ] = list (listeners )
47113 self .bus = bus
@@ -51,31 +117,36 @@ def __init__(
51117 #: Exception raised in thread
52118 self .exception : Optional [Exception ] = None
53119
54- self ._running = True
120+ self ._stopped = False
55121 self ._lock = threading .Lock ()
56122
57123 self ._readers : List [Union [int , threading .Thread ]] = []
58- buses = self .bus if isinstance (self .bus , list ) else [self .bus ]
59- for each_bus in buses :
124+ self . _bus_list = self .bus if isinstance (self .bus , list ) else [self .bus ]
125+ for each_bus in self . _bus_list :
60126 self .add_bus (each_bus )
61127
62128 def add_bus (self , bus : BusABC ) -> None :
63129 """Add a bus for notification.
64130
65131 :param bus:
66132 CAN bus instance.
133+ :raises ValueError:
134+ If the *bus* is already assigned to an active :class:`~can.Notifier`.
67135 """
68- reader : int = - 1
136+ # add bus to notifier registry
137+ self ._registry .register (bus , self )
138+
139+ file_descriptor : int = - 1
69140 try :
70- reader = bus .fileno ()
141+ file_descriptor = bus .fileno ()
71142 except NotImplementedError :
72143 # Bus doesn't support fileno, we fall back to thread based reader
73144 pass
74145
75- if self ._loop is not None and reader >= 0 :
146+ if self ._loop is not None and file_descriptor >= 0 :
76147 # Use bus file descriptor to watch for messages
77- self ._loop .add_reader (reader , self ._on_message_available , bus )
78- self ._readers .append (reader )
148+ self ._loop .add_reader (file_descriptor , self ._on_message_available , bus )
149+ self ._readers .append (file_descriptor )
79150 else :
80151 reader_thread = threading .Thread (
81152 target = self ._rx_thread ,
@@ -86,15 +157,15 @@ def add_bus(self, bus: BusABC) -> None:
86157 reader_thread .start ()
87158 self ._readers .append (reader_thread )
88159
89- def stop (self , timeout : float = 5 ) -> None :
160+ def stop (self , timeout : float = 5.0 ) -> None :
90161 """Stop notifying Listeners when new :class:`~can.Message` objects arrive
91162 and call :meth:`~can.Listener.stop` on each Listener.
92163
93164 :param timeout:
94165 Max time in seconds to wait for receive threads to finish.
95166 Should be longer than timeout given at instantiation.
96167 """
97- self ._running = False
168+ self ._stopped = True
98169 end_time = time .time () + timeout
99170 for reader in self ._readers :
100171 if isinstance (reader , threading .Thread ):
@@ -108,6 +179,10 @@ def stop(self, timeout: float = 5) -> None:
108179 if hasattr (listener , "stop" ):
109180 listener .stop ()
110181
182+ # remove bus from registry
183+ for bus in self ._bus_list :
184+ self ._registry .unregister (bus , self )
185+
111186 def _rx_thread (self , bus : BusABC ) -> None :
112187 # determine message handling callable early, not inside while loop
113188 if self ._loop :
@@ -118,7 +193,7 @@ def _rx_thread(self, bus: BusABC) -> None:
118193 else :
119194 handle_message = self ._on_message_received
120195
121- while self ._running :
196+ while not self ._stopped :
122197 try :
123198 if msg := bus .recv (self .timeout ):
124199 with self ._lock :
@@ -183,3 +258,17 @@ def remove_listener(self, listener: MessageRecipient) -> None:
183258 :raises ValueError: if `listener` was never added to this notifier
184259 """
185260 self .listeners .remove (listener )
261+
262+ @property
263+ def stopped (self ) -> bool :
264+ """Return ``True``, if Notifier was properly shut down with :meth:`~can.Notifier.stop`."""
265+ return self ._stopped
266+
267+ def __exit__ (
268+ self ,
269+ exc_type : Optional [typing .Type [BaseException ]],
270+ exc_value : Optional [BaseException ],
271+ traceback : Optional [TracebackType ],
272+ ) -> None :
273+ if not self ._stopped :
274+ self .stop ()
0 commit comments