Skip to content

Commit b12bba3

Browse files
committed
WIP
1 parent 0f0e99e commit b12bba3

File tree

7 files changed

+57
-55
lines changed

7 files changed

+57
-55
lines changed

docker-compose.yaml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,8 @@ services:
6767
CELERY__broker_url: "redis://redis:6379/0"
6868
CELERY__result_backend: "redis://redis:6379/1"
6969
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
70+
EVENTS__IS_PUBLISHER: "True"
71+
EVENTS__IS_SUBSCRIBER: "False"
7072
ports:
7173
- '8000:8000'
7274
working_dir: "/app/src"
@@ -99,6 +101,8 @@ services:
99101
CELERY__broker_url: "redis://redis:6379/0"
100102
CELERY__result_backend: "redis://redis:6379/1"
101103
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
104+
EVENTS__IS_PUBLISHER: "True"
105+
EVENTS__IS_SUBSCRIBER: "True"
102106
working_dir: "/app/src"
103107
volumes:
104108
- '.:/app'
@@ -108,7 +112,6 @@ services:
108112
command:
109113
- faststream
110114
- run
111-
# - event_consumer:app
112115
- event_consumer:create_app
113116
- --factory
114117

@@ -124,6 +127,8 @@ services:
124127
CELERY__broker_url: "redis://redis:6379/0"
125128
CELERY__result_backend: "redis://redis:6379/1"
126129
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
130+
EVENTS__IS_PUBLISHER: "True"
131+
EVENTS__IS_SUBSCRIBER: "True"
127132
ports:
128133
- '8000:8000'
129134
working_dir: "/app/src"

src/bootstrap/bootstrap.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,23 @@
33
from celery import Celery
44
from dependency_injector.containers import DynamicContainer
55
from dependency_injector.providers import Object
6-
from faststream.redis import RedisBroker
6+
from faststream.redis import RedisBroker, RedisRouter
77

88
# from gateways.event import FastStreamRedisGateway
99
from pydantic import BaseModel, ConfigDict
1010

1111
from .celery import init_celery
1212
from .config import AppConfig
1313
from .di_container import Container
14-
from .faststream import init_broker
14+
from .faststream import init_router
1515
from .logs import init_logger
1616
from .storage import init_storage
1717

1818

1919
class InitReference(BaseModel):
2020
celery_app: Celery
2121
di_container: DynamicContainer
22-
faststream_broker: RedisBroker
22+
faststream_broker: RedisRouter
2323

2424
model_config = ConfigDict(arbitrary_types_allowed=True)
2525

@@ -34,7 +34,7 @@ def application_init(app_config: AppConfig) -> InitReference:
3434
init_logger(app_config)
3535
init_storage()
3636
celery = init_celery(app_config)
37-
broker = init_broker(app_config)
37+
broker = init_router(app_config.EVENTS)
3838
# This is temporary, has to go directly in the Container
3939
# container.BookEventGatewayInterface.override(
4040
# Object(FastStreamRedisGateway(broker=broker))

src/bootstrap/config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ class CeleryConfig(BaseModel):
4949
class EventConfig(BaseModel):
5050
REDIS_BROKER_URL: str = ""
5151
TOPIC: Optional[str] = None
52+
IS_PUBLISHER: bool = False
53+
IS_SUBSCRIBER: bool = False
5254

5355

5456
class AppConfig(BaseSettings):

src/bootstrap/faststream.py

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,40 @@
55
from faststream import Logger
66

77
# from domains.events import get_topic_registry
8-
from faststream.redis import RedisBroker
8+
from faststream.redis import RedisRouter, RedisBroker, fastapi
99

1010
# from faststream.redis.publisher.asyncapi import AsyncAPIPublisher
1111
from opentelemetry.instrumentation.faststream import RedisOtelMiddleware
1212

13+
from bootstrap.config import EventConfig
1314
from domains import event_registry
1415

15-
from .config import AppConfig
16+
logger = structlog.getLogger(__name__)
1617

1718

18-
def init_broker(config: AppConfig) -> RedisBroker:
19+
def init_router(config: EventConfig) -> RedisRouter:
1920
broker = RedisBroker(
20-
config.EVENTS.REDIS_BROKER_URL,
21+
config.REDIS_BROKER_URL,
2122
middlewares=(RedisOtelMiddleware,),
2223
logger=structlog.getLogger("faststream.broker"),
2324
)
24-
register_publishers(broker, config.EVENTS.TOPIC)
25-
register_subscribers(broker, config.EVENTS.TOPIC)
2625

27-
return broker
26+
router = RedisRouter()
27+
register_publishers(router, config.TOPIC)
28+
if config.IS_SUBSCRIBER:
29+
register_subscribers(router, config.TOPIC)
2830

31+
broker.include_router(router)
32+
return router
2933

30-
def register_subscribers(broker, topic: Optional[str] = None):
31-
if topic is not None and topic in event_registry.keys():
32-
topics_map = {topic: event_registry[topic]}
33-
else:
34-
topics_map = event_registry.copy()
3534

36-
for topic, event_type in topics_map.items():
37-
38-
@broker.subscriber(topic)
39-
async def handler(msg: event_type, logger: Logger) -> None: # type: ignore[valid-type]
40-
logger.info(f"Received message {type(msg)} {msg}")
4135

4236

43-
def register_publishers(broker, topic: Optional[str] = None):
37+
def register_publishers(router: RedisRouter, topic: Optional[str] = None):
4438
if topic is not None and topic in event_registry.keys():
4539
topics_map = {topic: event_registry[topic]}
4640
else:
4741
topics_map = event_registry.copy()
4842

4943
for topic, event_type in topics_map.items():
50-
broker.publisher(topic, schema=event_registry[topic])
51-
52-
53-
# def init_publishers(
54-
# broker: RedisBroker,
55-
# ) -> Dict[str, AsyncAPIPublisher]:
56-
# return {
57-
# topic: broker.publisher(topic, schema=Union[event_types])
58-
# for topic, event_types in get_topic_registry().items()
59-
# }
44+
router.publisher(topic, schema=event_registry[topic])

src/event_consumer/__init__.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,20 @@
55
"""
66

77
import os
8-
from typing import Union
8+
from typing import Union, Optional
99

1010
import structlog
1111
from faststream import FastStream
12+
from faststream.redis import RedisRouter
1213
from opentelemetry import trace
1314
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
1415
from opentelemetry.sdk.resources import Resource
1516
from opentelemetry.sdk.trace import TracerProvider
1617
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1718

19+
from alembic.env import logger
1820
from bootstrap import AppConfig, application_init
21+
from domains import event_registry
1922
from domains.books.events import BookCreatedV1, BookCreatedV1Data
2023

2124

@@ -33,7 +36,8 @@ def create_app(test_config: Union[AppConfig, None] = None) -> FastStream:
3336
setup_telemetry(
3437
"faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
3538
)
36-
broker = application_init(AppConfig()).faststream_broker
39+
router = application_init(AppConfig()).faststream_broker
40+
broker = router.broker
3741
app = FastStream(broker, logger=structlog.get_logger())
3842

3943
@app.after_startup
@@ -50,3 +54,20 @@ async def after_startup():
5054
)
5155

5256
return app
57+
58+
59+
60+
# TODO: Add Routing structure similar to the one in the fastapi and
61+
# move this in the event_consumer_module
62+
def register_subscribers(router: RedisRouter, topic: Optional[str] = None):
63+
if topic is not None and topic in event_registry.keys():
64+
topics_map = {topic: event_registry[topic]}
65+
else:
66+
topics_map = event_registry.copy()
67+
68+
logger = structlog.get_logger()
69+
70+
for topic, event_type in topics_map.items():
71+
@router.subscriber(topic)
72+
async def handler(msg: event_type) -> None: # type: ignore[valid-type]
73+
logger.info(f"Received message {type(msg)} {msg}")

src/http_app/__init__.py

Lines changed: 7 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
1-
from contextlib import asynccontextmanager
2-
from typing import Any, Union
1+
from typing import Union, Any
32

43
from fastapi import FastAPI, Request
54
from faststream.broker.core.usecase import BrokerUsecase
6-
from faststream.types import Lifespan
5+
from faststream.redis import RedisRouter, fastapi
76
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
87
from starlette.responses import JSONResponse
98
from starlette_prometheus import PrometheusMiddleware, metrics
@@ -19,20 +18,14 @@ def create_app(
1918
app_config = test_config or AppConfig()
2019
ref = application_init(app_config)
2120

22-
"""
23-
We don't want to couple together FastAPI and FastStream
24-
so we use a lifespan handler instead of the plugin to
25-
remain decoupled but still able use any generic broker
26-
for message publishing in FastAPI.
27-
"""
2821
app = FastAPI(
2922
debug=app_config.DEBUG,
3023
title=app_config.APP_NAME,
31-
lifespan=faststream_lifespan(ref.faststream_broker),
3224
)
3325
init_exception_handlers(app)
3426

3527
init_routes(app)
28+
add_faststream_router(app, ref.faststream_broker)
3629

3730
"""
3831
OpenTelemetry prometheus exporter does not work together with automatic
@@ -67,13 +60,7 @@ async def add_exception_middleware(request: Request, call_next):
6760
return JSONResponse({"error": "Internal server error"}, status_code=500)
6861

6962

70-
def faststream_lifespan(broker: BrokerUsecase[Any, Any]) -> Lifespan:
71-
@asynccontextmanager
72-
async def handler(app: FastAPI):
73-
await broker.start()
74-
try:
75-
yield
76-
finally:
77-
await broker.close()
78-
79-
return handler
63+
def add_faststream_router(app: FastAPI, router: RedisRouter) -> None:
64+
f_router = fastapi.RedisRouter()
65+
f_router.include_router(router)
66+
app.include_router(f_router)

src/http_app/routes/ping.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from fastapi import APIRouter
22
from pydantic import BaseModel, ConfigDict
3+
from starlette.responses import JSONResponse
34

45
router = APIRouter()
56

@@ -18,4 +19,5 @@ class PingResponse(BaseModel):
1819

1920
@router.get("/ping")
2021
async def ping() -> PingResponse:
22+
JSONResponse({"ping": "pong!"})
2123
return PingResponse(ping="pong!")

0 commit comments

Comments
 (0)