Skip to content

Commit 2add93f

Browse files
works
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 3bb7e60 commit 2add93f

File tree

5 files changed

+150
-73
lines changed

5 files changed

+150
-73
lines changed

dapr/clients/exceptions.py

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -27,22 +27,15 @@
2727
class DaprInternalError(Exception):
2828
"""DaprInternalError encapsulates all Dapr exceptions"""
2929

30-
def __init__(
31-
self,
32-
message: Optional[str],
33-
error_code: Optional[str] = ERROR_CODE_UNKNOWN,
34-
raw_response_bytes: Optional[bytes] = None,
35-
):
30+
def __init__(self, message: Optional[str], error_code: Optional[str] = ERROR_CODE_UNKNOWN,
31+
raw_response_bytes: Optional[bytes] = None, ):
3632
self._message = message
3733
self._error_code = error_code
3834
self._raw_response_bytes = raw_response_bytes
3935

4036
def as_dict(self):
41-
return {
42-
'message': self._message,
43-
'errorCode': self._error_code,
44-
'raw_response_bytes': self._raw_response_bytes,
45-
}
37+
return {'message': self._message, 'errorCode': self._error_code,
38+
'raw_response_bytes': self._raw_response_bytes, }
4639

4740

4841
class StatusDetails:
@@ -119,16 +112,16 @@ def get_grpc_status(self):
119112
return self._grpc_status
120113

121114
def json(self):
122-
error_details = {
123-
'status_code': self.code().name,
124-
'message': self.details(),
125-
'error_code': self.error_code(),
126-
'details': self._details.as_dict(),
127-
}
115+
error_details = {'status_code': self.code().name, 'message': self.details(),
116+
'error_code': self.error_code(), 'details': self._details.as_dict(), }
128117
return json.dumps(error_details)
129118

130119

131120
def serialize_status_detail(status_detail):
132121
if not status_detail:
133122
return None
134123
return MessageToDict(status_detail, preserving_proto_field_name=True)
124+
125+
126+
class StreamInactiveError(Exception):
127+
pass

dapr/clients/grpc/subscription.py

Lines changed: 83 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
import grpc
2+
3+
from dapr.clients.exceptions import StreamInactiveError
4+
from dapr.clients.grpc._response import TopicEventResponse
25
from dapr.proto import api_v1, appcallback_v1
36
import queue
47
import threading
@@ -24,9 +27,11 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
2427
self.metadata = metadata or {}
2528
self.dead_letter_topic = dead_letter_topic or ''
2629
self._stream = None
30+
self._response_thread = None
2731
self._send_queue = queue.Queue()
2832
self._receive_queue = queue.Queue()
2933
self._stream_active = False
34+
self._stream_lock = threading.Lock() # Protects _stream_active
3035

3136
def start(self):
3237
def request_iterator():
@@ -38,65 +43,112 @@ def request_iterator():
3843
dead_letter_topic=self.dead_letter_topic or ''))
3944
yield initial_request
4045

41-
while self._stream_active:
46+
while self._is_stream_active():
4247
try:
43-
request = self._send_queue.get()
44-
if request is None:
45-
break
46-
47-
yield request
48+
yield self._send_queue.get() # TODO Should I add a timeout?
4849
except queue.Empty:
4950
continue
5051
except Exception as e:
51-
print(f"Exception in request_iterator: {e}")
52-
raise e
52+
raise Exception(f"Error in request iterator: {e}")
5353

5454
# Create the bidirectional stream
5555
self._stream = self._stub.SubscribeTopicEventsAlpha1(request_iterator())
56-
self._stream_active = True
56+
self._set_stream_active()
5757

5858
# Start a thread to handle incoming messages
59-
threading.Thread(target=self._handle_responses, daemon=True).start()
59+
self._response_thread = threading.Thread(target=self._handle_responses, daemon=True)
60+
self._response_thread.start()
6061

6162
def _handle_responses(self):
6263
try:
6364
# The first message dapr sends on the stream is for signalling only, so discard it
6465
next(self._stream)
6566

66-
for msg in self._stream:
67-
print(f"Received message from dapr on stream: {msg.event_message.id}") # SubscribeTopicEventsResponseAlpha1
68-
self._receive_queue.put(msg.event_message)
67+
# Read messages from the stream and put them in the receive queue
68+
for message in self._stream:
69+
if self._is_stream_active():
70+
self._receive_queue.put(message.event_message)
71+
else:
72+
break
6973
except grpc.RpcError as e:
70-
print(f"gRPC error in stream: {e}")
74+
if e.code() != grpc.StatusCode.CANCELLED:
75+
print(f"gRPC error in stream: {e.details()}, Status Code: {e.code()}")
7176
except Exception as e:
72-
print(f"Unexpected error in stream: {e}")
77+
raise Exception(f"Error while handling responses: {e}")
7378
finally:
74-
self._stream_active = False
79+
self._set_stream_inactive()
7580

76-
def next_message(self, timeout=None):
77-
print("in next_message")
78-
try:
79-
return self._receive_queue.get(timeout=timeout)
80-
except queue.Empty as e :
81-
print("queue empty", e)
82-
return None
83-
except Exception as e:
84-
print(f"Exception in next_message: {e}")
85-
return None
81+
def next_message(self, timeout=1):
82+
"""
83+
Gets the next message from the receive queue
84+
@param timeout: Timeout in seconds
85+
@return: The next message
86+
"""
87+
return self.read_message_from_queue(self._receive_queue, timeout=timeout)
8688

87-
def respond(self, message, status):
89+
def _respond(self, message, status):
8890
try:
8991
status = appcallback_v1.TopicEventResponse(status=status.value)
9092
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(id=message.id,
9193
status=status)
9294
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
9395

94-
self._send_queue.put(msg)
96+
self.send_message_to_queue(self._send_queue, msg)
9597
except Exception as e:
9698
print(f"Exception in send_message: {e}")
9799

100+
def respond_success(self, message):
101+
self._respond(message, TopicEventResponse('success').status)
102+
103+
def respond_retry(self, message):
104+
self._respond(message, TopicEventResponse('retry').status)
105+
106+
def respond_drop(self, message):
107+
self._respond(message, TopicEventResponse('drop').status)
108+
109+
def send_message_to_queue(self, q, message):
110+
if not self._is_stream_active():
111+
raise StreamInactiveError("Stream is not active")
112+
q.put(message)
113+
114+
def read_message_from_queue(self, q, timeout):
115+
if not self._is_stream_active():
116+
raise StreamInactiveError("Stream is not active")
117+
try:
118+
return q.get(timeout=timeout)
119+
except queue.Empty:
120+
return None
121+
122+
def _set_stream_active(self):
123+
with self._stream_lock:
124+
self._stream_active = True
125+
126+
def _set_stream_inactive(self):
127+
with self._stream_lock:
128+
self._stream_active = False
129+
130+
def _is_stream_active(self):
131+
with self._stream_lock:
132+
return self._stream_active
133+
98134
def close(self):
99-
self._stream_active = False
100-
self._send_queue.put(None)
135+
if not self._is_stream_active():
136+
return
137+
138+
self._set_stream_inactive()
139+
140+
# Cancel the stream
101141
if self._stream:
102-
self._stream.cancel()
142+
try:
143+
self._stream.cancel()
144+
except grpc.RpcError as e:
145+
if e.code() != grpc.StatusCode.CANCELLED:
146+
raise Exception(f"Error while closing stream: {e}")
147+
except Exception as e:
148+
raise Exception(f"Error while closing stream: {e}")
149+
150+
# Join the response-handling thread to ensure it has finished
151+
if self._response_thread:
152+
self._response_thread.join()
153+
self._response_thread = None
154+

examples/pubsub_streaming/subscriber.py

Lines changed: 15 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@
55

66
def process_message(message):
77
# Process the message here
8-
print(f"Processing message: {message.data}")
9-
return TopicEventResponse('success').status
8+
print(f"Processing message: {message}")
9+
return "success"
1010

1111

1212
def main():
@@ -15,26 +15,25 @@ def main():
1515
subscription = client.subscribe(pubsub_name="pubsub", topic="TOPIC_A", dead_letter_topic="TOPIC_A_DEAD")
1616

1717
try:
18-
while True:
18+
for i in range(5):
1919
try:
20-
try:
21-
message = subscription.next_message(timeout=5)
22-
except Exception as e:
23-
print(f"An error occurred: {e}")
24-
20+
message = subscription.next_message(timeout=0.1)
2521
if message is None:
2622
print("No message received within timeout period.")
2723
continue
2824

29-
print(f"Received message with ID: {message.id}")
30-
3125
# Process the message
32-
try:
33-
subscription.respond(message, process_message(message))
34-
except Exception as e:
35-
print(f"An error occurred while sending the message: {e}")
36-
except KeyboardInterrupt:
37-
print("Received interrupt, shutting down...")
26+
response_status = process_message(message)
27+
28+
if response_status == "success":
29+
subscription.respond_success(message)
30+
elif response_status == "retry":
31+
subscription.respond_retry(message)
32+
elif response_status == "drop":
33+
subscription.respond_drop(message)
34+
35+
except Exception as e:
36+
print(f"Error getting message: {e}")
3837
break
3938

4039
finally:

tests/clients/fake_dapr_server.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from grpc_status import rpc_status
88

99
from dapr.clients.grpc._helpers import to_bytes
10-
from dapr.proto import api_service_v1, common_v1, api_v1
10+
from dapr.proto import api_service_v1, common_v1, api_v1, appcallback_v1
1111
from dapr.proto.common.v1.common_pb2 import ConfigurationItem
1212
from dapr.clients.grpc._response import WorkflowRuntimeStatus
1313
from dapr.proto.runtime.v1.dapr_pb2 import (
@@ -177,6 +177,14 @@ def PublishEvent(self, request, context):
177177
context.set_trailing_metadata(trailers)
178178
return empty_pb2.Empty()
179179

180+
def SubscribeTopicEventsAlpha1(self, request_iterator, context):
181+
yield api_v1.SubscribeTopicEventsResponseAlpha1(
182+
initial_response=api_v1.SubscribeTopicEventsResponseInitialAlpha1())
183+
yield api_v1.SubscribeTopicEventsResponseAlpha1(
184+
event_message=appcallback_v1.TopicEventRequest(id='123', topic="TOPIC_A", data=b'hello1'))
185+
yield api_v1.SubscribeTopicEventsResponseAlpha1(
186+
event_message=appcallback_v1.TopicEventRequest(id='456', topic="TOPIC_A", data=b'hello2'))
187+
180188
def SaveState(self, request, context):
181189
self.check_for_exception(context)
182190

tests/clients/test_dapr_grpc_client.py

Lines changed: 33 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from google.rpc import status_pb2, code_pb2
2626

27-
from dapr.clients.exceptions import DaprGrpcError
27+
from dapr.clients.exceptions import DaprGrpcError, StreamInactiveError
2828
from dapr.clients.grpc.client import DaprGrpcClient
2929
from dapr.clients import DaprClient
3030
from dapr.proto import common_v1
@@ -34,13 +34,9 @@
3434
from dapr.clients.grpc._request import TransactionalStateOperation
3535
from dapr.clients.grpc._state import StateOptions, Consistency, Concurrency, StateItem
3636
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
37-
from dapr.clients.grpc._response import (
38-
ConfigurationItem,
39-
ConfigurationResponse,
40-
ConfigurationWatcher,
41-
UnlockResponseStatus,
42-
WorkflowRuntimeStatus,
43-
)
37+
from dapr.clients.grpc._response import (ConfigurationItem, ConfigurationResponse,
38+
ConfigurationWatcher, UnlockResponseStatus,
39+
WorkflowRuntimeStatus, TopicEventResponse, )
4440

4541

4642
class DaprGrpcClientTests(unittest.TestCase):
@@ -262,6 +258,35 @@ def test_publish_error(self):
262258
data=111,
263259
)
264260

261+
def test_subscribe_topic(self):
262+
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
263+
subscription = dapr.subscribe(pubsub_name='pubsub', topic='example')
264+
265+
# First message
266+
message1 = subscription.next_message(timeout=5)
267+
subscription.respond_success(message1)
268+
269+
self.assertEqual('123', message1.id)
270+
self.assertEqual(b'hello1', message1.data)
271+
self.assertEqual('TOPIC_A', message1.topic)
272+
273+
# Second message
274+
message2 = subscription.next_message(timeout=5)
275+
subscription.respond_success(message2)
276+
277+
self.assertEqual('456', message2.id)
278+
self.assertEqual(b'hello2', message2.data)
279+
self.assertEqual('TOPIC_A', message2.topic)
280+
281+
def test_subscribe_topic_early_close(self):
282+
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')
283+
subscription = dapr.subscribe(pubsub_name='pubsub', topic='example')
284+
subscription.close()
285+
286+
with self.assertRaises(StreamInactiveError):
287+
subscription.next_message(timeout=5)
288+
289+
265290
@patch.object(settings, 'DAPR_API_TOKEN', 'test-token')
266291
def test_dapr_api_token_insertion(self):
267292
dapr = DaprGrpcClient(f'{self.scheme}localhost:{self.grpc_port}')

0 commit comments

Comments
 (0)