Skip to content

Commit f7f5162

Browse files
Revert "Removes async client"
This reverts commit cb4b65b. Signed-off-by: Elena Kolevska <[email protected]>
1 parent c70b927 commit f7f5162

File tree

8 files changed

+569
-5
lines changed

8 files changed

+569
-5
lines changed

dapr/aio/clients/grpc/client.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424

2525
from warnings import warn
2626

27-
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any
27+
from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable
2828
from typing_extensions import Self
2929

3030
from google.protobuf.message import Message as GrpcMessage
@@ -39,12 +39,14 @@
3939
AioRpcError,
4040
)
4141

42+
from dapr.aio.clients.grpc.subscription import Subscription
4243
from dapr.clients.exceptions import DaprInternalError, DaprGrpcError
4344
from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions
4445
from dapr.clients.grpc._state import StateOptions, StateItem
4546
from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus
4647
from dapr.clients.health import DaprHealth
4748
from dapr.clients.retry import RetryPolicy
49+
from dapr.common.pubsub.subscription import StreamInactiveError
4850
from dapr.conf.helpers import GrpcEndpoint
4951
from dapr.conf import settings
5052
from dapr.proto import api_v1, api_service_v1, common_v1
@@ -94,6 +96,7 @@
9496
UnlockResponse,
9597
GetWorkflowResponse,
9698
StartWorkflowResponse,
99+
TopicEventResponse,
97100
)
98101

99102

@@ -482,6 +485,72 @@ async def publish_event(
482485

483486
return DaprResponse(await call.initial_metadata())
484487

488+
async def subscribe(
489+
self,
490+
pubsub_name: str,
491+
topic: str,
492+
metadata: Optional[dict] = None,
493+
dead_letter_topic: Optional[str] = None,
494+
) -> Subscription:
495+
"""
496+
Subscribe to a topic with a bidirectional stream
497+
498+
Args:
499+
pubsub_name (str): The name of the pubsub component.
500+
topic (str): The name of the topic.
501+
metadata (Optional[dict]): Additional metadata for the subscription.
502+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
503+
504+
Returns:
505+
Subscription: The Subscription object managing the stream.
506+
"""
507+
subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic)
508+
await subscription.start()
509+
return subscription
510+
511+
async def subscribe_with_handler(
512+
self,
513+
pubsub_name: str,
514+
topic: str,
515+
handler_fn: Callable[..., TopicEventResponse],
516+
metadata: Optional[dict] = None,
517+
dead_letter_topic: Optional[str] = None,
518+
) -> Callable[[], Awaitable[None]]:
519+
"""
520+
Subscribe to a topic with a bidirectional stream and a message handler function
521+
522+
Args:
523+
pubsub_name (str): The name of the pubsub component.
524+
topic (str): The name of the topic.
525+
handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received.
526+
metadata (Optional[dict]): Additional metadata for the subscription.
527+
dead_letter_topic (Optional[str]): Name of the dead-letter topic.
528+
529+
Returns:
530+
Callable[[], Awaitable[None]]: An async function to close the subscription.
531+
"""
532+
subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic)
533+
534+
async def stream_messages(sub: Subscription):
535+
while True:
536+
try:
537+
message = await sub.next_message()
538+
if message:
539+
response = await handler_fn(message)
540+
if response:
541+
await subscription.respond(message, response.status)
542+
else:
543+
continue
544+
except StreamInactiveError:
545+
break
546+
547+
async def close_subscription():
548+
await subscription.close()
549+
550+
asyncio.create_task(stream_messages(subscription))
551+
552+
return close_subscription
553+
485554
async def get_state(
486555
self,
487556
store_name: str,
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
import asyncio
2+
from grpc import StatusCode
3+
from grpc.aio import AioRpcError
4+
5+
from dapr.clients.grpc._response import TopicEventResponse
6+
from dapr.clients.health import DaprHealth
7+
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage
8+
from dapr.proto import api_v1, appcallback_v1
9+
10+
11+
class Subscription:
12+
def __init__(self, stub, pubsub_name, topic, metadata=None, dead_letter_topic=None):
13+
self._stub = stub
14+
self._pubsub_name = pubsub_name
15+
self._topic = topic
16+
self._metadata = metadata or {}
17+
self._dead_letter_topic = dead_letter_topic or ''
18+
self._stream = None
19+
self._send_queue = asyncio.Queue()
20+
self._stream_active = asyncio.Event()
21+
22+
async def start(self):
23+
async def outgoing_request_iterator():
24+
try:
25+
initial_request = api_v1.SubscribeTopicEventsRequestAlpha1(
26+
initial_request=api_v1.SubscribeTopicEventsRequestInitialAlpha1(
27+
pubsub_name=self._pubsub_name,
28+
topic=self._topic,
29+
metadata=self._metadata,
30+
dead_letter_topic=self._dead_letter_topic,
31+
)
32+
)
33+
yield initial_request
34+
35+
while self._stream_active.is_set():
36+
try:
37+
response = await asyncio.wait_for(self._send_queue.get(), timeout=1.0)
38+
yield response
39+
except asyncio.TimeoutError:
40+
continue
41+
except Exception as e:
42+
raise Exception(f'Error while writing to stream: {e}')
43+
44+
self._stream = self._stub.SubscribeTopicEventsAlpha1(outgoing_request_iterator())
45+
self._stream_active.set()
46+
await self._stream.read() # discard the initial message
47+
48+
async def reconnect_stream(self):
49+
await self.close()
50+
DaprHealth.wait_until_ready()
51+
print('Attempting to reconnect...')
52+
await self.start()
53+
54+
async def next_message(self):
55+
if not self._stream_active.is_set():
56+
raise StreamInactiveError('Stream is not active')
57+
58+
try:
59+
if self._stream is not None:
60+
message = await self._stream.read()
61+
if message is None:
62+
return None
63+
return SubscriptionMessage(message.event_message)
64+
except AioRpcError as e:
65+
if e.code() == StatusCode.UNAVAILABLE:
66+
print(
67+
f'gRPC error while reading from stream: {e.details()}, '
68+
f'Status Code: {e.code()}. '
69+
f'Attempting to reconnect...'
70+
)
71+
await self.reconnect_stream()
72+
elif e.code() != StatusCode.CANCELLED:
73+
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
74+
except Exception as e:
75+
raise Exception(f'Error while fetching message: {e}')
76+
77+
return None
78+
79+
async def respond(self, message, status):
80+
try:
81+
status = appcallback_v1.TopicEventResponse(status=status.value)
82+
response = api_v1.SubscribeTopicEventsRequestProcessedAlpha1(
83+
id=message.id(), status=status
84+
)
85+
msg = api_v1.SubscribeTopicEventsRequestAlpha1(event_processed=response)
86+
if not self._stream_active.is_set():
87+
raise StreamInactiveError('Stream is not active')
88+
await self._send_queue.put(msg)
89+
except Exception as e:
90+
print(f"Can't send message: {e}")
91+
92+
async def respond_success(self, message):
93+
await self.respond(message, TopicEventResponse('success').status)
94+
95+
async def respond_retry(self, message):
96+
await self.respond(message, TopicEventResponse('retry').status)
97+
98+
async def respond_drop(self, message):
99+
await self.respond(message, TopicEventResponse('drop').status)
100+
101+
async def close(self):
102+
if self._stream:
103+
try:
104+
self._stream.cancel()
105+
self._stream_active.clear()
106+
except AioRpcError as e:
107+
if e.code() != StatusCode.CANCELLED:
108+
raise Exception(f'Error while closing stream: {e}')
109+
except Exception as e:
110+
raise Exception(f'Error while closing stream: {e}')

daprdocs/content/en/python-sdk-docs/python-client.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -261,10 +261,10 @@ You can create a streaming subscription to a PubSub topic using either the `subs
261261
or `subscribe_handler` methods.
262262

263263
The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
264-
stream by calling the `next_message` method. This will block on the main thread while waiting for
265-
messages.
266-
When done, you should call the close method to terminate the subscription and stop receiving
267-
messages.
264+
stream by
265+
calling the `next_message` method. This will block on the main thread while waiting for messages.
266+
When done, you should call the close method to terminate the
267+
subscription and stop receiving messages.
268268

269269
The `subscribe_with_handler` method accepts a callback function that is executed for each message
270270
received from the stream.
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
# Example - Publish and subscribe to messages
2+
3+
This example utilizes a publisher and a subscriber to show the bidirectional pubsub pattern.
4+
It creates a publisher and calls the `publish_event` method in the `DaprClient`.
5+
In the s`subscriber.py` file it creates a subscriber object that can call the `next_message` method to get new messages from the stream. After processing the new message, it returns a status to the stream.
6+
7+
8+
> **Note:** Make sure to use the latest proto bindings
9+
10+
## Pre-requisites
11+
12+
- [Dapr CLI and initialized environment](https://docs.dapr.io/getting-started)
13+
- [Install Python 3.8+](https://www.python.org/downloads/)
14+
15+
## Install Dapr python-SDK
16+
17+
<!-- Our CI/CD pipeline automatically installs the correct version, so we can skip this step in the automation -->
18+
19+
```bash
20+
pip3 install dapr
21+
```
22+
23+
## Run async example where users control reading messages off the stream
24+
25+
Run the following command in a terminal/command prompt:
26+
27+
<!-- STEP
28+
name: Run subscriber
29+
expected_stdout_lines:
30+
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
31+
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
32+
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
33+
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
34+
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
35+
- "== APP == Closing subscription..."
36+
output_match_mode: substring
37+
background: true
38+
match_order: none
39+
sleep: 3
40+
-->
41+
42+
```bash
43+
# 1. Start Subscriber
44+
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber.py
45+
```
46+
47+
<!-- END_STEP -->
48+
49+
In another terminal/command prompt run:
50+
51+
<!-- STEP
52+
name: Run publisher
53+
expected_stdout_lines:
54+
- "== APP == {'id': 1, 'message': 'hello world'}"
55+
- "== APP == {'id': 2, 'message': 'hello world'}"
56+
- "== APP == {'id': 3, 'message': 'hello world'}"
57+
- "== APP == {'id': 4, 'message': 'hello world'}"
58+
- "== APP == {'id': 5, 'message': 'hello world'}"
59+
background: true
60+
output_match_mode: substring
61+
sleep: 15
62+
-->
63+
64+
```bash
65+
# 2. Start Publisher
66+
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
67+
```
68+
69+
<!-- END_STEP -->
70+
71+
## Run async example with a handler function
72+
73+
Run the following command in a terminal/command prompt:
74+
75+
<!-- STEP
76+
name: Run subscriber
77+
expected_stdout_lines:
78+
- "== APP == Processing message: {'id': 1, 'message': 'hello world'} from TOPIC_A..."
79+
- "== APP == Processing message: {'id': 2, 'message': 'hello world'} from TOPIC_A..."
80+
- "== APP == Processing message: {'id': 3, 'message': 'hello world'} from TOPIC_A..."
81+
- "== APP == Processing message: {'id': 4, 'message': 'hello world'} from TOPIC_A..."
82+
- "== APP == Processing message: {'id': 5, 'message': 'hello world'} from TOPIC_A..."
83+
- "== APP == Closing subscription..."
84+
output_match_mode: substring
85+
background: true
86+
match_order: none
87+
sleep: 3
88+
-->
89+
90+
```bash
91+
# 1. Start Subscriber
92+
dapr run --app-id python-subscriber --app-protocol grpc python3 subscriber-handler.py
93+
```
94+
95+
<!-- END_STEP -->
96+
97+
In another terminal/command prompt run:
98+
99+
<!-- STEP
100+
name: Run publisher
101+
expected_stdout_lines:
102+
- "== APP == {'id': 1, 'message': 'hello world'}"
103+
- "== APP == {'id': 2, 'message': 'hello world'}"
104+
- "== APP == {'id': 3, 'message': 'hello world'}"
105+
- "== APP == {'id': 4, 'message': 'hello world'}"
106+
- "== APP == {'id': 5, 'message': 'hello world'}"
107+
background: true
108+
output_match_mode: substring
109+
sleep: 15
110+
-->
111+
112+
```bash
113+
# 2. Start Publisher
114+
dapr run --app-id python-publisher --app-protocol grpc --dapr-grpc-port=3500 --enable-app-health-check python3 publisher.py
115+
```
116+
117+
<!-- END_STEP -->
118+
119+
120+
## Cleanup
121+
122+
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# ------------------------------------------------------------
2+
# Copyright 2022 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
# ------------------------------------------------------------
13+
import asyncio
14+
import json
15+
16+
from dapr.aio.clients import DaprClient
17+
18+
19+
async def publish_events():
20+
"""
21+
Publishes events to a pubsub topic asynchronously
22+
"""
23+
24+
async with DaprClient() as d:
25+
id = 0
26+
while id < 5:
27+
id += 1
28+
req_data = {'id': id, 'message': 'hello world'}
29+
30+
# Create a typed message with content type and body
31+
await d.publish_event(
32+
pubsub_name='pubsub',
33+
topic_name='TOPIC_A',
34+
data=json.dumps(req_data),
35+
data_content_type='application/json',
36+
publish_metadata={'ttlInSeconds': '100', 'rawPayload': 'false'},
37+
)
38+
39+
# Print the request
40+
print(req_data, flush=True)
41+
42+
await asyncio.sleep(1)
43+
44+
45+
asyncio.run(publish_events())

0 commit comments

Comments
 (0)