diff --git a/faststream/__init__.py b/faststream/__init__.py index f878d01b21..cfeb0ef11e 100644 --- a/faststream/__init__.py +++ b/faststream/__init__.py @@ -1,5 +1,6 @@ """A Python framework for building services interacting with Apache Kafka, RabbitMQ, NATS and Redis.""" +from faststream._internal.configs.settings import Settings from faststream._internal.testing.app import TestApp from faststream._internal.utils import apply_types from faststream.annotations import ContextRepo, Logger @@ -27,6 +28,7 @@ "PublishCommand", "PublishType", "Response", + "Settings", "SourceType", "StreamMessage", "TestApp", diff --git a/faststream/_internal/broker/broker.py b/faststream/_internal/broker/broker.py index 7e9ab06182..2cbbeaeeed 100644 --- a/faststream/_internal/broker/broker.py +++ b/faststream/_internal/broker/broker.py @@ -105,11 +105,20 @@ def _setup_logger(self) -> None: self.config.logger._setup(self.config.fd_config.context) + def setup_logger(self) -> None: + self._setup_logger() + + def resolve_settings(self) -> None: + for sub in self.subscribers: + sub.resolve_settings() + + for pub in self.publishers: + pub.resolve_settings() + async def connect(self) -> ConnectionType: """Connect to a remote server.""" if self._connection is None: self._connection = await self._connect() - self._setup_logger() return self._connection diff --git a/faststream/_internal/configs/__init__.py b/faststream/_internal/configs/__init__.py index a1249507e3..9106c8a6b0 100644 --- a/faststream/_internal/configs/__init__.py +++ b/faststream/_internal/configs/__init__.py @@ -1,5 +1,6 @@ from .broker import BrokerConfig, BrokerConfigType, ConfigComposition from .endpoint import PublisherUsecaseConfig, SubscriberUsecaseConfig +from .settings import Settings, make_settings_container from .specification import ( PublisherSpecificationConfig, SpecificationConfig as SubscriberSpecificationConfig, @@ -11,6 +12,8 @@ "ConfigComposition", "PublisherSpecificationConfig", "PublisherUsecaseConfig", + "Settings", "SubscriberSpecificationConfig", "SubscriberUsecaseConfig", + "make_settings_container", ) diff --git a/faststream/_internal/configs/broker.py b/faststream/_internal/configs/broker.py index 650fd88822..d3a9e35549 100644 --- a/faststream/_internal/configs/broker.py +++ b/faststream/_internal/configs/broker.py @@ -8,6 +8,8 @@ from faststream._internal.logger import LoggerState from faststream._internal.producer import ProducerProto, ProducerUnset +from .settings import SettingsContainer, make_settings_container + if TYPE_CHECKING: from fast_depends.dependencies import Dependant @@ -19,6 +21,8 @@ class BrokerConfig: prefix: str = "" include_in_schema: bool | None = True + settings: SettingsContainer = field(default_factory=make_settings_container) + broker_middlewares: Sequence["BrokerMiddleware[Any]"] = () broker_parser: Optional["CustomCallable"] = None broker_decoder: Optional["CustomCallable"] = None diff --git a/faststream/_internal/configs/settings.py b/faststream/_internal/configs/settings.py new file mode 100644 index 0000000000..ad9ba42d10 --- /dev/null +++ b/faststream/_internal/configs/settings.py @@ -0,0 +1,64 @@ +from collections.abc import Mapping, MutableMapping +from dataclasses import dataclass +from typing import Any, Protocol + + +@dataclass(slots=True) +class Settings: + key: str + + +class SettingsContainer(Protocol): + def resolve(self, item: Any) -> Any: + pass + + +class RealSettingsContainer(SettingsContainer): + def __init__(self, settings: Mapping[str, Any]) -> None: + self._items = settings + + def resolve(self, item: Any) -> Any: + if isinstance(item, Settings): + return self._items[item.key] + self._resolve_child(item) + return item + + def _resolve_child( + self, + item: Any, + seen: set[Any] | None = None, + ) -> None: + if seen is None: + seen = set() + + if id(item) in seen: + return + + seen.add(id(item)) + + if isinstance(item, MutableMapping): + for key, value in item.items(): + if isinstance(value, Settings): + item[key] = self._items[value.key] + self._resolve_child(value, seen) + + else: + for attr_name in dir(item): + if not attr_name.startswith("_"): + attr = getattr(item, attr_name) + if isinstance(attr, Settings): + setattr(item, attr_name, self._items[attr.key]) + self._resolve_child(attr, seen) + + +class FakeSettingsContainer(SettingsContainer): + def resolve(self, item: Any) -> Any: + return item + + +def make_settings_container( + settings: Mapping[str, Any] | None = None, +) -> SettingsContainer: + if not settings: + return FakeSettingsContainer() + return RealSettingsContainer(settings) diff --git a/faststream/rabbit/broker/broker.py b/faststream/rabbit/broker/broker.py index 2f6ddde33d..aad0af75a5 100644 --- a/faststream/rabbit/broker/broker.py +++ b/faststream/rabbit/broker/broker.py @@ -1,5 +1,5 @@ import logging -from collections.abc import Iterable, Sequence +from collections.abc import Iterable, Mapping, Sequence from typing import ( TYPE_CHECKING, Any, @@ -16,6 +16,7 @@ from faststream.__about__ import SERVICE_NAME from faststream._internal.broker import BrokerUsecase +from faststream._internal.configs import make_settings_container from faststream._internal.constants import EMPTY from faststream._internal.context.repository import ContextRepo from faststream._internal.di import FastDependsConfig @@ -108,6 +109,7 @@ def __init__( # FastDepends args apply_types: bool = True, serializer: Optional["SerializerProto"] = EMPTY, + settings: Mapping[str, Any] | None = None, provider: Optional["Provider"] = None, context: Optional["ContextRepo"] = None, ) -> None: @@ -141,6 +143,7 @@ def __init__( log_level: Service messages log level. apply_types: Whether to use FastDepends or not. serializer: FastDepends-compatible serializer to validate incoming messages. + settings: Container for configuration publisher and subscriber. provider: Provider for FastDepends. context: Context for FastDepends. """ @@ -185,6 +188,7 @@ def __init__( # Basic args routers=routers, config=RabbitBrokerConfig( + settings=make_settings_container(settings), channel_manager=cm, producer=producer, declarer=declarer, @@ -274,6 +278,9 @@ async def close( async def start(self) -> None: """Connect broker to RabbitMQ and startup all subscribers.""" await self.connect() + # can merge it into one operation, something like br.initialize or br.initialize_pipe + self.resolve_settings() + self.setup_logger() await self.declare_queue(RABBIT_REPLY) await super().start() diff --git a/faststream/rabbit/broker/registrator.py b/faststream/rabbit/broker/registrator.py index 6060c077fa..fb9d03883c 100644 --- a/faststream/rabbit/broker/registrator.py +++ b/faststream/rabbit/broker/registrator.py @@ -5,6 +5,7 @@ from typing_extensions import deprecated, override from faststream._internal.broker.registrator import Registrator +from faststream._internal.configs import Settings from faststream._internal.constants import EMPTY from faststream.exceptions import SetupError from faststream.middlewares import AckPolicy @@ -38,11 +39,11 @@ class RabbitRegistrator(Registrator[IncomingMessage, RabbitBrokerConfig]): @override def subscriber( # type: ignore[override] self, - queue: Union[str, "RabbitQueue"], - exchange: Union[str, "RabbitExchange", None] = None, + queue: Union[str, "RabbitQueue", Settings], + exchange: Union[str, "RabbitExchange", Settings, None] = None, *, - channel: Optional["Channel"] = None, - consume_args: dict[str, Any] | None = None, + channel: Optional["Channel"] | Settings = None, + consume_args: dict[str, Any] | Settings | None = None, no_ack: Annotated[ bool, deprecated( @@ -120,16 +121,16 @@ def subscriber( # type: ignore[override] @override def publisher( # type: ignore[override] self, - queue: Union["RabbitQueue", str] = "", - exchange: Union["RabbitExchange", str, None] = None, + queue: Union["RabbitQueue", str, Settings] = "", + exchange: Union["RabbitExchange", str, Settings, None] = None, *, - routing_key: str = "", - mandatory: bool = True, - immediate: bool = False, + routing_key: str | Settings = "", + mandatory: bool | Settings = True, + immediate: bool | Settings = False, + persist: bool | Settings = False, + reply_to: str | Settings | None = None, + priority: int | Settings | None = None, timeout: "TimeoutType" = None, - persist: bool = False, - reply_to: str | None = None, - priority: int | None = None, persistent: bool = True, # specific middlewares: Annotated[ @@ -145,12 +146,12 @@ def publisher( # type: ignore[override] schema: Any | None = None, include_in_schema: bool = True, # message args - headers: Optional["HeadersType"] = None, - content_type: str | None = None, - content_encoding: str | None = None, - expiration: Optional["DateType"] = None, - message_type: str | None = None, - user_id: str | None = None, + headers: Optional["HeadersType"] | Settings = None, + content_type: str | Settings | None = None, + content_encoding: str | Settings | None = None, + expiration: Optional["DateType"] | Settings = None, + message_type: str | Settings | None = None, + user_id: str | Settings | None = None, ) -> "RabbitPublisher": """Creates long-living and AsyncAPI-documented publisher object. diff --git a/faststream/rabbit/publisher/factory.py b/faststream/rabbit/publisher/factory.py index 1bf497439b..6250cd977d 100644 --- a/faststream/rabbit/publisher/factory.py +++ b/faststream/rabbit/publisher/factory.py @@ -1,5 +1,7 @@ from collections.abc import Sequence -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Union + +from faststream._internal.configs.settings import Settings from .config import RabbitPublisherConfig, RabbitPublisherSpecificationConfig from .specification import RabbitPublisherSpecification @@ -15,10 +17,10 @@ def create_publisher( *, - routing_key: str, - queue: "RabbitQueue", - exchange: "RabbitExchange", - message_kwargs: "PublishKwargs", + routing_key: str | Settings, + queue: Union["RabbitQueue", Settings], + exchange: Union["RabbitExchange", Settings], + message_kwargs: Union["PublishKwargs", Settings], # Broker args config: "RabbitBrokerConfig", # Publisher args diff --git a/faststream/rabbit/publisher/usecase.py b/faststream/rabbit/publisher/usecase.py index 8e87061a7f..6bc6aae9b7 100644 --- a/faststream/rabbit/publisher/usecase.py +++ b/faststream/rabbit/publisher/usecase.py @@ -1,5 +1,5 @@ from collections.abc import Iterable -from typing import TYPE_CHECKING, Any, Optional, Union +from typing import TYPE_CHECKING, Any, Callable, Optional, Union from typing_extensions import Unpack, override @@ -79,7 +79,17 @@ def routing( return routing_key + def resolve_settings(self) -> None: + resolver: Callable[..., Any] = self._outer_config.settings.resolve + self.routing_key = resolver(self.routing_key) + self.queue = RabbitQueue.validate(resolver(self.queue)) + self.exchange = RabbitExchange.validate(resolver(self.exchange)) + self.headers = resolver(self.headers) + self.reply_to = resolver(self.reply_to) + self.timeout = resolver(self.timeout) + async def start(self) -> None: + """Starts the consumer for the RabbitMQ queue.""" if self.exchange is not None: await self._outer_config.declarer.declare_exchange(self.exchange) return await super().start() diff --git a/faststream/rabbit/schemas/queue.py b/faststream/rabbit/schemas/queue.py index 6754dee9c2..33f70bd05d 100644 --- a/faststream/rabbit/schemas/queue.py +++ b/faststream/rabbit/schemas/queue.py @@ -79,6 +79,15 @@ def add_prefix(self, prefix: str) -> "RabbitQueue": return new_q + def set_routing(self): + re, routing_key = compile_path( + self.routing_key, + replace_symbol="*", + patch_regex=lambda x: x.replace(r"\#", ".+"), + ) + self.path_regex = re + self.routing_key = routing_key + @overload def __init__( self, @@ -164,11 +173,7 @@ def __init__( :param bind_arguments: Queue-exchange binding options. :param routing_key: Explicit binding routing key. Uses name if not present. """ - re, routing_key = compile_path( - routing_key, - replace_symbol="*", - patch_regex=lambda x: x.replace(r"\#", ".+"), - ) + re = None if queue_type is QueueType.QUORUM or queue_type is QueueType.STREAM: if durable is EMPTY: diff --git a/faststream/rabbit/subscriber/factory.py b/faststream/rabbit/subscriber/factory.py index b69ed79487..f87f6d7fc2 100644 --- a/faststream/rabbit/subscriber/factory.py +++ b/faststream/rabbit/subscriber/factory.py @@ -1,6 +1,7 @@ import warnings -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Optional, Union +from faststream._internal.configs.settings import Settings from faststream._internal.constants import EMPTY from faststream._internal.endpoint.subscriber.call_item import CallsCollection from faststream.exceptions import SetupError @@ -20,10 +21,10 @@ def create_subscriber( *, - queue: "RabbitQueue", - exchange: "RabbitExchange", - consume_args: dict[str, Any] | None, - channel: Optional["Channel"], + queue: Union["RabbitQueue", Settings], + exchange: Union["RabbitExchange", Settings], + consume_args: dict[str, Any] | Settings | None, + channel: Optional["Channel"] | Settings, # Subscriber args no_reply: bool, ack_policy: "AckPolicy", diff --git a/faststream/rabbit/subscriber/usecase.py b/faststream/rabbit/subscriber/usecase.py index 94fb40002d..a76ed5da64 100644 --- a/faststream/rabbit/subscriber/usecase.py +++ b/faststream/rabbit/subscriber/usecase.py @@ -1,9 +1,10 @@ import asyncio import contextlib from collections.abc import AsyncIterator, Sequence -from typing import TYPE_CHECKING, Any, Optional, cast +from typing import TYPE_CHECKING, Any, Callable, Optional, cast import anyio +from faststream._internal.configs.settings import Settings from typing_extensions import override from faststream._internal.endpoint.subscriber import SubscriberUsecase @@ -12,6 +13,7 @@ from faststream.rabbit.publisher.fake import RabbitFakePublisher from faststream.rabbit.schemas import RabbitExchange from faststream.rabbit.schemas.constants import REPLY_TO_QUEUE_EXCHANGE_DELIMITER +from faststream.rabbit.schemas import RabbitQueue if TYPE_CHECKING: from aio_pika import IncomingMessage, RobustQueue @@ -24,7 +26,6 @@ from faststream.message import StreamMessage from faststream.rabbit.configs import RabbitBrokerConfig from faststream.rabbit.message import RabbitMessage - from faststream.rabbit.schemas import RabbitQueue from .config import RabbitSubscriberConfig @@ -40,7 +41,7 @@ def __init__( specification: "SubscriberSpecification[Any, Any]", calls: "CallsCollection[IncomingMessage]", ) -> None: - parser = AioPikaParser(pattern=config.queue.path_regex) + parser = AioPikaParser() config.decoder = parser.decode_message config.parser = parser.parse_message super().__init__( @@ -67,6 +68,23 @@ def app_id(self) -> str | None: def routing(self) -> str: return f"{self._outer_config.prefix}{self.queue.routing()}" + def resolve_settings(self): + resolver: Callable[..., Any] = self._outer_config.settings.resolve + self.queue = resolver(self.queue) + self.queue: RabbitQueue = RabbitQueue.validate(resolver(self.queue)) + self.queue.set_routing() + self.exchange = resolver(self.exchange) + self.exchange = RabbitExchange.validate(resolver(self.exchange)) + self.consume_args = resolver(self.consume_args) + self.__no_ack = resolver(self.__no_ack) + self._consumer_tag = resolver(self._consumer_tag) + self._queue_obj = resolver(self._queue_obj) + self.channel = resolver(self.channel) + pattern = getattr(self.queue, "path_regex", None) + parser = AioPikaParser(pattern=pattern) + self._parser = parser.parse_message + self._decoder = parser.decode_message + @override async def start(self) -> None: """Starts the consumer for the RabbitMQ queue.""" @@ -210,7 +228,7 @@ def _make_response_publisher( @staticmethod def build_log_context( message: Optional["StreamMessage[Any]"], - queue: "RabbitQueue", + queue: "RabbitQueue | Settings", exchange: Optional["RabbitExchange"] = None, ) -> dict[str, str]: return { diff --git a/tests/brokers/rabbit/test_fastapi.py b/tests/brokers/rabbit/test_fastapi.py index aa71d86cec..3d1ea33fe2 100644 --- a/tests/brokers/rabbit/test_fastapi.py +++ b/tests/brokers/rabbit/test_fastapi.py @@ -78,6 +78,8 @@ async def test_path(self) -> None: async def hello(name): return name + await router.broker.start() + async with self.patch_broker(router.broker) as br: r = await br.request( "hi", diff --git a/tests/brokers/rabbit/test_settings_container.py b/tests/brokers/rabbit/test_settings_container.py new file mode 100644 index 0000000000..9c34054da8 --- /dev/null +++ b/tests/brokers/rabbit/test_settings_container.py @@ -0,0 +1,103 @@ +import asyncio +from typing import Any + +import pytest + +from faststream import Settings +from faststream.rabbit import RabbitBroker, RabbitExchange, RabbitQueue + + +@pytest.mark.asyncio() +@pytest.mark.rabbit() +@pytest.mark.connected() +async def test_queue_from_settings(event: asyncio.Event, queue: str) -> None: + broker = RabbitBroker(settings={"q1": queue}) + + @broker.subscriber(queue=Settings("q1")) + def h(m: Any) -> None: + event.set() + + publisher = broker.publisher(queue=Settings("q1")) + + async with broker: + await broker.start() + + await asyncio.wait( + ( + asyncio.create_task(publisher.publish("test")), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + + +@pytest.mark.asyncio() +@pytest.mark.rabbit() +@pytest.mark.connected() +async def test_queue_object_name_from_settings( + event: asyncio.Event, + queue: str, +) -> None: + broker = RabbitBroker(settings={"queue_name": queue}) + + @broker.subscriber(queue=RabbitQueue(Settings("queue_name"))) + def h(m: Any) -> None: + event.set() + + publisher = broker.publisher(queue=RabbitQueue(Settings("queue_name"))) + + async with broker: + await broker.start() + + await asyncio.wait( + ( + asyncio.create_task(publisher.publish("test")), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() + + +@pytest.mark.asyncio() +@pytest.mark.rabbit() +@pytest.mark.connected() +async def test_nested_settings( + event: asyncio.Event, + queue: str, +) -> None: + settings = { + "ex": RabbitExchange(f"{queue}2"), + "rk": queue, + } + + broker = RabbitBroker(settings=settings) + + @broker.subscriber( + queue=RabbitQueue(name=f"{queue}1", routing_key=Settings("rk")), + exchange=Settings("ex"), + ) + def h(m: Any) -> None: + event.set() + + publisher = broker.publisher( + queue=RabbitQueue(name=f"{queue}1", routing_key=Settings("rk")), + exchange=Settings("ex"), + routing_key=Settings("rk"), + ) + + async with broker: + await broker.start() + + await asyncio.wait( + ( + asyncio.create_task(publisher.publish("test")), + asyncio.create_task(event.wait()), + ), + timeout=3, + ) + + assert event.is_set() diff --git a/tests/brokers/test_settings_container.py b/tests/brokers/test_settings_container.py new file mode 100644 index 0000000000..d702f16345 --- /dev/null +++ b/tests/brokers/test_settings_container.py @@ -0,0 +1,70 @@ +from dataclasses import dataclass +from typing import Any + +from faststream._internal.configs.settings import RealSettingsContainer, Settings + + +def test_smoke() -> None: + settings = RealSettingsContainer({"a": 1}) + assert settings.resolve(Settings("a")) == 1 + + +def test_nested() -> None: + @dataclass + class SomeClass: + field: Any + + obj = SomeClass(field=Settings("key")) + + settings = RealSettingsContainer({"key": 1}) + assert settings.resolve(obj) == SomeClass(field=1) + + +def test_deep_nested() -> None: + @dataclass + class SomeClass: + field: Any + + obj = SomeClass(field=SomeClass(field=Settings("key"))) + + settings = RealSettingsContainer({"key": 1}) + assert settings.resolve(obj) == SomeClass(field=SomeClass(field=1)) + + +def test_circular_dependency() -> None: + @dataclass + class SomeClass: + field: Any + other: Any = None + + obj1 = SomeClass(field=Settings("key")) + obj2 = SomeClass(field=Settings("key")) + obj2.other = obj1 + obj1.other = obj2 + + settings = RealSettingsContainer({"key": 1}) + assert settings.resolve(obj2) == SomeClass(field=1, other=obj1) + assert settings.resolve(obj1) == SomeClass(field=1, other=obj2) + + +def test_resolve_dict() -> None: + settings = RealSettingsContainer({"key": 1}) + assert settings.resolve({"key": Settings("key")}) == {"key": 1} + + +def test_resolve_complex() -> None: + @dataclass + class SomeClass: + field: Any + + settings = RealSettingsContainer({"key": 1}) + + assert settings.resolve({ + "key": Settings("key"), + "other": {"key": Settings("key")}, + "some_class": SomeClass(field={"key": Settings("key")}), + }) == { + "key": 1, + "other": {"key": 1}, + "some_class": SomeClass(field={"key": 1}), + }