Skip to content

Commit 5309c9e

Browse files
committed
Initial working draft
1 parent ccbc9b0 commit 5309c9e

File tree

4 files changed

+53
-36
lines changed

4 files changed

+53
-36
lines changed

src/bootstrap/bootstrap.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,21 +4,22 @@
44
from dependency_injector.containers import DynamicContainer
55
from dependency_injector.providers import Object
66
from faststream.broker.core.usecase import BrokerUsecase
7+
from faststream.redis import RedisBroker
78
# from gateways.event import FastStreamRedisGateway
89
from pydantic import BaseModel, ConfigDict
910

1011
from .celery import init_celery
1112
from .config import AppConfig
1213
from .di_container import Container
13-
# from .faststream import init_broker
14+
from .faststream import init_broker
1415
from .logs import init_logger
1516
from .storage import init_storage
1617

1718

1819
class InitReference(BaseModel):
1920
celery_app: Celery
2021
di_container: DynamicContainer
21-
# faststream_broker: BrokerUsecase[Any, Any]
22+
faststream_broker: RedisBroker
2223

2324
model_config = ConfigDict(arbitrary_types_allowed=True)
2425

@@ -33,7 +34,7 @@ def application_init(app_config: AppConfig) -> InitReference:
3334
init_logger(app_config)
3435
init_storage()
3536
celery = init_celery(app_config)
36-
# broker = init_broker(app_config)
37+
broker = init_broker(app_config)
3738
# This is temporary, has to go directly in the Container
3839
# container.BookEventGatewayInterface.override(
3940
# Object(FastStreamRedisGateway(broker=broker))
@@ -42,5 +43,5 @@ def application_init(app_config: AppConfig) -> InitReference:
4243
return InitReference(
4344
celery_app=celery,
4445
di_container=container,
45-
# faststream_broker=broker,
46+
faststream_broker=broker,
4647
)

src/bootstrap/config.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ class AppConfig(BaseSettings):
5151

5252
APP_NAME: str = "bootstrap"
5353
CELERY: CeleryConfig = CeleryConfig()
54-
# EVENTS: EventConfig
54+
EVENTS: EventConfig
5555
DEBUG: bool = False
5656
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
5757
SQLALCHEMY_CONFIG: Dict[str, SQLAlchemyConfig] = dict(

src/bootstrap/faststream.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,24 @@
11
# from typing import Dict, Union
2-
#
3-
# import structlog
4-
# # from domains.events import get_topic_registry
5-
# from faststream.redis import RedisBroker
2+
3+
import structlog
4+
# from domains.events import get_topic_registry
5+
from faststream.redis import RedisBroker
66
# from faststream.redis.publisher.asyncapi import AsyncAPIPublisher
7-
# from opentelemetry.instrumentation.faststream import RedisOtelMiddleware
8-
#
9-
# from .config import AppConfig
10-
#
11-
#
12-
# def init_broker(config: AppConfig) -> RedisBroker:
13-
# broker = RedisBroker(
14-
# config.EVENTS.REDIS_BROKER_URL,
15-
# middlewares=(RedisOtelMiddleware,),
16-
# logger=structlog.getLogger("faststream.broker"),
17-
# )
18-
#
19-
# return broker
20-
#
21-
#
7+
from opentelemetry.instrumentation.faststream import RedisOtelMiddleware
8+
9+
from .config import AppConfig
10+
11+
12+
def init_broker(config: AppConfig) -> RedisBroker:
13+
broker = RedisBroker(
14+
config.EVENTS.REDIS_BROKER_URL,
15+
middlewares=(RedisOtelMiddleware,),
16+
logger=structlog.getLogger("faststream.broker"),
17+
)
18+
19+
return broker
20+
21+
2222
# def init_publishers(
2323
# broker: RedisBroker,
2424
# ) -> Dict[str, AsyncAPIPublisher]:

src/event_consumer/__init__.py

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
application layers (storage, logs) when running standalone workers
44
without having to initialise the HTTP framework (or other ones)
55
"""
6-
76
import os
87
from typing import Annotated, Union, Dict, Type, Optional, List
98

@@ -18,7 +17,7 @@
1817
from opentelemetry.sdk.trace import TracerProvider
1918
from opentelemetry.sdk.trace.export import BatchSpanProcessor
2019

21-
from domains.books.events import BookCreatedV1, BookUpdatedV1
20+
from domains.books.events import BookCreatedV1, BookUpdatedV1, BookCreatedV1Data
2221

2322
_event_registry: Dict[str, Type] = {
2423
'books': Annotated[
@@ -36,24 +35,41 @@ def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
3635
return tracer_provider
3736

3837

39-
def create_app(
40-
test_config: Union[AppConfig, None] = None
41-
) -> FastStream:
42-
setup_telemetry("faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"])
43-
broker = application_init(test_config or AppConfig()).faststream_broker
44-
return FastStream(broker, logger=structlog.get_logger())
45-
46-
4738
def register_subscribers(broker, topics: Optional[List[str]] = None):
4839
if topics is None:
4940
topics_map: Dict[str, Type] = _event_registry
5041
else:
5142
topics_map: Dict[str, Type] = {k: v for k, v in _event_registry.items() if k in topics}
5243

53-
for topic, event_type in topics_map:
44+
45+
for topic, event_type in topics_map.items():
5446
@broker.subscriber(topic) # type: ignore
5547
async def handler(msg: event_type, logger: Logger) -> None:
56-
logger.info("Received message", extra={"msg": "some_extra_here"})
48+
logger.info(f"Received message {type(msg)} {msg}")
49+
# logger.info(f"Received message {type(msg)} {msg}", extra={"msg": "some_extra_here"})
5750
# l = logging.getLogger()
5851
# l.info("AAAAA", extra={"eee": "AAA"})
5952

53+
54+
def create_app(
55+
test_config: Union[AppConfig, None] = None
56+
) -> FastStream:
57+
setup_telemetry("faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"])
58+
broker = application_init(AppConfig()).faststream_broker
59+
app = FastStream(broker, logger=structlog.get_logger())
60+
register_subscribers(broker)
61+
62+
publisher = broker.publisher("books", schema=_event_registry["books"])
63+
64+
@app.after_startup
65+
async def after_startup():
66+
await broker.publish(BookCreatedV1.event_factory(
67+
data=BookCreatedV1Data(
68+
book_id=123,
69+
title="AAA",
70+
author_name="BBB",
71+
)
72+
), "books")
73+
74+
75+
return app

0 commit comments

Comments
 (0)