File tree Expand file tree Collapse file tree 1 file changed +5
-7
lines changed
Expand file tree Collapse file tree 1 file changed +5
-7
lines changed Original file line number Diff line number Diff line change @@ -71,17 +71,16 @@ def next_message(self):
7171 @return: The next message from the queue,
7272 or None if no message is received within the timeout.
7373 """
74- if not self ._is_stream_active ():
74+ if not self ._is_stream_active () or self . _stream is None :
7575 raise StreamInactiveError ('Stream is not active' )
7676
77+
7778 try :
7879 # Read the next message from the stream directly
79- if self ._stream is not None :
80- message = next (self ._stream , None )
81- if message is None :
82- return None
83- return SubscriptionMessage (message .event_message )
80+ message = next (self ._stream )
81+ return SubscriptionMessage (message .event_message )
8482 except RpcError as e :
83+ # If Dapr can't be reached, wait until it's ready and reconnect the stream
8584 if e .code () == StatusCode .UNAVAILABLE :
8685 print (
8786 f'gRPC error while reading from stream: { e .details ()} , Status Code: { e .code ()} '
@@ -95,7 +94,6 @@ def next_message(self):
9594 except Exception as e :
9695 raise Exception (f'Error while fetching message: { e } ' )
9796
98- return None
9997
10098 def respond (self , message , status ):
10199 try :
You can’t perform that action at this time.
0 commit comments