Skip to content

Commit 34ba626

Browse files
committed
WIP
Signed-off-by: Federico Busetti <[email protected]>
1 parent bd3408b commit 34ba626

File tree

10 files changed

+261
-53
lines changed

10 files changed

+261
-53
lines changed

docker-compose.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ services:
77

88
redis:
99
image: redis
10+
ports:
11+
- "6379:6379"
1012

1113
celery-worker:
1214
build:

poetry.lock

Lines changed: 75 additions & 49 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,17 @@ asgiref = "^3.7.2"
2525
celery = { version = "^5.3.1", extras = ["redis"] }
2626
cloudevents-pydantic = "^0.0.2"
2727
dependency-injector = { version = "^4.41.0", extras = ["pydantic"] }
28+
faststream = { version = "^0.5.25", extras = ["redis"] }
2829
httpx = ">=0.23.0"
2930
opentelemetry-distro = { version = "*", extras = ["otlp"] }
3031
opentelemetry-instrumentation = "*"
3132
opentelemetry-instrumentation-celery = "*"
33+
opentelemetry-instrumentation-faststream = "*"
3234
opentelemetry-instrumentation-httpx = "*"
3335
opentelemetry-instrumentation-sqlalchemy = "*"
3436
pydantic = "^2.2.1"
3537
pydantic-settings = "^2.0.3"
36-
python = ">=3.9,<3.13"
38+
python = ">=3.11,<3.13"
3739
rich = "^13.2.0"
3840
SQLAlchemy = { version = "^2.0.0", extras = ["asyncio", "mypy"] }
3941
sqlalchemy-bind-manager = "*"

src/bootstrap/bootstrap.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,24 @@
1-
from typing import cast
1+
from typing import cast, Any
22

33
from celery import Celery
44
from dependency_injector.containers import DynamicContainer
55
from dependency_injector.providers import Object
6+
from faststream.broker.core.usecase import BrokerUsecase
7+
# from gateways.event import FastStreamRedisGateway
68
from pydantic import BaseModel, ConfigDict
79

810
from .celery import init_celery
911
from .config import AppConfig
1012
from .di_container import Container
13+
# from .faststream import init_broker
1114
from .logs import init_logger
1215
from .storage import init_storage
1316

1417

1518
class InitReference(BaseModel):
1619
celery_app: Celery
1720
di_container: DynamicContainer
21+
# faststream_broker: BrokerUsecase[Any, Any]
1822

1923
model_config = ConfigDict(arbitrary_types_allowed=True)
2024

@@ -29,8 +33,14 @@ def application_init(app_config: AppConfig) -> InitReference:
2933
init_logger(app_config)
3034
init_storage()
3135
celery = init_celery(app_config)
36+
# broker = init_broker(app_config)
37+
# This is temporary, has to go directly in the Container
38+
# container.BookEventGatewayInterface.override(
39+
# Object(FastStreamRedisGateway(broker=broker))
40+
# )
3241

3342
return InitReference(
3443
celery_app=celery,
3544
di_container=container,
45+
# faststream_broker=broker,
3646
)

src/bootstrap/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,11 +42,16 @@ class CeleryConfig(BaseModel):
4242
# }
4343

4444

45+
class EventConfig(BaseModel):
46+
REDIS_BROKER_URL: str
47+
48+
4549
class AppConfig(BaseSettings):
4650
model_config = SettingsConfigDict(env_nested_delimiter="__")
4751

4852
APP_NAME: str = "bootstrap"
4953
CELERY: CeleryConfig = CeleryConfig()
54+
# EVENTS: EventConfig
5055
DEBUG: bool = False
5156
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
5257
SQLALCHEMY_CONFIG: Dict[str, SQLAlchemyConfig] = dict(

src/bootstrap/faststream.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# from typing import Dict, Union
2+
#
3+
# import structlog
4+
# # from domains.events import get_topic_registry
5+
# from faststream.redis import RedisBroker
6+
# from faststream.redis.publisher.asyncapi import AsyncAPIPublisher
7+
# from opentelemetry.instrumentation.faststream import RedisOtelMiddleware
8+
#
9+
# from .config import AppConfig
10+
#
11+
#
12+
# def init_broker(config: AppConfig) -> RedisBroker:
13+
# broker = RedisBroker(
14+
# config.EVENTS.REDIS_BROKER_URL,
15+
# middlewares=(RedisOtelMiddleware,),
16+
# logger=structlog.getLogger("faststream.broker"),
17+
# )
18+
#
19+
# return broker
20+
#
21+
#
22+
# def init_publishers(
23+
# broker: RedisBroker,
24+
# ) -> Dict[str, AsyncAPIPublisher]:
25+
# return {
26+
# topic: broker.publisher(topic, schema=Union[event_types])
27+
# for topic, event_types in get_topic_registry().items()
28+
# }

0 commit comments

Comments
 (0)