@@ -280,6 +280,49 @@ def subscribe(self, topic: Topic, callback: Coroutine):
280280 future .set_result (None )
281281 return future
282282
283+ def unsubscribe (self , topic : Topic ):
284+ """
285+ Unsubscribe for events
286+
287+ Args:
288+ topic (Topic): the identifier of the event topic to be unsubscribed.
289+ Note: You can use ALL_TOPICS (event_notifier.ALL_TOPICS) to unsubscribe all topics
290+
291+ Returns:
292+ Coroutine: awaitable task to subscribe to topic if connected.
293+ """
294+ # Create none-future which can be safely awaited
295+ # but which also will not give warnings
296+ # if it isn't awaited. This is returned
297+ # on code paths which do not make RPC calls.
298+ none_future = asyncio .Future ()
299+ none_future .set_result (None )
300+
301+ # Topics to potentially make RPC calls about
302+ topics = list (self ._topics ) if topic is ALL_TOPICS else [topic ]
303+
304+ # Handle ALL_TOPICS or specific topics
305+ if topic is ALL_TOPICS and not self ._topics :
306+ logger .warning (f"Cannot unsubscribe 'ALL_TOPICS'. No topics are subscribed." )
307+ return none_future
308+ elif topic is not ALL_TOPICS and topic not in self ._topics :
309+ logger .warning (f"Cannot unsubscribe topic '{ topic } ' which is not subscribed." )
310+ return none_future
311+ elif topic is ALL_TOPICS and self ._topics :
312+ logger .debug (f"Unsubscribing all topics: { self ._topics } " )
313+ # remove all topics and callbacks
314+ self ._topics .clear ()
315+ self ._callbacks .clear ()
316+ elif topic is not ALL_TOPICS and topic in self ._topics :
317+ logger .debug (f"Unsubscribing topic '{ topic } '" )
318+ self ._topics .remove (topic )
319+ self ._callbacks .pop (topic , None )
320+
321+ if self .is_ready ():
322+ return self ._rpc_channel .other .unsubscribe (topics = topics )
323+ else :
324+ return none_future
325+
283326 async def publish (
284327 self , topics : TopicList , data = None , sync = True , notifier_id = None
285328 ) -> bool :
0 commit comments