Skip to content

Commit c325a2a

Browse files
Sync bidi streaming and tests
Signed-off-by: Elena Kolevska <[email protected]>
1 parent 2add93f commit c325a2a

File tree

9 files changed

+279
-105
lines changed

9 files changed

+279
-105
lines changed

dapr/clients/exceptions.py

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,15 +27,22 @@
2727
class DaprInternalError(Exception):
2828
"""DaprInternalError encapsulates all Dapr exceptions"""
2929

30-
def __init__(self, message: Optional[str], error_code: Optional[str] = ERROR_CODE_UNKNOWN,
31-
raw_response_bytes: Optional[bytes] = None, ):
30+
def __init__(
31+
self,
32+
message: Optional[str],
33+
error_code: Optional[str] = ERROR_CODE_UNKNOWN,
34+
raw_response_bytes: Optional[bytes] = None,
35+
):
3236
self._message = message
3337
self._error_code = error_code
3438
self._raw_response_bytes = raw_response_bytes
3539

3640
def as_dict(self):
37-
return {'message': self._message, 'errorCode': self._error_code,
38-
'raw_response_bytes': self._raw_response_bytes, }
41+
return {
42+
'message': self._message,
43+
'errorCode': self._error_code,
44+
'raw_response_bytes': self._raw_response_bytes,
45+
}
3946

4047

4148
class StatusDetails:
@@ -112,8 +119,12 @@ def get_grpc_status(self):
112119
return self._grpc_status
113120

114121
def json(self):
115-
error_details = {'status_code': self.code().name, 'message': self.details(),
116-
'error_code': self.error_code(), 'details': self._details.as_dict(), }
122+
error_details = {
123+
'status_code': self.code().name,
124+
'message': self.details(),
125+
'error_code': self.error_code(),
126+
'details': self._details.as_dict(),
127+
}
117128
return json.dumps(error_details)
118129

119130

dapr/clients/grpc/client.py

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@
4646
from dapr.clients.health import DaprHealth
4747
from dapr.clients.retry import RetryPolicy
4848
from dapr.conf import settings
49-
from dapr.proto import api_v1, api_service_v1, common_v1, appcallback_v1
49+
from dapr.proto import api_v1, api_service_v1, common_v1
5050
from dapr.proto.runtime.v1.dapr_pb2 import UnsubscribeConfigurationResponse
5151
from dapr.version import __version__
5252

@@ -482,16 +482,6 @@ def publish_event(
482482

483483
return DaprResponse(call.initial_metadata())
484484

485-
# def subscribe(self, pubsub_name, topic, metadata=None, dead_letter_topic=None):
486-
# stream = self._stub.SubscribeTopicEventsAlpha1()
487-
#
488-
# # Send InitialRequest
489-
# initial_request = api_v1.SubscribeTopicEventsInitialRequestAlpha1(pubsub_name=pubsub_name, topic=topic, metadata=metadata, dead_letter_topic=dead_letter_topic)
490-
# request = api_v1.SubscribeTopicEventsRequestAlpha1(initial_request=initial_request)
491-
# stream.write(request)
492-
#
493-
# return stream
494-
495485
def subscribe(self, pubsub_name, topic, metadata=None, dead_letter_topic=None):
496486
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
497487
subscription.start()

dapr/clients/grpc/subscription.py

Lines changed: 113 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
import json
2+
13
import grpc
24

35
from dapr.clients.exceptions import StreamInactiveError
@@ -34,32 +36,45 @@ def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=No
3436
self._stream_lock = threading.Lock() # Protects _stream_active
3537

3638
def start(self):
37-
def request_iterator():
39+
def outgoing_request_iterator():
40+
"""
41+
Generator function to create the request iterator for the stream
42+
"""
3843
try:
3944
# Send InitialRequest needed to establish the stream
4045
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
4146
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
42-
pubsub_name=self.pubsub_name, topic=self.topic, metadata=self.metadata or {},
43-
dead_letter_topic=self.dead_letter_topic or ''))
47+
pubsub_name=self.pubsub_name,
48+
topic=self.topic,
49+
metadata=self.metadata or {},
50+
dead_letter_topic=self.dead_letter_topic or '',
51+
)
52+
)
4453
yield initial_request
4554

55+
# Start sending back acknowledgement messages from the send queue
4656
while self._is_stream_active():
4757
try:
48-
yield self._send_queue.get() # TODO Should I add a timeout?
58+
response = self._send_queue.get()
59+
# The above blocks until a message is available or the stream is closed
60+
# so that's why we need to check again if the stream is still active
61+
if not self._is_stream_active():
62+
break
63+
yield response
4964
except queue.Empty:
5065
continue
5166
except Exception as e:
52-
raise Exception(f"Error in request iterator: {e}")
67+
raise Exception(f'Error in request iterator: {e}')
5368

5469
# Create the bidirectional stream
55-
self._stream = self._stub.SubscribeTopicEventsAlpha1(request_iterator())
70+
self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
5671
self._set_stream_active()
5772

5873
# Start a thread to handle incoming messages
59-
self._response_thread = threading.Thread(target=self._handle_responses, daemon=True)
74+
self._response_thread = threading.Thread(target=self._handle_incoming_messages, daemon=True)
6075
self._response_thread.start()
6176

62-
def _handle_responses(self):
77+
def _handle_incoming_messages(self):
6378
try:
6479
# The first message dapr sends on the stream is for signalling only, so discard it
6580
next(self._stream)
@@ -72,30 +87,31 @@ def _handle_responses(self):
7287
break
7388
except grpc.RpcError as e:
7489
if e.code() != grpc.StatusCode.CANCELLED:
75-
print(f"gRPC error in stream: {e.details()}, Status Code: {e.code()}")
90+
print(f'gRPC error in stream: {e.details()}, Status Code: {e.code()}')
7691
except Exception as e:
77-
raise Exception(f"Error while handling responses: {e}")
92+
raise Exception(f'Error while handling responses: {e}')
7893
finally:
7994
self._set_stream_inactive()
8095

81-
def next_message(self, timeout=1):
82-
"""
83-
Gets the next message from the receive queue
84-
@param timeout: Timeout in seconds
85-
@return: The next message
86-
"""
87-
return self.read_message_from_queue(self._receive_queue, timeout=timeout)
96+
def next_message(self, timeout=None):
97+
msg = self.read_message_from_queue(self._receive_queue, timeout=timeout)
98+
99+
if msg is None:
100+
return None
101+
102+
return SubscriptionMessage(msg)
88103

89104
def _respond(self, message, status):
90105
try:
91106
status = appcallback_v1.TopicEventResponse(status=status.value)
92-
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(id=message.id,
93-
status=status)
107+
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
108+
id=message.id(), status=status
109+
)
94110
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
95111

96112
self.send_message_to_queue(self._send_queue, msg)
97113
except Exception as e:
98-
print(f"Exception in send_message: {e}")
114+
print(f'Exception in send_message: {e}')
99115

100116
def respond_success(self, message):
101117
self._respond(message, TopicEventResponse('success').status)
@@ -108,12 +124,12 @@ def respond_drop(self, message):
108124

109125
def send_message_to_queue(self, q, message):
110126
if not self._is_stream_active():
111-
raise StreamInactiveError("Stream is not active")
127+
raise StreamInactiveError('Stream is not active')
112128
q.put(message)
113129

114-
def read_message_from_queue(self, q, timeout):
130+
def read_message_from_queue(self, q, timeout=None):
115131
if not self._is_stream_active():
116-
raise StreamInactiveError("Stream is not active")
132+
raise StreamInactiveError('Stream is not active')
117133
try:
118134
return q.get(timeout=timeout)
119135
except queue.Empty:
@@ -143,12 +159,84 @@ def close(self):
143159
self._stream.cancel()
144160
except grpc.RpcError as e:
145161
if e.code() != grpc.StatusCode.CANCELLED:
146-
raise Exception(f"Error while closing stream: {e}")
162+
raise Exception(f'Error while closing stream: {e}')
147163
except Exception as e:
148-
raise Exception(f"Error while closing stream: {e}")
164+
raise Exception(f'Error while closing stream: {e}')
149165

150166
# Join the response-handling thread to ensure it has finished
151167
if self._response_thread:
152168
self._response_thread.join()
153169
self._response_thread = None
154170

171+
172+
class SubscriptionMessage:
173+
def __init__(self, msg):
174+
self._id = msg.id
175+
self._source = msg.source
176+
self._type = msg.type
177+
self._spec_version = msg.spec_version
178+
self._data_content_type = msg.data_content_type
179+
self._topic = msg.topic
180+
self._pubsub_name = msg.pubsub_name
181+
self._raw_data = msg.data
182+
self._extensions = msg.extensions
183+
self._data = None
184+
185+
# Parse the content based on its media type
186+
if self._raw_data and len(self._raw_data) > 0:
187+
self._parse_data_content()
188+
189+
def id(self):
190+
return self._id
191+
192+
def source(self):
193+
return self._source
194+
195+
def type(self):
196+
return self._type
197+
198+
def spec_version(self):
199+
return self._spec_version
200+
201+
def data_content_type(self):
202+
return self._data_content_type
203+
204+
def topic(self):
205+
return self._topic
206+
207+
def pubsub_name(self):
208+
return self._pubsub_name
209+
210+
def raw_data(self):
211+
return self._raw_data
212+
213+
def extensions(self):
214+
return self._extensions
215+
216+
def data(self):
217+
return self._data
218+
219+
def _parse_data_content(self):
220+
try:
221+
if self._data_content_type == 'application/json':
222+
try:
223+
self._data = json.loads(self._raw_data)
224+
except json.JSONDecodeError:
225+
pass # If JSON parsing fails, keep `data` as None
226+
elif self._data_content_type == 'text/plain':
227+
# Assume UTF-8 encoding
228+
try:
229+
self._data = self._raw_data.decode('utf-8')
230+
except UnicodeDecodeError:
231+
pass
232+
elif self._data_content_type.startswith(
233+
'application/'
234+
) and self._data_content_type.endswith('+json'):
235+
# Handle custom JSON-based media types (e.g., application/vnd.api+json)
236+
try:
237+
self._data = json.loads(self._raw_data)
238+
except json.JSONDecodeError:
239+
pass # If JSON parsing fails, keep `data` as None
240+
except Exception as e:
241+
# Log or handle any unexpected exceptions
242+
print(f'Error parsing media type: {e}')
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
# Example - Publish and subscribe to messages
2+
3+
This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern.
4+
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
5+
In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream.
6+
7+
8+
> **Note:** Make sure to use the latest proto bindings
9+
10+
## Pre-requisites
11+
12+
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
13+
- [Install Python 3.8+](https://www.python.org/downloads/)
14+
15+
## Install Dapr python-SDK
16+
17+
<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->
18+
19+
```bash
20+
pip3 install dapr
21+
```
22+
23+
## Run the example
24+
25+
Run the following command in a terminal/command prompt:
26+
27+
<!-- STEP
28+
name: Run subscriber
29+
expected_stdout_lines:
30+
- '== APP == Subscriber received: id=1, message="hello world", content_type="application/json"'
31+
- 'RETRY status returned from app while processing pub/sub event'
32+
- '== APP == Subscriber received: id=2, message="hello world", content_type="application/json"'
33+
- '== APP == Subscriber received: id=3, message="hello world", content_type="application/json"'
34+
- '== APP == Wildcard-Subscriber received: id=4, message="hello world", content_type="application/json"'
35+
- '== APP == Wildcard-Subscriber received: id=5, message="hello world", content_type="application/json"'
36+
- '== APP == Wildcard-Subscriber received: id=6, message="hello world", content_type="application/json"'
37+
- '== APP == Dead-Letter Subscriber received: id=7, message="hello world", content_type="application/json"'
38+
- '== APP == Dead-Letter Subscriber. Received via deadletter topic: TOPIC_D_DEAD'
39+
- '== APP == Dead-Letter Subscriber. Originally intended topic: TOPIC_D'
40+
output_match_mode: substring
41+
background: true
42+
match_order: none
43+
sleep: 3
44+
-->
45+
46+
```bash
47+
# 1. Start Subscriber
48+
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
49+
```
50+
51+
<!-- END_STEP -->
52+
53+
In another terminal/command prompt run:
54+
55+
<!-- STEP
56+
name: Run publisher
57+
expected_stdout_lines:
58+
- "== APP == {'id': 1, 'message': 'hello world'}"
59+
- "== APP == {'id': 2, 'message': 'hello world'}"
60+
- "== APP == {'id': 3, 'message': 'hello world'}"
61+
- "== APP == {'id': 4, 'message': 'hello world'}"
62+
- "== APP == {'id': 5, 'message': 'hello world'}"
63+
background: true
64+
sleep: 15
65+
-->
66+
67+
```bash
68+
# 2. Start Publisher
69+
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
70+
```
71+
72+
<!-- END_STEP -->
73+
74+
## Cleanup
75+
76+

examples/pubsub_streaming/publisher.py renamed to examples/pubsub-streaming/publisher.py

Lines changed: 2 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,9 @@
1818

1919
with DaprClient() as d:
2020
id = 0
21-
while id < 3:
21+
while id < 5:
2222
id += 1
23-
req_data = {'id': time.time(), 'message': 'hello world'}
23+
req_data = {'id': id, 'message': 'hello world'}
2424

2525
# Create a typed message with content type and body
2626
resp = d.publish_event(
@@ -34,35 +34,3 @@
3434
print(req_data, flush=True)
3535

3636
time.sleep(1)
37-
38-
# we can publish events to different topics but handle them with the same method
39-
# by disabling topic validation in the subscriber
40-
#
41-
# id = 3
42-
# while id < 6:
43-
# id += 1
44-
# req_data = {'id': id, 'message': 'hello world'}
45-
# resp = d.publish_event(
46-
# pubsub_name='pubsub',
47-
# topic_name=f'topic/{id}',
48-
# data=json.dumps(req_data),
49-
# data_content_type='application/json',
50-
# )
51-
#
52-
# # Print the request
53-
# print(req_data, flush=True)
54-
#
55-
# time.sleep(0.5)
56-
#
57-
# # This topic will fail - initiate a retry which gets routed to the dead letter topic
58-
# req_data['id'] = 7
59-
# resp = d.publish_event(
60-
# pubsub_name='pubsub',
61-
# topic_name='TOPIC_D',
62-
# data=json.dumps(req_data),
63-
# data_content_type='application/json',
64-
# publish_metadata={'custommeta': 'somevalue'},
65-
# )
66-
#
67-
# # Print the request
68-
# print(req_data, flush=True)

0 commit comments

Comments
 (0)