Skip to content

Events consumer/producer #240

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 12 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 61 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ services:
env_file: local.env
environment:
OTEL_SERVICE_NAME: "bootstrap-fastapi-dev"
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
EVENTS__IS_PUBLISHER: "True"
EVENTS__IS_SUBSCRIBER: "False"
ports:
- '8000:8000'
working_dir: "/app/src"
Expand All @@ -24,7 +28,60 @@ services:
- "8000"
- --factory
# Remember to disable the reloader in order to allow otel instrumentation
- --reload
# - --reload

event-consumer:
build:
dockerfile: Dockerfile
context: .
target: dev
environment:
OTEL_SERVICE_NAME: "bootstrap-fastapi-dev"
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
EVENTS__REGISTER_PUBLISHERS: "True"
EVENTS__REGISTER_SUBSCRIBERS: "True"
working_dir: "/app/src"
volumes:
- '.:/app'
depends_on:
- redis
- otel-collector
command:
- faststream
- run
# - --help
- --factory
- event_consumer:create_app

event-docs:
build:
dockerfile: Dockerfile
context: .
target: dev
environment:
WATCHFILES_FORCE_POLLING: true
OTEL_SERVICE_NAME: "bootstrap-fastapi-dev"
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
EVENTS__REGISTER_PUBLISHERS: "True"
EVENTS__REGISTER_SUBSCRIBERS: "True"
ports:
- '8000:8000'
working_dir: "/app/src"
volumes:
- '.:/app'
depends_on:
- redis
- otel-collector
command:
- faststream
- docs
- serve
- event_consumer:create_app
- --factory
- --host
- 0.0.0.0

# Production image
http:
Expand All @@ -37,6 +94,9 @@ services:
env_file: local.env
environment:
OTEL_SERVICE_NAME: "bootstrap-fastapi-http"
OTEL_EXPORTER_OTLP_ENDPOINT: "http://otel-collector:4317"
EVENTS__REDIS_BROKER_URL: "redis://redis:6379/2"
EVENTS__REGISTER_PUBLISHERS: "True"
ports:
- '8001:8000'
volumes:
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
authors = [
{name = "Federico Busetti", email = "[email protected]"},
]
requires-python = "<3.14,>=3.9"
requires-python = "<3.14,>=3.11"
name = "bootstrap-fastapi-service"
version = "0.1.0"
description = ""
Expand All @@ -14,7 +14,7 @@ dependencies = [
"cloudevents-pydantic<1.0.0,>=0.0.3",
"dependency-injector[pydantic]<5.0.0,>=4.41.0",
"dramatiq[redis,watch]<2.0.0,>=1.17.1",
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"hiredis<4.0.0,>=3.1.0", # Recommended by dramatiq
"httpx>=0.23.0",
"opentelemetry-distro[otlp]",
"opentelemetry-instrumentation",
Expand Down
9 changes: 9 additions & 0 deletions src/common/bootstrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,14 @@

from dependency_injector.containers import DynamicContainer
from dependency_injector.providers import Object

# from gateways.event import FastStreamRedisGateway
from pydantic import BaseModel, ConfigDict

from .config import AppConfig
from .di_container import Container
from .dramatiq import init_dramatiq
from .event_publisher import init_broker
from .logs import init_logger
from .storage import init_storage

Expand All @@ -27,7 +30,13 @@ def application_init(app_config: AppConfig) -> InitReference:
init_logger(app_config)
init_storage()
init_dramatiq(app_config)
router = init_broker(app_config.EVENTS)
# This is temporary, has to go directly in the Container
# container.BookEventGatewayInterface.override(
# Object(FastStreamRedisGateway(broker=broker))
# )

return InitReference(
di_container=container,
faststream_broker=router,
)
8 changes: 8 additions & 0 deletions src/common/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,19 @@ class AuthConfig(BaseModel):
JWKS_URL: Optional[str] = None


class EventConfig(BaseModel):
REDIS_BROKER_URL: str = ""
SUBSCRIBER_TOPIC: Optional[str] = None
REGISTER_PUBLISHERS: bool = False
REGISTER_SUBSCRIBERS: bool = False


class AppConfig(BaseSettings):
model_config = SettingsConfigDict(env_nested_delimiter="__")

APP_NAME: str = "bootstrap"
AUTH: AuthConfig = AuthConfig()
EVENTS: EventConfig = EventConfig()
DRAMATIQ: DramatiqConfig = DramatiqConfig()
DEBUG: bool = False
ENVIRONMENT: TYPE_ENVIRONMENT = "local"
Expand Down
24 changes: 24 additions & 0 deletions src/common/event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# from typing import Dict, Union
from typing import Optional

import structlog

from domains import event_registry

from .config import EventConfig

logger = structlog.getLogger(__name__)


def init_broker(config: EventConfig):

if config.REGISTER_PUBLISHERS:
register_publishers(config.SUBSCRIBER_TOPIC)



def register_publishers(topic: Optional[str] = None):
if topic is not None and topic in event_registry.keys():
logger.info(f"Registering publishers for topic {topic}")
else:
logger.info(f"Registering publishers for all topics")
1 change: 1 addition & 0 deletions src/domains/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from ._event_registry import event_registry
10 changes: 10 additions & 0 deletions src/domains/_event_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from typing import Annotated, Dict, Union

from pydantic import Field
from typing_extensions import TypeAlias

from .books.events import BookCreatedV1, BookUpdatedV1

event_registry: Dict[str, TypeAlias] = {
"books": Annotated[Union[BookCreatedV1, BookUpdatedV1], Field(discriminator="type")]
}
79 changes: 79 additions & 0 deletions src/event_consumer/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
"""
This is a tiny layer that takes care of initialising the shared
application layers (storage, logs) when running standalone workers
without having to initialise the HTTP framework (or other ones)
"""

import os
from typing import Optional, Union

import structlog

from common import AppConfig, application_init
from conftest import test_config
from domains import event_registry
from domains.books.events import BookCreatedV1, BookCreatedV1Data

"""
For the sake of this example app we reuse the domain registry,
which is used for publishing. In a real world these registries
are different and separate.
"""
subscriber_registry = event_registry


# def setup_telemetry(service_name: str, otlp_endpoint: str) -> TracerProvider:
# resource = Resource.create(attributes={"service.name": service_name})
# tracer_provider = TracerProvider(resource=resource)
# exporter = OTLPSpanExporter(endpoint=otlp_endpoint)
# processor = BatchSpanProcessor(exporter)
# tracer_provider.add_span_processor(processor)
# trace.set_tracer_provider(tracer_provider)
# return tracer_provider


def create_app(test_config: Union[AppConfig, None] = None):
config = test_config or AppConfig()
# setup_telemetry(
# "faststream", otlp_endpoint=os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"]
# )
ref = application_init(config)
# register_subscribers(broker)
if config.EVENTS.REGISTER_SUBSCRIBERS:
register_subscribers(config.EVENTS.SUBSCRIBER_TOPIC)

# app = FastStream(broker, logger=structlog.get_logger())
#
# @app.after_startup
# async def after_startup():
# await broker.publish(
# BookCreatedV1.event_factory(
# data=BookCreatedV1Data(
# book_id=123,
# title="AAA",
# author_name="BBB",
# )
# ),
# "books",
# )
#
# return app


# TODO: Add Routing structure similar to the one in the fastapi implementation
def register_subscribers(topic: Optional[str] = None):
if topic is not None and topic in subscriber_registry.keys():
topics_map = {topic: subscriber_registry[topic]}
else:
topics_map = subscriber_registry.copy()

logger = structlog.get_logger()

for topic, event_type in topics_map.items():
logger.info(f"Registering {event_type} on topic {topic}")

# @router.subscriber(topic)
# async def handler(msg: event_type) -> None: # type: ignore[valid-type]
# logger.info(f"Received message {type(msg)} {msg}")
#
# broker.include_router(router)
20 changes: 20 additions & 0 deletions src/gateways/event.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import httpx
from cloudevents_pydantic.events import CloudEvent
from structlog import get_logger

Expand All @@ -11,3 +12,22 @@ async def emit(
"Event emitted",
cloudevent=event.model_dump(),
)


class HttpEventGateway:
def __init__(self):
self.client = httpx.AsyncClient()

async def emit(
self, event: CloudEvent
) -> None:
logger = get_logger()
await logger.ainfo(
"Event emitted via HTTP request",
cloudevent=event.model_dump(),
)


# https://www.confluent.io/blog/kafka-python-asyncio-integration/
class KafkaEventGateway:
pass
6 changes: 4 additions & 2 deletions src/http_app/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Union
from typing import Any, Union

from fastapi import FastAPI, Request
from opentelemetry.instrumentation.asgi import OpenTelemetryMiddleware
Expand All @@ -25,14 +25,16 @@ def create_app(
"""
context.app_config.set(app_config)

application_init(app_config)
ref = application_init(app_config)

app = FastAPI(
debug=app_config.DEBUG,
title=app_config.APP_NAME,
)
init_exception_handlers(app)

init_routes(app)
# add_faststream_router(app, ref.faststream_broker)

"""
OpenTelemetry prometheus exporter does not work together with automatic
Expand Down
2 changes: 2 additions & 0 deletions src/http_app/routes/ping.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from fastapi import APIRouter
from pydantic import BaseModel, ConfigDict
from starlette.responses import JSONResponse

router = APIRouter()

Expand All @@ -18,4 +19,5 @@ class PingResponse(BaseModel):

@router.get("/ping")
async def ping() -> PingResponse:
JSONResponse({"ping": "pong!"})
return PingResponse(ping="pong!")
Loading
Loading