11import json
22
3- import grpc
3+ from grpc import StreamStreamMultiCallable , RpcError , StatusCode # type: ignore
44
55from dapr .clients .exceptions import StreamInactiveError
66from dapr .clients .grpc ._response import TopicEventResponse
77from dapr .proto import api_v1 , appcallback_v1
88import queue
99import threading
10-
10+ from typing import Optional
1111
1212def success ():
1313 return appcallback_v1 .TopicEventResponse .SUCCESS
@@ -28,11 +28,11 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
2828 self .topic = topic
2929 self .metadata = metadata or {}
3030 self .dead_letter_topic = dead_letter_topic or ''
31- self ._stream = None
32- self ._response_thread = None
33- self ._send_queue = queue .Queue ()
34- self ._receive_queue = queue .Queue ()
35- self ._stream_active = False
31+ self ._stream : Optional [ StreamStreamMultiCallable ] = None # Type annotation for gRPC stream
32+ self ._response_thread : Optional [ threading . Thread ] = None # Type for thread
33+ self ._send_queue : queue . Queue = queue .Queue () # Type annotation for send queue
34+ self ._receive_queue : queue . Queue = queue .Queue () # Type annotation for receive queue
35+ self ._stream_active : bool = False
3636 self ._stream_lock = threading .Lock () # Protects _stream_active
3737
3838 def start (self ):
@@ -55,9 +55,8 @@ def outgoing_request_iterator():
5555 # Start sending back acknowledgement messages from the send queue
5656 while self ._is_stream_active ():
5757 try :
58- response = self ._send_queue .get ()
59- # The above blocks until a message is available or the stream is closed
60- # so that's why we need to check again if the stream is still active
58+ response = self ._send_queue .get (timeout = 1 )
59+ # Check again if the stream is still active
6160 if not self ._is_stream_active ():
6261 break
6362 yield response
@@ -76,17 +75,19 @@ def outgoing_request_iterator():
7675
7776 def _handle_incoming_messages (self ):
7877 try :
79- # The first message dapr sends on the stream is for signalling only, so discard it
80- next (self ._stream )
81-
82- # Read messages from the stream and put them in the receive queue
83- for message in self ._stream :
84- if self ._is_stream_active ():
85- self ._receive_queue .put (message .event_message )
86- else :
87- break
88- except grpc .RpcError as e :
89- if e .code () != grpc .StatusCode .CANCELLED :
78+ # Check if the stream is not None
79+ if self ._stream is not None :
80+ # The first message dapr sends on the stream is for signalling only, so discard it
81+ next (self ._stream )
82+
83+ # Read messages from the stream and put them in the receive queue
84+ for message in self ._stream :
85+ if self ._is_stream_active ():
86+ self ._receive_queue .put (message .event_message )
87+ else :
88+ break
89+ except RpcError as e :
90+ if e .code () != StatusCode .CANCELLED :
9091 print (f'gRPC error in stream: { e .details ()} , Status Code: { e .code ()} ' )
9192 except Exception as e :
9293 raise Exception (f'Error while handling responses: { e } ' )
@@ -157,8 +158,8 @@ def close(self):
157158 if self ._stream :
158159 try :
159160 self ._stream .cancel ()
160- except grpc . RpcError as e :
161- if e .code () != grpc . StatusCode .CANCELLED :
161+ except RpcError as e :
162+ if e .code () != StatusCode .CANCELLED :
162163 raise Exception (f'Error while closing stream: { e } ' )
163164 except Exception as e :
164165 raise Exception (f'Error while closing stream: { e } ' )
0 commit comments