Skip to content

Commit 0f0e99e

Browse files
committed
Eventy consumer refactor
1 parent 5309c9e commit 0f0e99e

File tree

10 files changed

+160
-71
lines changed

10 files changed

+160
-71
lines changed

docker-compose.yaml

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ services:
2020
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
2121
CELERY__broker_url: "redis://redis:6379/0"
2222
CELERY__result_backend: "redis://redis:6379/1"
23+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
2324
working_dir: "/app/src"
2425
volumes:
2526
- '.:/app'
@@ -38,6 +39,7 @@ services:
3839
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
3940
CELERY__broker_url: "redis://redis:6379/0"
4041
CELERY__result_backend: "redis://redis:6379/1"
42+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
4143
working_dir: "/app/src"
4244
volumes:
4345
- '.:/app'
@@ -64,6 +66,7 @@ services:
6466
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
6567
CELERY__broker_url: "redis://redis:6379/0"
6668
CELERY__result_backend: "redis://redis:6379/1"
69+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
6770
ports:
6871
- '8000:8000'
6972
working_dir: "/app/src"
@@ -82,7 +85,61 @@ services:
8285
- "8000"
8386
- --factory
8487
# Remember to disable the reloader in order to allow otel instrumentation
85-
- --reload
88+
# - --reload
89+
90+
event-consumer:
91+
build:
92+
dockerfile: Dockerfile
93+
context: .
94+
target: dev
95+
environment:
96+
WATCHFILES_FORCE_POLLING: true
97+
OTEL_SERVICE_NAME: "bootstrap-fastapi-dev"
98+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
99+
CELERY__broker_url: "redis://redis:6379/0"
100+
CELERY__result_backend: "redis://redis:6379/1"
101+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
102+
working_dir: "/app/src"
103+
volumes:
104+
- '.:/app'
105+
depends_on:
106+
- redis
107+
- otel-collector
108+
command:
109+
- faststream
110+
- run
111+
# - event_consumer:app
112+
- event_consumer:create_app
113+
- --factory
114+
115+
event-docs:
116+
build:
117+
dockerfile: Dockerfile
118+
context: .
119+
target: dev
120+
environment:
121+
WATCHFILES_FORCE_POLLING: true
122+
OTEL_SERVICE_NAME: "bootstrap-fastapi-dev"
123+
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
124+
CELERY__broker_url: "redis://redis:6379/0"
125+
CELERY__result_backend: "redis://redis:6379/1"
126+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
127+
ports:
128+
- '8000:8000'
129+
working_dir: "/app/src"
130+
volumes:
131+
- '.:/app'
132+
depends_on:
133+
- redis
134+
- otel-collector
135+
command:
136+
- faststream
137+
- docs
138+
- serve
139+
- event_consumer:create_app
140+
- --factory
141+
- --host
142+
- 0.0.0.0
86143

87144
http:
88145
build:
@@ -97,6 +154,7 @@ services:
97154
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
98155
CELERY__broker_url: "redis://redis:6379/0"
99156
CELERY__result_backend: "redis://redis:6379/1"
157+
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
100158
ports:
101159
- '8001:8000'
102160
volumes:

poetry.lock

Lines changed: 21 additions & 17 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,8 @@ aiosqlite = ">=0.18.0"
2323
alembic = "^1.11.1"
2424
asgiref = "^3.7.2"
2525
celery = { version = "^5.3.1", extras = ["redis"] }
26-
cloudevents-pydantic = "^0.0.2"
26+
cloudevents-pydantic = { git = "https://github.com/febus982/cloudevents-pydantic", branch = "main" }
27+
#cloudevents-pydantic = "^0.0.2"
2728
dependency-injector = { version = "^4.41.0", extras = ["pydantic"] }
2829
faststream = { version = "^0.5.25", extras = ["redis"] }
2930
httpx = ">=0.23.0"
@@ -99,9 +100,7 @@ exclude_also = [
99100
[tool.mypy]
100101
files = ["src", "tests"]
101102
exclude = ["alembic"]
102-
# Pydantic plugin causes some issues: https://github.com/pydantic/pydantic-settings/issues/403
103-
#plugins = "pydantic.mypy,strawberry.ext.mypy_plugin"
104-
plugins = "strawberry.ext.mypy_plugin"
103+
plugins = "pydantic.mypy,strawberry.ext.mypy_plugin"
105104
python_version = "3.9"
106105

107106
# We can remove celery by installing `celery-types` but needs

src/bootstrap/bootstrap.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
from typing import cast, Any
1+
from typing import cast
22

33
from celery import Celery
44
from dependency_injector.containers import DynamicContainer
55
from dependency_injector.providers import Object
6-
from faststream.broker.core.usecase import BrokerUsecase
76
from faststream.redis import RedisBroker
7+
88
# from gateways.event import FastStreamRedisGateway
99
from pydantic import BaseModel, ConfigDict
1010

src/bootstrap/config.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
# mypy: disable-error-code="call-arg,syntax"
2+
# `call-arg` is because of nested models (they have to be supplied via ENV)
3+
# `syntax` is because of Pydantic plugin
4+
# https://github.com/pydantic/pydantic-settings/issues/403
15
from pathlib import Path
26
from typing import Dict, Literal, Optional
37

@@ -43,15 +47,20 @@ class CeleryConfig(BaseModel):
4347

4448

4549
class EventConfig(BaseModel):
46-
REDIS_BROKER_URL: str
50+
REDIS_BROKER_URL: str = ""
51+
TOPIC: Optional[str] = None
4752

4853

4954
class AppConfig(BaseSettings):
50-
model_config = SettingsConfigDict(env_nested_delimiter="__")
55+
model_config = SettingsConfigDict(
56+
case_sensitive=True,
57+
env_nested_delimiter="__",
58+
nested_model_default_partial_update=True,
59+
)
5160

5261
APP_NAME: str = "bootstrap"
5362
CELERY: CeleryConfig = CeleryConfig()
54-
EVENTS: EventConfig
63+
EVENTS: EventConfig = EventConfig()
5564
DEBUG: bool = False
5665
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
5766
SQLALCHEMY_CONFIG: Dict[str, SQLAlchemyConfig] = dict(

src/bootstrap/faststream.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
# from typing import Dict, Union
2+
from typing import Optional
23

34
import structlog
5+
from faststream import Logger
6+
47
# from domains.events import get_topic_registry
58
from faststream.redis import RedisBroker
9+
610
# from faststream.redis.publisher.asyncapi import AsyncAPIPublisher
711
from opentelemetry.instrumentation.faststream import RedisOtelMiddleware
812

13+
from domains import event_registry
14+
915
from .config import AppConfig
1016

1117

@@ -15,10 +21,35 @@ def init_broker(config: AppConfig) -> RedisBroker:
1521
middlewares=(RedisOtelMiddleware,),
1622
logger=structlog.getLogger("faststream.broker"),
1723
)
24+
register_publishers(broker, config.EVENTS.TOPIC)
25+
register_subscribers(broker, config.EVENTS.TOPIC)
1826

1927
return broker
2028

2129

30+
def register_subscribers(broker, topic: Optional[str] = None):
31+
if topic is not None and topic in event_registry.keys():
32+
topics_map = {topic: event_registry[topic]}
33+
else:
34+
topics_map = event_registry.copy()
35+
36+
for topic, event_type in topics_map.items():
37+
38+
@broker.subscriber(topic)
39+
async def handler(msg: event_type, logger: Logger) -> None: # type: ignore[valid-type]
40+
logger.info(f"Received message {type(msg)} {msg}")
41+
42+
43+
def register_publishers(broker, topic: Optional[str] = None):
44+
if topic is not None and topic in event_registry.keys():
45+
topics_map = {topic: event_registry[topic]}
46+
else:
47+
topics_map = event_registry.copy()
48+
49+
for topic, event_type in topics_map.items():
50+
broker.publisher(topic, schema=event_registry[topic])
51+
52+
2253
# def init_publishers(
2354
# broker: RedisBroker,
2455
# ) -> Dict[str, AsyncAPIPublisher]:

src/domains/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from ._event_registry import event_registry

src/domains/_event_registry.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
from typing import Annotated, Dict, Union
2+
3+
from pydantic import Field
4+
from typing_extensions import TypeAlias
5+
6+
from .books.events import BookCreatedV1, BookUpdatedV1
7+
8+
event_registry: Dict[str, TypeAlias] = {
9+
"books": Annotated[Union[BookCreatedV1, BookUpdatedV1], Field(discriminator="type")]
10+
}

src/event_consumer/__init__.py

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,27 +3,21 @@
33
application layers (storage, logs) when running standalone workers
44
without having to initialise the HTTP framework (or other ones)
55
"""
6+
67
import os
7-
from typing import Annotated, Union, Dict, Type, Optional, List
8+
from typing import Union
89

910
import structlog
10-
from pydantic import Field
11-
12-
from bootstrap import AppConfig, application_init
13-
from faststream import FastStream, Logger
11+
from faststream import FastStream
1412
from opentelemetry import trace
1513
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
1614
from opentelemetry.sdk.resources import Resource
1715
from opentelemetry.sdk.trace import TracerProvider
1816
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1917

20-
from domains.books.events import BookCreatedV1, BookUpdatedV1, BookCreatedV1Data
18+
from bootstrap import AppConfig, application_init
19+
from domains.books.events import BookCreatedV1, BookCreatedV1Data
2120

22-
_event_registry: Dict[str, Type] = {
23-
'books': Annotated[
24-
Union[BookCreatedV1, BookUpdatedV1], Field(discriminator="type")
25-
]
26-
}
2721

2822
def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
2923
resource = Resource.create(attributes={"service.name": service_name})
@@ -35,41 +29,24 @@ def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
3529
return tracer_provider
3630

3731

38-
def register_subscribers(broker, topics: Optional[List[str]] = None):
39-
if topics is None:
40-
topics_map: Dict[str, Type] = _event_registry
41-
else:
42-
topics_map: Dict[str, Type] = {k: v for k, v in _event_registry.items() if k in topics}
43-
44-
45-
for topic, event_type in topics_map.items():
46-
@broker.subscriber(topic) # type: ignore
47-
async def handler(msg: event_type, logger: Logger) -> None:
48-
logger.info(f"Received message {type(msg)} {msg}")
49-
# logger.info(f"Received message {type(msg)} {msg}", extra={"msg": "some_extra_here"})
50-
# l = logging.getLogger()
51-
# l.info("AAAAA", extra={"eee": "AAA"})
52-
53-
54-
def create_app(
55-
test_config: Union[AppConfig, None] = None
56-
) -> FastStream:
57-
setup_telemetry("faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"])
32+
def create_app(test_config: Union[AppConfig, None] = None) -> FastStream:
33+
setup_telemetry(
34+
"faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
35+
)
5836
broker = application_init(AppConfig()).faststream_broker
5937
app = FastStream(broker, logger=structlog.get_logger())
60-
register_subscribers(broker)
61-
62-
publisher = broker.publisher("books", schema=_event_registry["books"])
6338

6439
@app.after_startup
6540
async def after_startup():
66-
await broker.publish(BookCreatedV1.event_factory(
67-
data=BookCreatedV1Data(
68-
book_id=123,
69-
title="AAA",
70-
author_name="BBB",
71-
)
72-
), "books")
73-
41+
await broker.publish(
42+
BookCreatedV1.event_factory(
43+
data=BookCreatedV1Data(
44+
book_id=123,
45+
title="AAA",
46+
author_name="BBB",
47+
)
48+
),
49+
"books",
50+
)
7451

7552
return app

0 commit comments

Comments
 (0)