Skip to content

Commit c237e7b

Browse files
Adds stream cancelled error
Signed-off-by: Elena Kolevska <[email protected]>
1 parent ffde935 commit c237e7b

File tree

5 files changed

+30
-11
lines changed

5 files changed

+30
-11
lines changed

dapr/aio/clients/grpc/subscription.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,8 @@
44

55
from dapr.clients.grpc._response import TopicEventResponse
66
from dapr.clients.health import DaprHealth
7-
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage
7+
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage, \
8+
StreamCancelledError
89
from dapr.proto import api_v1, appcallback_v1
910

1011

@@ -69,7 +70,9 @@ async def next_message(self):
6970
f'Attempting to reconnect...'
7071
)
7172
await self.reconnect_stream()
72-
elif e.code() != StatusCode.CANCELLED:
73+
elif e.code() == StatusCode.CANCELLED:
74+
raise StreamCancelledError('Stream has been cancelled')
75+
else:
7376
raise Exception(f'gRPC error while reading from subscription stream: {e} ')
7477
except Exception as e:
7578
raise Exception(f'Error while fetching message: {e}')

dapr/clients/grpc/subscription.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,8 @@
22

33
from dapr.clients.grpc._response import TopicEventResponse
44
from dapr.clients.health import DaprHealth
5-
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage
5+
from dapr.common.pubsub.subscription import StreamInactiveError, SubscriptionMessage, \
6+
StreamCancelledError
67
from dapr.proto import api_v1, appcallback_v1
78
import queue
89
import threading
@@ -85,7 +86,9 @@ def next_message(self):
8586
f'gRPC error while reading from stream: {e.details()}, Status Code: {e.code()}'
8687
)
8788
self.reconnect_stream()
88-
elif e.code() != StatusCode.CANCELLED:
89+
elif e.code() == StatusCode.CANCELLED:
90+
raise StreamCancelledError('Stream has been cancelled')
91+
else:
8992
raise Exception(
9093
f'gRPC error while reading from subscription stream: {e.details()} '
9194
f'Status Code: {e.code()}'

dapr/common/pubsub/subscription.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,3 +86,6 @@ def _parse_data_content(self):
8686

8787
class StreamInactiveError(Exception):
8888
pass
89+
90+
class StreamCancelledError(Exception):
91+
pass

examples/pubsub-streaming-async/subscriber.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from dapr.aio.clients import DaprClient
55
from dapr.clients.grpc.subscription import StreamInactiveError
6+
from dapr.common.pubsub.subscription import StreamCancelledError
67

78
parser = argparse.ArgumentParser(description='Publish events to a Dapr pub/sub topic.')
89
parser.add_argument('--topic', type=str, required=True, help='The topic name to publish to.')
@@ -33,15 +34,18 @@ async def main():
3334
while counter < 5:
3435
try:
3536
message = await subscription.next_message()
37+
if message is None:
38+
print('No message received within timeout period. '
39+
'The stream might have been cancelled.')
40+
continue
3641

3742
except StreamInactiveError:
3843
print('Stream is inactive. Retrying...')
3944
await asyncio.sleep(1)
4045
continue
41-
if message is None:
42-
print('No message received within timeout period.')
43-
continue
44-
46+
except StreamCancelledError as e:
47+
print('Stream was cancelled')
48+
break
4549
# Process the message
4650
response_status = process_message(message)
4751

examples/pubsub-streaming/subscriber.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
from dapr.clients import DaprClient
55
from dapr.clients.grpc.subscription import StreamInactiveError
6+
from dapr.common.pubsub.subscription import StreamCancelledError
67

78
counter = 0
89

@@ -38,17 +39,22 @@ def main():
3839
while counter < 5:
3940
try:
4041
message = subscription.next_message()
42+
if message is None:
43+
print('No message received within timeout period. '
44+
'The stream might have been cancelled.')
45+
continue
4146

4247
except StreamInactiveError as e:
4348
print('Stream is inactive. Retrying...')
4449
time.sleep(1)
4550
continue
51+
except StreamCancelledError as e:
52+
print('Stream was cancelled')
53+
break
4654
except Exception as e:
4755
print(f'Error occurred: {e}')
4856
pass
49-
if message is None:
50-
print('No message received within timeout period.')
51-
continue
57+
5258

5359
# Process the message
5460
response_status = process_message(message)

0 commit comments

Comments
 (0)