|
24 | 24 |
|
25 | 25 | from warnings import warn |
26 | 26 |
|
27 | | -from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any, Awaitable |
| 27 | +from typing import Callable, Dict, Optional, Text, Union, Sequence, List, Any |
28 | 28 | from typing_extensions import Self |
29 | 29 |
|
30 | 30 | from google.protobuf.message import Message as GrpcMessage |
|
39 | 39 | AioRpcError, |
40 | 40 | ) |
41 | 41 |
|
42 | | -from dapr.aio.clients.grpc.subscription import Subscription |
43 | 42 | from dapr.clients.exceptions import DaprInternalError, DaprGrpcError |
44 | 43 | from dapr.clients.grpc._crypto import EncryptOptions, DecryptOptions |
45 | 44 | from dapr.clients.grpc._state import StateOptions, StateItem |
46 | 45 | from dapr.clients.grpc._helpers import getWorkflowRuntimeStatus |
47 | 46 | from dapr.clients.health import DaprHealth |
48 | 47 | from dapr.clients.retry import RetryPolicy |
49 | | -from dapr.common.pubsub.subscription import StreamInactiveError |
50 | 48 | from dapr.conf.helpers import GrpcEndpoint |
51 | 49 | from dapr.conf import settings |
52 | 50 | from dapr.proto import api_v1, api_service_v1, common_v1 |
|
96 | 94 | UnlockResponse, |
97 | 95 | GetWorkflowResponse, |
98 | 96 | StartWorkflowResponse, |
99 | | - TopicEventResponse, |
100 | 97 | ) |
101 | 98 |
|
102 | 99 |
|
@@ -485,72 +482,6 @@ async def publish_event( |
485 | 482 |
|
486 | 483 | return DaprResponse(await call.initial_metadata()) |
487 | 484 |
|
488 | | - async def subscribe( |
489 | | - self, |
490 | | - pubsub_name: str, |
491 | | - topic: str, |
492 | | - metadata: Optional[dict] = None, |
493 | | - dead_letter_topic: Optional[str] = None, |
494 | | - ) -> Subscription: |
495 | | - """ |
496 | | - Subscribe to a topic with a bidirectional stream |
497 | | -
|
498 | | - Args: |
499 | | - pubsub_name (str): The name of the pubsub component. |
500 | | - topic (str): The name of the topic. |
501 | | - metadata (Optional[dict]): Additional metadata for the subscription. |
502 | | - dead_letter_topic (Optional[str]): Name of the dead-letter topic. |
503 | | -
|
504 | | - Returns: |
505 | | - Subscription: The Subscription object managing the stream. |
506 | | - """ |
507 | | - subscription = Subscription(self._stub, pubsub_name, topic, metadata, dead_letter_topic) |
508 | | - await subscription.start() |
509 | | - return subscription |
510 | | - |
511 | | - async def subscribe_with_handler( |
512 | | - self, |
513 | | - pubsub_name: str, |
514 | | - topic: str, |
515 | | - handler_fn: Callable[..., TopicEventResponse], |
516 | | - metadata: Optional[dict] = None, |
517 | | - dead_letter_topic: Optional[str] = None, |
518 | | - ) -> Callable[[], Awaitable[None]]: |
519 | | - """ |
520 | | - Subscribe to a topic with a bidirectional stream and a message handler function |
521 | | -
|
522 | | - Args: |
523 | | - pubsub_name (str): The name of the pubsub component. |
524 | | - topic (str): The name of the topic. |
525 | | - handler_fn (Callable[..., TopicEventResponse]): The function to call when a message is received. |
526 | | - metadata (Optional[dict]): Additional metadata for the subscription. |
527 | | - dead_letter_topic (Optional[str]): Name of the dead-letter topic. |
528 | | -
|
529 | | - Returns: |
530 | | - Callable[[], Awaitable[None]]: An async function to close the subscription. |
531 | | - """ |
532 | | - subscription = await self.subscribe(pubsub_name, topic, metadata, dead_letter_topic) |
533 | | - |
534 | | - async def stream_messages(sub: Subscription): |
535 | | - while True: |
536 | | - try: |
537 | | - message = await sub.next_message() |
538 | | - if message: |
539 | | - response = await handler_fn(message) |
540 | | - if response: |
541 | | - await subscription.respond(message, response.status) |
542 | | - else: |
543 | | - continue |
544 | | - except StreamInactiveError: |
545 | | - break |
546 | | - |
547 | | - async def close_subscription(): |
548 | | - await subscription.close() |
549 | | - |
550 | | - asyncio.create_task(stream_messages(subscription)) |
551 | | - |
552 | | - return close_subscription |
553 | | - |
554 | 485 | async def get_state( |
555 | 486 | self, |
556 | 487 | store_name: str, |
|
0 commit comments