Skip to content

Commit 6c85263

Browse files
linter and updates docs
Signed-off-by: Elena Kolevska <[email protected]>
1 parent c22ec5f commit 6c85263

File tree

5 files changed

+33
-25
lines changed

5 files changed

+33
-25
lines changed

dapr/aio/clients/grpc/subscription.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,4 +120,4 @@ def __aiter__(self):
120120
return self
121121

122122
async def __anext__(self):
123-
return await self.next_message()
123+
return await self.next_message()

dapr/clients/grpc/subscription.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,4 +149,3 @@ def __iter__(self):
149149

150150
def __next__(self):
151151
return self.next_message()
152-

daprdocs/content/en/python-sdk-docs/python-client.md

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -260,8 +260,8 @@ def mytopic_important(event: v1.Event) -> None:
260260
You can create a streaming subscription to a PubSub topic using either the `subscribe`
261261
or `subscribe_handler` methods.
262262

263-
The `subscribe` method returns a `Subscription` object, which allows you to pull messages from the
264-
stream by
263+
The `subscribe` method returns an iterable `Subscription` object, which allows you to pull messages from the
264+
stream by using a `for` loop (ex. `for message in subscription`) or by
265265
calling the `next_message` method. This will block on the main thread while waiting for messages.
266266
When done, you should call the close method to terminate the
267267
subscription and stop receiving messages.
@@ -281,7 +281,7 @@ Here's an example of using the `subscribe` method:
281281
import time
282282

283283
from dapr.clients import DaprClient
284-
from dapr.clients.grpc.subscription import StreamInactiveError
284+
from dapr.clients.grpc.subscription import StreamInactiveError, StreamCancelledError
285285

286286
counter = 0
287287

@@ -303,30 +303,35 @@ def main():
303303
)
304304

305305
try:
306-
while counter < 5:
307-
try:
308-
message = subscription.next_message()
306+
for message in subscription:
307+
if message is None:
308+
print('No message received. The stream might have been cancelled.')
309+
continue
309310

310-
except StreamInactiveError as e:
311+
try:
312+
response_status = process_message(message)
313+
314+
if response_status == 'success':
315+
subscription.respond_success(message)
316+
elif response_status == 'retry':
317+
subscription.respond_retry(message)
318+
elif response_status == 'drop':
319+
subscription.respond_drop(message)
320+
321+
if counter >= 5:
322+
break
323+
except StreamInactiveError:
311324
print('Stream is inactive. Retrying...')
312325
time.sleep(1)
313326
continue
314-
if message is None:
315-
print('No message received within timeout period.')
316-
continue
317-
318-
# Process the message
319-
response_status = process_message(message)
320-
321-
if response_status == 'success':
322-
subscription.respond_success(message)
323-
elif response_status == 'retry':
324-
subscription.respond_retry(message)
325-
elif response_status == 'drop':
326-
subscription.respond_drop(message)
327+
except StreamCancelledError:
328+
print('Stream was cancelled')
329+
break
330+
except Exception as e:
331+
print(f'Error occurred during message processing: {e}')
327332

328333
finally:
329-
print("Closing subscription...")
334+
print('Closing subscription...')
330335
subscription.close()
331336

332337

examples/pubsub-streaming-async/subscriber.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,10 @@ async def main():
3333
try:
3434
async for message in subscription:
3535
if message is None:
36-
print('No message received within timeout period. '
37-
'The stream might have been cancelled.')
36+
print(
37+
'No message received within timeout period. '
38+
'The stream might have been cancelled.'
39+
)
3840
continue
3941

4042
try:

examples/pubsub-streaming/subscriber.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ def process_message(message):
2222
print(f'Processing message: {message.data()} from {message.topic()}...', flush=True)
2323
return 'success'
2424

25+
2526
def main():
2627
with DaprClient() as client:
2728
global counter
@@ -66,5 +67,6 @@ def main():
6667
print('Closing subscription...')
6768
subscription.close()
6869

70+
6971
if __name__ == '__main__':
7072
main()

0 commit comments

Comments
 (0)