Skip to content

Commit 4d28c73

Browse files
author
Carlos Bellino
committed
draft: SQS integration.
1 parent 3bcf2d6 commit 4d28c73

File tree

13 files changed

+400
-0
lines changed

13 files changed

+400
-0
lines changed

faststream/sqs/__init__.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
from faststream.broker.test import TestApp
2+
from faststream.sqs.annotations import SQSBroker, SQSMessage, SQSProducer
3+
from faststream.sqs.router import SQSRouter
4+
from faststream.sqs.shared.router import SQSRoute
5+
from faststream.sqs.shared.schemas import (
6+
FifoQueue,
7+
RedriveAllowPolicy,
8+
RedrivePolicy,
9+
SQSQueue,
10+
)
11+
from faststream.sqs.test import TestSQSBroker
12+
13+
__all__ = (
14+
"FifoQueue",
15+
"RedriveAllowPolicy",
16+
"RedrivePolicy",
17+
"SQSBroker",
18+
"SQSMessage",
19+
"SQSQueue",
20+
"SQSRouter",
21+
"SQSRoute",
22+
"TestApp",
23+
"TestSQSBroker",
24+
)

faststream/sqs/annotations.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
from faststream._compat import Annotated
2+
from faststream.annotations import ContextRepo, Logger, NoCast
3+
from faststream.sqs.broker import SQSBroker as SB # NOQA
4+
from faststream.sqs.message import SQSMessage as SM # NOQA
5+
from faststream.sqs.producer import SQSFastProducer
6+
from faststream.utils.context import Context
7+
8+
__all__ = (
9+
"Logger",
10+
"ContextRepo",
11+
"NoCast",
12+
"SQSBroker",
13+
"SQSMessage",
14+
"SQSProducer",
15+
)
16+
17+
SQSBroker = Annotated[SB, Context("broker")]
18+
SQSMessage = Annotated[SM, Context("message")]
19+
SQSProducer = Annotated[SQSFastProducer, Context("broker._producer")]

faststream/sqs/asyncapi.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
class Publisher:
2+
# TODO
3+
pass
4+
5+
6+
class Handler:
7+
# TODO
8+
pass

faststream/sqs/broker.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class SQSBroker:
2+
# TODO
3+
pass

faststream/sqs/handler.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from aiobotocore.client import AioBaseClient
2+
3+
from faststream.broker.handler import AsyncHandler
4+
from faststream.types import AnyDict
5+
6+
7+
class LogicSQSHandler(AsyncHandler[AnyDict]):
8+
async def start(self, client: AioBaseClient) -> None:
9+
# TODO check "start" method on broker
10+
pass

faststream/sqs/message.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
from typing import Any
2+
3+
from faststream.broker.message import StreamMessage
4+
from faststream.types import AnyDict
5+
6+
7+
class SQSMessage(StreamMessage[AnyDict]):
8+
def __init__(
9+
self,
10+
*args: Any,
11+
**kwargs: Any,
12+
) -> None:
13+
super().__init__(*args, **kwargs)
14+
self.commited = False
15+
16+
async def ack(self, **kwargs: Any) -> None:
17+
# TODO: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html
18+
self.commited = True
19+
20+
async def nack(self, **kwargs: Any) -> None:
21+
# TODO
22+
self.commited = True
23+
24+
async def reject(self, **kwargs: Any) -> None:
25+
# TODO
26+
self.commited = True

faststream/sqs/producer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
class SQSFastProducer:
2+
# TODO
3+
pass

faststream/sqs/router.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from faststream.sqs.shared.router import SQSRouter as BaseRouter
2+
3+
4+
class SQSRouter(BaseRouter):
5+
# TODO
6+
pass

faststream/sqs/shared/__init__.py

Whitespace-only changes.

faststream/sqs/shared/logging.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
import logging
2+
from typing import Any, Optional
3+
4+
from faststream._compat import override
5+
from faststream.broker.core.mixins import LoggingMixin
6+
from faststream.broker.message import StreamMessage
7+
from faststream.log import access_logger
8+
from faststream.types import AnyDict
9+
10+
11+
class SQSLoggingMixin(LoggingMixin):
12+
_max_queue_len: int
13+
14+
def __init__(
15+
self,
16+
*args: Any,
17+
logger: Optional[logging.Logger] = access_logger,
18+
log_level: int = logging.INFO,
19+
log_fmt: Optional[str] = None,
20+
**kwargs: Any,
21+
) -> None:
22+
super().__init__(
23+
*args,
24+
logger=logger,
25+
log_level=log_level,
26+
log_fmt=log_fmt,
27+
**kwargs,
28+
)
29+
self._max_queue_len = 4
30+
31+
@override
32+
def _get_log_context( # type: ignore[override]
33+
self,
34+
message: Optional[StreamMessage[Any]],
35+
queue: str = "",
36+
) -> AnyDict:
37+
return {
38+
"queue": queue,
39+
**super()._get_log_context(message),
40+
}
41+
42+
@property
43+
def fmt(self) -> str:
44+
return self._fmt or (
45+
"%(asctime)s %(levelname)s - "
46+
f"%(queue)-{self._max_queue_len}s | "
47+
"%(message_id)-10s "
48+
"- %(message)s"
49+
)
50+
51+
def _setup_log_context(
52+
self,
53+
queue: Optional[str] = None,
54+
) -> None:
55+
if queue is not None:
56+
self._max_queue_len = max((self._max_queue_len, len(queue)))

0 commit comments

Comments
 (0)