Skip to content

Commit a92b853

Browse files
committed
Remove faststream references
1 parent 7e09a3b commit a92b853

File tree

9 files changed

+64
-194
lines changed

9 files changed

+64
-194
lines changed

pyproject.toml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,13 @@ dependencies = [
2121
"opentelemetry-instrumentation-httpx",
2222
"opentelemetry-instrumentation-sqlalchemy",
2323
"opentelemetry-instrumentor-dramatiq",
24-
"opentelemetry-instrumentation-faststream",
2524
"orjson<4.0.0,>=3.10.12",
2625
"pydantic<3.0.0,>=2.2.1",
2726
"pydantic-settings<3.0.0,>=2.0.3",
2827
"rich<14.0.0,>=13.2.0",
2928
"SQLAlchemy[asyncio,mypy]<3.0.0,>=2.0.0",
3029
"sqlalchemy-bind-manager",
3130
"structlog<25.1.1,>=25.1.0",
32-
"faststream>=0.5.34",
3331
]
3432

3533
[dependency-groups]

src/common/bootstrap.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,22 +2,20 @@
22

33
from dependency_injector.containers import DynamicContainer
44
from dependency_injector.providers import Object
5-
from faststream.redis import RedisBroker
65

76
# from gateways.event import FastStreamRedisGateway
87
from pydantic import BaseModel, ConfigDict
98

109
from .config import AppConfig
1110
from .di_container import Container
1211
from .dramatiq import init_dramatiq
13-
from .faststream import init_broker
12+
from .event_publisher import init_broker
1413
from .logs import init_logger
1514
from .storage import init_storage
1615

1716

1817
class InitReference(BaseModel):
1918
di_container: DynamicContainer
20-
faststream_broker: RedisBroker
2119

2220
model_config = ConfigDict(arbitrary_types_allowed=True)
2321

src/common/event_publisher.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
# from typing import Dict, Union
2+
from typing import Optional
3+
4+
import structlog
5+
6+
from domains import event_registry
7+
8+
from .config import EventConfig
9+
10+
logger = structlog.getLogger(__name__)
11+
12+
13+
def init_broker(config: EventConfig):
14+
15+
if config.REGISTER_PUBLISHERS:
16+
register_publishers(config.SUBSCRIBER_TOPIC)
17+
18+
19+
20+
def register_publishers(topic: Optional[str] = None):
21+
if topic is not None and topic in event_registry.keys():
22+
logger.info(f"Registering publishers for topic {topic}")
23+
else:
24+
logger.info(f"Registering publishers for all topics")

src/common/faststream.py

Lines changed: 0 additions & 37 deletions
This file was deleted.

src/common/logs/processors.py

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
from faststream import context
21
from opentelemetry import trace
32
from structlog.typing import EventDict
43

@@ -13,29 +12,6 @@ def extract_from_record(_, __, event_dict: EventDict) -> EventDict:
1312
return event_dict
1413

1514

16-
def faststream_context(_, __, event_dict: EventDict) -> EventDict:
17-
"""
18-
Extract FastStream context information and adds them to the event dict.
19-
"""
20-
c = context.get_local("log_context") or {}
21-
event_context = event_dict.get(
22-
"event_context",
23-
c.copy(),
24-
)
25-
26-
# Handle undesired extra override from FastStream
27-
extra = event_dict.get("extra", {}).copy()
28-
if {"channel", "message_id"} == set(extra.keys()):
29-
event_context["channel"] = extra["channel"]
30-
event_context["message_id"] = extra["message_id"]
31-
del event_dict["extra"]
32-
33-
if event_context:
34-
event_dict["event_context"] = event_context
35-
36-
return event_dict
37-
38-
3915
def drop_color_message_key(_, __, event_dict: EventDict) -> EventDict:
4016
"""
4117
Uvicorn logs the message a second time in the extra `color_message`, but we don't

src/event_consumer/__init__.py

Lines changed: 39 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,6 @@
88
from typing import Optional, Union
99

1010
import structlog
11-
from faststream import FastStream
12-
from faststream.redis import RedisBroker, RedisRouter
13-
from opentelemetry import trace
14-
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
15-
from opentelemetry.sdk.resources import Resource
16-
from opentelemetry.sdk.trace import TracerProvider
17-
from opentelemetry.sdk.trace.export import BatchSpanProcessor
1811

1912
from common import AppConfig, application_init
2013
from conftest import test_config
@@ -29,58 +22,58 @@
2922
subscriber_registry = event_registry
3023

3124

32-
def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
33-
resource = Resource.create(attributes={"service.name": service_name})
34-
tracer_provider = TracerProvider(resource=resource)
35-
exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
36-
processor = BatchSpanProcessor(exporter)
37-
tracer_provider.add_span_processor(processor)
38-
trace.set_tracer_provider(tracer_provider)
39-
return tracer_provider
25+
# def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
26+
# resource = Resource.create(attributes={"service.name": service_name})
27+
# tracer_provider = TracerProvider(resource=resource)
28+
# exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
29+
# processor = BatchSpanProcessor(exporter)
30+
# tracer_provider.add_span_processor(processor)
31+
# trace.set_tracer_provider(tracer_provider)
32+
# return tracer_provider
4033

4134

42-
def create_app(test_config: Union[AppConfig, None] = None) -> FastStream:
35+
def create_app(test_config: Union[AppConfig, None] = None):
4336
config = test_config or AppConfig()
44-
setup_telemetry(
45-
"faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
46-
)
47-
broker = application_init(config).faststream_broker
48-
register_subscribers(broker)
37+
# setup_telemetry(
38+
# "faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
39+
# )
40+
ref = application_init(config)
41+
# register_subscribers(broker)
4942
if config.EVENTS.REGISTER_SUBSCRIBERS:
50-
register_subscribers(broker)
51-
52-
app = FastStream(broker, logger=structlog.get_logger())
53-
54-
@app.after_startup
55-
async def after_startup():
56-
await broker.publish(
57-
BookCreatedV1.event_factory(
58-
data=BookCreatedV1Data(
59-
book_id=123,
60-
title="AAA",
61-
author_name="BBB",
62-
)
63-
),
64-
"books",
65-
)
66-
67-
return app
43+
register_subscribers(config.EVENTS.SUBSCRIBER_TOPIC)
44+
45+
# app = FastStream(broker, logger=structlog.get_logger())
46+
#
47+
# @app.after_startup
48+
# async def after_startup():
49+
# await broker.publish(
50+
# BookCreatedV1.event_factory(
51+
# data=BookCreatedV1Data(
52+
# book_id=123,
53+
# title="AAA",
54+
# author_name="BBB",
55+
# )
56+
# ),
57+
# "books",
58+
# )
59+
#
60+
# return app
6861

6962

7063
# TODO: Add Routing structure similar to the one in the fastapi implementation
71-
def register_subscribers(broker: RedisBroker, topic: Optional[str] = None):
64+
def register_subscribers(topic: Optional[str] = None):
7265
if topic is not None and topic in subscriber_registry.keys():
7366
topics_map = {topic: subscriber_registry[topic]}
7467
else:
7568
topics_map = subscriber_registry.copy()
7669

7770
logger = structlog.get_logger()
78-
router = RedisRouter()
7971

8072
for topic, event_type in topics_map.items():
73+
logger.info(f"Registering {event_type} on topic {topic}")
8174

82-
@router.subscriber(topic)
83-
async def handler(msg: event_type) -> None: # type: ignore[valid-type]
84-
logger.info(f"Received message {type(msg)} {msg}")
85-
86-
broker.include_router(router)
75+
# @router.subscriber(topic)
76+
# async def handler(msg: event_type) -> None: # type: ignore[valid-type]
77+
# logger.info(f"Received message {type(msg)} {msg}")
78+
#
79+
# broker.include_router(router)

src/gateways/event.py

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -11,32 +11,3 @@ async def emit(
1111
"Event emitted",
1212
cloudevent=event.model_dump(),
1313
)
14-
15-
16-
# class FastStreamRedisGateway:
17-
# _broker: RedisBroker
18-
# _publishers: Dict[type[BaseEvent], AsyncAPIPublisher]
19-
#
20-
# def __init__(
21-
# self,
22-
# broker: RedisBroker,
23-
# topic_filter: Optional[Collection[str]] = None,
24-
# ):
25-
# self._broker = broker
26-
# publishers = {
27-
# topic: broker.publisher(topic, schema=Union[event_types])
28-
# for topic, event_types in get_topic_registry(topic_filter).items()
29-
# }
30-
# self._publishers = {
31-
# event_type: publishers[topic]
32-
# for topic, event_types in get_topic_registry(topic_filter).items()
33-
# for event_type in event_types
34-
# }
35-
#
36-
# async def emit(
37-
# self, event: BaseEvent
38-
# ) -> None: # pragma: no cover # No need to test this
39-
# try:
40-
# await self._publishers[type(event)].publish(event)
41-
# except KeyError:
42-
# raise RuntimeError(f"Unknown event type: {type(event)}")

src/http_app/__init__.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
from typing import Any, Union
22

33
from fastapi import FastAPI, Request
4-
from faststream.broker.core.usecase import BrokerUsecase
5-
from faststream.redis import RedisRouter, fastapi
64
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
75
from starlette.responses import JSONResponse
86
from starlette_prometheus import PrometheusMiddleware, metrics
@@ -69,9 +67,3 @@ async def add_exception_middleware(request: Request, call_next):
6967
logger = get_logger(__name__)
7068
await logger.aexception(e)
7169
return JSONResponse({"error": "Internal server error"}, status_code=500)
72-
73-
74-
def add_faststream_router(app: FastAPI, router: RedisRouter) -> None:
75-
f_router = fastapi.RedisRouter()
76-
f_router.include_router(router)
77-
app.include_router(f_router)

uv.lock

Lines changed: 0 additions & 45 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)