Skip to content

Commit acb1d0d

Browse files
author
Carlos Bellino
committed
WIP: Broker and Handler template added.
1 parent 4d28c73 commit acb1d0d

File tree

5 files changed

+164
-7
lines changed

5 files changed

+164
-7
lines changed

faststream/sqs/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from faststream.broker.test import TestApp
2-
from faststream.sqs.annotations import SQSBroker, SQSMessage, SQSProducer
2+
from faststream.sqs.annotations import SQSBroker, SQSMessage
33
from faststream.sqs.router import SQSRouter
44
from faststream.sqs.shared.router import SQSRoute
55
from faststream.sqs.shared.schemas import (

faststream/sqs/annotations.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from aiobotocore.client import AioBaseClient
2+
13
from faststream._compat import Annotated
24
from faststream.annotations import ContextRepo, Logger, NoCast
35
from faststream.sqs.broker import SQSBroker as SB # NOQA
@@ -12,8 +14,12 @@
1214
"SQSBroker",
1315
"SQSMessage",
1416
"SQSProducer",
17+
"client",
18+
"queue_url",
1519
)
1620

1721
SQSBroker = Annotated[SB, Context("broker")]
1822
SQSMessage = Annotated[SM, Context("message")]
1923
SQSProducer = Annotated[SQSFastProducer, Context("broker._producer")]
24+
client = Annotated[AioBaseClient, Context("client")]
25+
queue_url = Annotated[str, Context("queue_url")]

faststream/sqs/asyncapi.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1-
class Publisher:
1+
class Handler:
22
# TODO
33
pass
44

55

6-
class Handler:
6+
class Publisher:
77
# TODO
88
pass

faststream/sqs/broker.py

Lines changed: 95 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,95 @@
1-
class SQSBroker:
2-
# TODO
3-
pass
1+
from types import TracebackType
2+
from typing import Any, Awaitable, Callable, Dict, Optional, Sequence, Type, Union
3+
4+
from aiobotocore.client import AioBaseClient
5+
from fast_depends.dependencies import Depends
6+
7+
from faststream import BaseMiddleware
8+
from faststream.broker.core.asyncronous import BrokerAsyncUsecase, default_filter
9+
from faststream.broker.message import StreamMessage
10+
from faststream.broker.publisher import BasePublisher
11+
from faststream.broker.push_back_watcher import BaseWatcher
12+
from faststream.broker.types import (
13+
CustomDecoder,
14+
CustomParser,
15+
Filter,
16+
MsgType,
17+
P_HandlerParams,
18+
T_HandlerReturn,
19+
WrappedReturn,
20+
)
21+
from faststream.broker.wrapper import HandlerCallWrapper
22+
from faststream.sqs.asyncapi import Handler, Publisher
23+
from faststream.sqs.producer import SQSFastProducer
24+
from faststream.sqs.shared.logging import SQSLoggingMixin
25+
from faststream.types import AnyDict, SendableMessage
26+
27+
28+
class SQSBroker(
29+
SQSLoggingMixin,
30+
BrokerAsyncUsecase[AnyDict, AioBaseClient],
31+
):
32+
handlers: Dict[str, Handler] # type: ignore[assignment]
33+
_publishers: Dict[str, Publisher] # type: ignore[assignment]
34+
_producer: Optional[SQSFastProducer]
35+
36+
async def start(self) -> None:
37+
pass
38+
39+
async def _connect(self, **kwargs: Any) -> AioBaseClient:
40+
pass
41+
42+
async def _close(
43+
self,
44+
exc_type: Optional[Type[BaseException]] = None,
45+
exc_val: Optional[BaseException] = None,
46+
exec_tb: Optional[TracebackType] = None,
47+
) -> None:
48+
pass
49+
50+
def _process_message(
51+
self,
52+
func: Callable[[StreamMessage[MsgType]], Awaitable[T_HandlerReturn]],
53+
watcher: BaseWatcher,
54+
) -> Callable[[StreamMessage[MsgType]], Awaitable[WrappedReturn[T_HandlerReturn]],]:
55+
pass
56+
57+
async def publish(
58+
self,
59+
message: SendableMessage,
60+
*args: Any,
61+
reply_to: str = "",
62+
rpc: bool = False,
63+
rpc_timeout: Optional[float] = None,
64+
raise_timeout: bool = False,
65+
**kwargs: Any,
66+
) -> Optional[SendableMessage]:
67+
pass
68+
69+
def subscriber(
70+
self,
71+
*broker_args: Any,
72+
retry: Union[bool, int] = False,
73+
dependencies: Sequence[Depends] = (),
74+
decoder: Optional[CustomDecoder[StreamMessage[MsgType]]] = None,
75+
parser: Optional[CustomParser[MsgType, StreamMessage[MsgType]]] = None,
76+
middlewares: Optional[Sequence[Callable[[MsgType], BaseMiddleware]]] = None,
77+
filter: Filter[StreamMessage[MsgType]] = default_filter,
78+
_raw: bool = False,
79+
_get_dependant: Optional[Any] = None,
80+
**broker_kwargs: Any,
81+
) -> Callable[
82+
[
83+
Union[
84+
Callable[P_HandlerParams, T_HandlerReturn],
85+
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
86+
]
87+
],
88+
HandlerCallWrapper[MsgType, P_HandlerParams, T_HandlerReturn],
89+
]:
90+
pass
91+
92+
def publisher(
93+
self, key: Any, publisher: BasePublisher[MsgType]
94+
) -> BasePublisher[MsgType]:
95+
pass

faststream/sqs/handler.py

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,69 @@
1+
import asyncio
2+
import logging
3+
from typing import Any, NoReturn, Optional
4+
5+
import anyio
16
from aiobotocore.client import AioBaseClient
7+
from typing_extensions import TypeAlias
28

39
from faststream.broker.handler import AsyncHandler
10+
from faststream.sqs.shared.schemas import SQSQueue
411
from faststream.types import AnyDict
12+
from faststream.utils.context import context
13+
14+
QueueUrl: TypeAlias = str
515

616

717
class LogicSQSHandler(AsyncHandler[AnyDict]):
18+
queue: SQSQueue
19+
consumer_params: AnyDict
20+
task: Optional["asyncio.Task[Any]"] = None
21+
22+
async def _consume(self, queue_url: str) -> NoReturn:
23+
c = self._get_log_context(None, self.queue.name)
24+
25+
connected = True
26+
with context.scope("queue_url", queue_url):
27+
while True:
28+
try:
29+
if connected is False:
30+
await self.create_queue(self.queue)
31+
32+
r = await self._connection.receive_message(
33+
QueueUrl=queue_url,
34+
**self.consumer_params,
35+
)
36+
37+
except Exception as e:
38+
if connected is True:
39+
self._log(e, logging.WARNING, c, exc_info=e)
40+
self._queues.pop(self.queue.name)
41+
connected = False
42+
43+
await anyio.sleep(5)
44+
45+
else:
46+
if connected is False:
47+
self._log("Connection established", logging.INFO, c)
48+
connected = True
49+
50+
messages = r.get("Messages", [])
51+
for msg in messages:
52+
try:
53+
await self.callback(msg, True)
54+
except Exception:
55+
has_trash_messages = True
56+
else:
57+
has_trash_messages = False
58+
59+
if has_trash_messages is True:
60+
await anyio.sleep(
61+
self.consumer_params.get("WaitTimeSeconds", 1.0)
62+
)
63+
864
async def start(self, client: AioBaseClient) -> None:
9-
# TODO check "start" method on broker
65+
url = await self.create_queue(self.queue)
66+
self.task = asyncio.create_task(self._consume(url))
67+
68+
async def close(self) -> None:
1069
pass

0 commit comments

Comments
 (0)