11import logging
22from collections .abc import Callable
3+ from typing import cast
34
45from hiero_sdk_python import Client , Timestamp , TopicId , TopicMessageQuery
56from hiero_sdk_python .consensus .topic_message import TopicMessage
@@ -26,10 +27,8 @@ def __init__(
2627
2728 self ._query = (
2829 TopicMessageQuery (topic_id = TopicId .from_string (topic_id ), start_time = Timestamp (0 , 0 ).to_date ())
29- # Not implemented in native SDK
30- # See GH issue: https://github.com/hiero-ledger/hiero-sdk-python/issues/43
31- # .setMaxBackoff(JDuration.ofMillis(2000))
32- # .setMaxAttempts(5)
30+ .set_max_backoff (2.0 )
31+ .set_max_attempts (5 )
3332 )
3433
3534 def set_start_time (self , start_time : Timestamp ):
@@ -45,7 +44,7 @@ def set_limit(self, limit: int):
4544 return self
4645
4746 def set_completion_handler (self , completion_handler : Callable [[], None ]):
48- # Not implemented in native SDK
47+ # Completion handler + subscription handle cancellation does not work properly in native SDK
4948 # See GH issue: https://github.com/hiero-ledger/hiero-sdk-python/issues/43
5049 # self._query.setCompletionHandler(completion_handler)
5150 return self
@@ -67,13 +66,11 @@ def subscribe(
6766 def handle_message (response ):
6867 self ._handle_response (response , receiver )
6968
70- # Subscription handle is not actually implemented in native SDK
71- # See GH issue: https://github.com/hiero-ledger/hiero-sdk-python/issues/43
7269 self ._subscription_handle = self ._query .subscribe (client , handle_message , error_handler )
7370
7471 def unsubscribe (self ):
7572 if self ._subscription_handle :
76- self ._subscription_handle .unsubscribe ()
73+ self ._subscription_handle .cancel ()
7774
7875 def _handle_response (
7976 self , response : TopicMessage , receiver : Callable [[HcsMessage | HcsMessageWithResponseMetadata ], None ]
@@ -97,20 +94,22 @@ def _handle_response(
9794 receiver (
9895 HcsMessageWithResponseMetadata (
9996 message = message ,
100- sequence_number = response .sequence_number ,
101- consensus_timestamp = Timestamp .from_protobuf (response .consensus_timestamp ),
97+ sequence_number = cast ( int , response .sequence_number ) ,
98+ consensus_timestamp = Timestamp .from_date (response .consensus_timestamp ),
10299 )
103100 )
104101 else :
105102 receiver (message )
106103
107104 def _extract_message (self , response : TopicMessage ) -> HcsMessage | None :
108105 try :
109- return self ._message_class .from_json (response .message .decode ())
106+ message_content = cast (bytes , response .contents ).decode ()
107+ return self ._message_class .from_json (message_content )
110108 except Exception as error :
111109 LOGGER .warning (f"Failed to extract HCS message from response: { error !s} " )
112110
113111 def _report_invalid_message (self , response : TopicMessage , reason : str ):
114- LOGGER .warning (f"Got invalid message: { response .message .decode ()} , reason: { reason } " )
112+ message_content = cast (bytes , response .contents ).decode ()
113+ LOGGER .warning (f"Got invalid message: { message_content } , reason: { reason } " )
115114 if self ._invalid_message_handler :
116115 self ._invalid_message_handler (response , reason )
0 commit comments