Skip to content

Commit e8c6072

Browse files
committed
Test FastStream 0.6.0
1 parent 59b1ba1 commit e8c6072

File tree

11 files changed

+74
-66
lines changed

11 files changed

+74
-66
lines changed

.pre-commit-config.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ repos:
9191
rev: 0.7.19
9292
hooks:
9393
- id: uv-lock
94+
args: [--prerelease=allow]
9495

9596
- repo: local
9697
hooks:

data_rentgen/consumer/__init__.py

Lines changed: 29 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@
88
import anyio
99
from fast_depends import dependency_provider
1010
from faststream import ContextRepo, FastStream
11-
from faststream._compat import ExceptionGroup
11+
from faststream._internal._compat import ExceptionGroup
1212
from faststream.asgi import AsgiFastStream, AsgiResponse, get
1313
from faststream.kafka import KafkaBroker
14-
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
14+
from faststream.kafka.publisher import DefaultPublisher
15+
from faststream.kafka.subscriber.usecase import BatchSubscriber
16+
from faststream.specification.asyncapi import AsyncAPI
1517
from sqlalchemy.ext.asyncio import AsyncSession
1618

1719
import data_rentgen
1820
from data_rentgen.consumer.settings import ConsumerApplicationSettings
1921
from data_rentgen.consumer.subscribers import runs_events_subscriber
20-
from data_rentgen.db.factory import create_session_factory
22+
from data_rentgen.db.factory import session_generator
2123
from data_rentgen.logging.setup_logging import setup_logging
2224

2325
logger = logging.getLogger(__name__)
@@ -43,21 +45,34 @@ def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
4345
# register subscribers using settings
4446
consumer_settings = settings.consumer.model_dump(exclude={"topics_list", "topics_pattern", "malformed_topic"})
4547

46-
subscriber = broker.subscriber(
48+
subscribe = broker.subscriber(
4749
*settings.consumer.topics_list,
4850
pattern=settings.consumer.topics_pattern,
4951
**consumer_settings,
5052
batch=True,
5153
# Disable parsing JSONs on FastStream level
5254
decoder=lambda _: None,
5355
)
56+
57+
# register subscriber
58+
batch_subscriber = subscribe(runs_events_subscriber)
59+
60+
async def get_subscriber():
61+
return batch_subscriber
62+
63+
# FastStream uses WeakSet for subscribers, so we need to keep long lived reference somewhere
64+
dependency_provider.override(BatchSubscriber, get_subscriber)
65+
66+
# register publisher
5467
publisher = broker.publisher(settings.producer.malformed_topic)
5568

56-
# perform registration
57-
subscriber(runs_events_subscriber)
69+
async def get_publisher():
70+
return publisher
71+
72+
dependency_provider.override(DefaultPublisher, get_publisher)
5873

59-
dependency_provider.override(AsyncSession, create_session_factory(settings.database))
60-
dependency_provider.override(AsyncAPIDefaultPublisher, lambda: publisher)
74+
# Override session generator
75+
dependency_provider.override(AsyncSession, session_generator(settings.database))
6176
return broker
6277

6378

@@ -78,11 +93,13 @@ async def security_lifespan(context: ContextRepo):
7893
raise exception from None
7994

8095
return FastStream(
81-
broker=broker_factory(settings),
96+
broker_factory(settings),
8297
lifespan=security_lifespan,
83-
title="Data.Rentgen",
84-
description="Data.Rentgen is a nextgen DataLineage service",
85-
version=data_rentgen.__version__,
98+
specification=AsyncAPI(
99+
title="Data.Rentgen",
100+
description="Data.Rentgen is a nextgen DataLineage service",
101+
version=data_rentgen.__version__,
102+
),
86103
logger=logger,
87104
).as_asgi(asgi_routes=[("/monitoring/ping", liveness)])
88105

data_rentgen/consumer/__main__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import sys
99
from pathlib import Path
1010

11-
from faststream.cli.main import cli
11+
from faststream.cli import cli
1212

1313
here = Path(__file__).resolve()
1414

data_rentgen/consumer/subscribers.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,17 @@
55

66
import asyncio
77
from collections.abc import AsyncGenerator
8-
from typing import cast
8+
from typing import Annotated, cast
99

1010
from aiokafka import ConsumerRecord
1111
from faststream import Depends, Logger, NoCast
1212
from faststream.kafka import KafkaMessage
13-
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
13+
from faststream.kafka.publisher import DefaultPublisher
1414
from pydantic import TypeAdapter
1515
from sqlalchemy.ext.asyncio import AsyncSession
1616

1717
from data_rentgen.consumer.extractors import BatchExtractionResult, BatchExtractor
18-
from data_rentgen.dependencies import Stub
18+
from data_rentgen.dependencies.stub import Stub
1919
from data_rentgen.openlineage.run_event import OpenLineageRunEvent
2020
from data_rentgen.services.uow import UnitOfWork
2121

@@ -30,8 +30,8 @@ async def runs_events_subscriber(
3030
_events: NoCast[list[OpenLineageRunEvent]],
3131
batch: KafkaMessage,
3232
logger: Logger,
33-
publisher: AsyncAPIDefaultPublisher = Depends(Stub(AsyncAPIDefaultPublisher)),
34-
session: AsyncSession = Depends(Stub(AsyncSession)),
33+
publisher: Annotated[DefaultPublisher, Depends(Stub(DefaultPublisher))],
34+
session: Annotated[AsyncSession, Depends(Stub(AsyncSession))],
3535
):
3636
message_id = batch.message_id
3737
correlation_id = batch.correlation_id
@@ -182,7 +182,7 @@ async def report_malformed(
182182
messages: list[ConsumerRecord],
183183
message_id: str,
184184
correlation_id: str,
185-
publisher: AsyncAPIDefaultPublisher,
185+
publisher: DefaultPublisher,
186186
):
187187
# Return malformed messages back to the broker
188188
for message in messages:

data_rentgen/http2kafka/__init__.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66

77
import anyio
88
from fastapi import FastAPI
9-
from faststream._compat import ExceptionGroup
9+
from faststream._internal._compat import ExceptionGroup
1010
from faststream.kafka import KafkaBroker
11-
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
11+
from faststream.kafka.publisher import DefaultPublisher
1212
from sqlalchemy.ext.asyncio import AsyncSession
1313

1414
import data_rentgen
@@ -94,7 +94,7 @@ async def get_publisher():
9494
application.dependency_overrides.update(
9595
{
9696
Http2KafkaApplicationSettings: get_settings,
97-
AsyncAPIDefaultPublisher: get_publisher,
97+
DefaultPublisher: get_publisher,
9898
AsyncSession: session_generator(settings.database), # type: ignore[dict-item]
9999
},
100100
)

data_rentgen/http2kafka/router/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from asgi_correlation_id import correlation_id
88
from fastapi import APIRouter, Body, Depends, Request, Response
9-
from faststream.kafka.publisher.asyncapi import AsyncAPIDefaultPublisher
9+
from faststream.kafka.publisher import DefaultPublisher
1010

1111
from data_rentgen.db.models.user import User
1212
from data_rentgen.dependencies.stub import Stub
@@ -37,7 +37,7 @@
3737
async def send_events_to_kafka(
3838
event: Annotated[OpenLineageRunEvent, Body()],
3939
request: Request,
40-
kafka_publisher: Annotated[AsyncAPIDefaultPublisher, Depends(Stub(AsyncAPIDefaultPublisher))],
40+
kafka_publisher: Annotated[DefaultPublisher, Depends(Stub(DefaultPublisher))],
4141
current_user: Annotated[User, Depends(get_user(personal_token_policy=PersonalTokenPolicy.REQUIRE))],
4242
):
4343
body_json_bytes = await request.body()

data_rentgen/logging/presets/colored.yml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ filters:
1010
(): asgi_correlation_id.CorrelationIdFilter
1111
uuid_length: 32
1212
default_value: '-'
13-
faststream:
14-
(): faststream.log.logging.ExtendedFilter
15-
default_context:
16-
topic: ''
17-
group_id: ''
18-
message_id_ln: 10
1913

2014
formatters:
2115
colored:
@@ -25,7 +19,7 @@ formatters:
2519
datefmt: '%Y-%m-%d %H:%M:%S'
2620
kafka_colored:
2721
(): coloredlogs.ColoredFormatter
28-
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(topic)s %(group_id)s %(message_id)s %(message)s'
22+
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(message)s'
2923
datefmt: '%Y-%m-%d %H:%M:%S'
3024

3125
handlers:
@@ -37,7 +31,7 @@ handlers:
3731
faststream:
3832
class: logging.StreamHandler
3933
formatter: kafka_colored
40-
filters: [faststream]
34+
filters: []
4135
stream: ext://sys.stdout
4236

4337
loggers:

data_rentgen/logging/presets/json.yml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,6 @@ filters:
99
(): asgi_correlation_id.CorrelationIdFilter
1010
uuid_length: 32
1111
default_value: '-'
12-
faststream:
13-
(): faststream.log.logging.ExtendedFilter
14-
default_context:
15-
topic: ''
16-
group_id: ''
17-
message_id_ln: 10
1812

1913
formatters:
2014
json:
@@ -24,7 +18,7 @@ formatters:
2418
timestamp: true
2519
kafka_json:
2620
(): pythonjsonlogger.jsonlogger.JsonFormatter
27-
fmt: '%(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(topic)s %(group_id)s %(message_id)s %(message)s'
21+
fmt: '%(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(message)s'
2822
timestamp: true
2923

3024
handlers:
@@ -36,7 +30,7 @@ handlers:
3630
faststream:
3731
class: logging.StreamHandler
3832
formatter: kafka_json
39-
filters: [faststream]
33+
filters: []
4034
stream: ext://sys.stdout
4135

4236
loggers:

data_rentgen/logging/presets/plain.yml

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,6 @@ filters:
1010
(): asgi_correlation_id.CorrelationIdFilter
1111
uuid_length: 32
1212
default_value: '-'
13-
faststream:
14-
(): faststream.log.logging.ExtendedFilter
15-
default_context:
16-
topic: ''
17-
group_id: ''
18-
message_id_ln: 10
1913

2014
formatters:
2115
plain:
@@ -25,7 +19,7 @@ formatters:
2519
datefmt: '%Y-%m-%d %H:%M:%S'
2620
kafka_plain:
2721
(): logging.Formatter
28-
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(topic)s %(group_id)s %(message_id)s %(message)s'
22+
fmt: '%(asctime)s.%(msecs)03d %(processName)s:%(process)d %(name)s:%(lineno)d [%(levelname)s] %(message)s'
2923
datefmt: '%Y-%m-%d %H:%M:%S'
3024

3125
handlers:
@@ -37,7 +31,7 @@ handlers:
3731
faststream:
3832
class: logging.StreamHandler
3933
formatter: kafka_plain
40-
filters: [faststream]
34+
filters: []
4135
stream: ext://sys.stdout
4236

4337
loggers:

pyproject.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ server = [
7575
"cachetools~=6.2.0",
7676
]
7777
consumer = [
78-
"faststream[kafka,cli]~=0.5.44",
78+
"faststream[kafka,cli]~=0.6.0rc2",
7979
"cramjam~=2.11.0",
8080
"pydantic-settings~=2.10.1",
8181
"alembic~=1.16.2",
@@ -90,8 +90,8 @@ consumer = [
9090
"packaging~=25.0",
9191
]
9292
http2kafka = [
93-
"fastapi>=0.115.14,<0.117.0",
94-
"starlette>=0.46.2,<0.48.0",
93+
"fastapi~=0.116.1",
94+
"starlette~=0.47.2",
9595
"uvicorn~=0.35.0",
9696
"starlette-exporter~=0.23.0",
9797
"asgi-correlation-id~=4.3.4",
@@ -101,7 +101,7 @@ http2kafka = [
101101
"coloredlogs~=15.0.1",
102102
"uuid6~=2025.0.0",
103103
"packaging~=25.0",
104-
"faststream[kafka,cli]~=0.5.44",
104+
"faststream[kafka,cli]~=0.6.0rc2",
105105
"cramjam~=2.11.0",
106106
"cachetools~=6.2.0",
107107
]
@@ -155,7 +155,7 @@ docs = [
155155
# uncomment after https://github.com/zqmillet/sphinx-plantuml/pull/4
156156
# "sphinx-plantuml~=1.0.0",
157157
"sphinx-argparse~=0.5.2",
158-
"sphinx-tabs>=3.4.7",
158+
"sphinx-tabs~=3.4.7",
159159
]
160160

161161
[tool.pytest.ini_options]

0 commit comments

Comments
 (0)