
Commit 1d68fee

[DOP-22883] Add consumer-specific settings

1 parent 66fdd45

File tree

9 files changed (+302 -31 lines)

data_rentgen/consumer/__init__.py

Lines changed: 14 additions & 4 deletions
@@ -9,30 +9,40 @@
 from sqlalchemy.ext.asyncio import AsyncSession

 import data_rentgen
-from data_rentgen.consumer.handlers import router
 from data_rentgen.consumer.settings import ConsumerApplicationSettings
 from data_rentgen.consumer.settings.security import get_broker_security
+from data_rentgen.consumer.subscribers import runs_events_subscriber
 from data_rentgen.db.factory import create_session_factory
 from data_rentgen.logging.setup_logging import setup_logging

 logger = logging.getLogger(__name__)


-def broker_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
+def consumer_factory(settings: ConsumerApplicationSettings) -> KafkaBroker:
     broker = KafkaBroker(
         bootstrap_servers=settings.kafka.bootstrap_servers,
         security=get_broker_security(settings.kafka.security),
         compression_type=settings.kafka.compression.value if settings.kafka.compression else None,
+        client_id=f"data-rentgen-{data_rentgen.__version__}",
         logger=logger,
     )
-    broker.include_router(router)
+
+    # add subscribers using settings
+    consumer_settings = settings.consumer.model_dump(exclude={"topics_list", "topics_pattern"})
+    broker.subscriber(
+        *settings.consumer.topics_list,
+        pattern=settings.consumer.topics_pattern,
+        **consumer_settings,
+        batch=True,
+    )(runs_events_subscriber)
+
     dependency_provider.override(AsyncSession, create_session_factory(settings.database))
     return broker


 def application_factory(settings: ConsumerApplicationSettings) -> FastStream:
     return FastStream(
-        broker=broker_factory(settings),
+        broker=consumer_factory(settings),
         title="Data.Rentgen",
         description="Data.Rentgen is a nextgen DataLineage service",
         version=data_rentgen.__version__,
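
For orientation, a minimal sketch (not part of the commit) of how the reworked factory is driven purely by environment variables. The env var values are illustrative, and the "url" field name for DatabaseSettings is an assumption:

    import os

    # hypothetical values for illustration only
    os.environ["DATA_RENTGEN__DATABASE__URL"] = "postgresql+asyncpg://rentgen:secret@localhost:5432/rentgen"
    os.environ["DATA_RENTGEN__KAFKA__BOOTSTRAP_SERVERS"] = "localhost:9092"
    os.environ["DATA_RENTGEN__CONSUMER__GROUP_ID"] = "data-rentgen"

    from data_rentgen.consumer import application_factory
    from data_rentgen.consumer.settings import ConsumerApplicationSettings

    settings = ConsumerApplicationSettings()  # reads DATA_RENTGEN__* env vars
    app = application_factory(settings)       # consumer_factory attaches runs_events_subscriber to "input.runs"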

data_rentgen/consumer/settings/__init__.py

Lines changed: 8 additions & 3 deletions
@@ -4,6 +4,7 @@
 from pydantic import Field
 from pydantic_settings import BaseSettings, SettingsConfigDict

+from data_rentgen.consumer.settings.consumer import ConsumerSettings
 from data_rentgen.consumer.settings.kafka import KafkaSettings
 from data_rentgen.db.settings import DatabaseSettings
 from data_rentgen.logging.settings import LoggingSettings
@@ -38,12 +39,16 @@ class ConsumerApplicationSettings(BaseSettings):
     """

     database: DatabaseSettings = Field(description=":ref:`Database settings <configuration-database>`")
-    kafka: KafkaSettings = Field(
-        description=":ref:`Kafka settings <configuration-consumer-kafka>`",
-    )
     logging: LoggingSettings = Field(
         default_factory=LoggingSettings,
         description=":ref:`Logging settings <configuration-consumer-logging>`",
     )
+    kafka: KafkaSettings = Field(
+        description=":ref:`Kafka settings <configuration-consumer-kafka>`",
+    )
+    consumer: ConsumerSettings = Field(
+        default_factory=ConsumerSettings,
+        description=":ref:`Consumer settings <configuration-consumer-specific>`",
+    )

     model_config = SettingsConfigDict(env_prefix="DATA_RENTGEN__", env_nested_delimiter="__", extra="forbid")
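
As a reading aid: the `env_prefix` plus `env_nested_delimiter` config means each nested field maps to a double-underscore env path. A sketch (values illustrative, assuming the required database/kafka settings are also present in the environment):

    import os

    # each "__" segment after the prefix selects a nested field:
    # DATA_RENTGEN__CONSUMER__GROUP_ID -> settings.consumer.group_id
    os.environ["DATA_RENTGEN__CONSUMER__GROUP_ID"] = "my-group"

    settings = ConsumerApplicationSettings()
    assert settings.consumer.group_id == "my-group"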

data_rentgen/consumer/settings/compression.py

Lines changed: 0 additions & 14 deletions
This file was deleted.
data_rentgen/consumer/settings/consumer.py

Lines changed: 215 additions & 0 deletions
@@ -0,0 +1,215 @@
+# SPDX-FileCopyrightText: 2024 MTS PJSC
+# SPDX-License-Identifier: Apache-2.0
+
+import textwrap
+from typing import Literal
+
+from pydantic import BaseModel, ByteSize, Field, model_validator
+
+
+class ConsumerSettings(BaseModel):
+    """Data.Rentgen consumer-specific settings.
+
+    These options are passed directly to
+    `AIOKafkaConsumer <https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer>`_.
+
+    Examples
+    --------
+
+    .. code-block:: bash
+
+        DATA_RENTGEN__CONSUMER__TOPICS_LIST=["input.runs"]
+        DATA_RENTGEN__CONSUMER__GROUP_ID=data-rentgen
+        DATA_RENTGEN__CONSUMER__FETCH_MAX_WAIT_MS=5000
+        DATA_RENTGEN__CONSUMER__MAX_PARTITION_FETCH_BYTES=5MiB
+    """
+
+    topics_list: list[str] = Field(
+        default=["input.runs"],
+        description="List of Kafka topics to subscribe to. Mutually exclusive with :obj:`~topics_pattern`.",
+    )
+    topics_pattern: str | None = Field(
+        default=None,
+        description="Regex pattern of topics to subscribe to. Mutually exclusive with :obj:`~topics_list`.",
+    )
+
+    @model_validator(mode="after")
+    def _check_topics(self):
+        if not self.topics_list and not self.topics_pattern:
+            raise ValueError("input should contain either 'topics_list' or 'topics_pattern' field, both are empty")
+        if self.topics_list and self.topics_pattern:
+            raise ValueError("input should contain either 'topics_list' or 'topics_pattern' field, both are set")
+        return self
+
+    group_id: str | None = Field(
+        default="data-rentgen",
+        description=textwrap.dedent(
+            """
+            Name of the consumer group to join for dynamic partition assignment (if enabled),
+            and to use for fetching and committing offsets.
+            If ``None``, auto-partition assignment (via group coordinator) and offset commits are disabled.
+            """,
+        ),
+    )
+
+    # Defaults are copied from FastStream: https://github.com/airtai/faststream/blob/0.5.33/faststream/kafka/fastapi/fastapi.py#L618
+    # But only the options related to the consumer
+    max_records: int | None = Field(
+        default=None,
+        description=textwrap.dedent(
+            """
+            Number of messages to consume as one batch.
+            ``None`` means no limit applied.
+            """,
+        ),
+    )
+    fetch_max_bytes: ByteSize = Field(
+        default=ByteSize(50 * 1024 * 1024),
+        description="The maximum amount of data the server should return for a fetch request.",
+    )
+    fetch_min_bytes: ByteSize = Field(
+        default=ByteSize(1),
+        description=textwrap.dedent(
+            """
+            Minimum amount of data the server should
+            return for a fetch request, otherwise wait up to
+            :obj:`~fetch_max_wait_ms` for more data to accumulate.
+            """,
+        ),
+    )
+    fetch_max_wait_ms: int = Field(
+        default=500,
+        description=textwrap.dedent(
+            """
+            The maximum amount of time in milliseconds
+            the server will block before answering the fetch request if
+            there isn't sufficient data to immediately satisfy the
+            requirement given by :obj:`~fetch_min_bytes`.
+            """,
+        ),
+    )
+    max_partition_fetch_bytes: ByteSize = Field(
+        default=ByteSize(1024 * 1024),
+        description=textwrap.dedent(
+            """
+            The maximum amount of data
+            per partition the server will return. The maximum total memory
+            used for a request ``= #partitions * max_partition_fetch_bytes``.
+
+            This size must be at least as large as the maximum message size
+            the server allows, or else it is possible for the producer to
+            send messages larger than the consumer can fetch. If that
+            happens, the consumer can get stuck trying to fetch a large
+            message on a certain partition.
+            """,
+        ),
+    )
+    batch_timeout_ms: int = Field(
+        default=200,
+        description=textwrap.dedent(
+            """
+            Milliseconds spent waiting if data is not available in the buffer.
+            If 0, returns immediately with any records that are currently available in the buffer,
+            else returns empty.
+            """,
+        ),
+    )
+    auto_offset_reset: Literal["latest", "earliest", "none"] = Field(
+        default="latest",
+        description=textwrap.dedent(
+            """
+            A policy for resetting offsets on ``OffsetOutOfRangeError`` errors:
+
+            * ``earliest`` will move to the oldest available message
+            * ``latest`` will move to the most recent
+            * ``none`` will raise an exception so you can handle this case
+            """,
+        ),
+    )
+    max_poll_interval_ms: int = Field(
+        default=5 * 60 * 1000,
+        description=textwrap.dedent(
+            """
+            Maximum allowed time between calls to consume messages in batches.
+            If this interval is exceeded, the consumer is considered failed and the group will
+            rebalance in order to reassign the partitions to another consumer
+            group member.
+            If API methods block waiting for messages, that time
+            does not count against this timeout.
+            """,
+        ),
+    )
+    session_timeout_ms: int = Field(
+        default=10 * 1000,
+        description=textwrap.dedent(
+            """
+            Client group session and failure detection
+            timeout. The consumer sends periodic heartbeats
+            (``heartbeat.interval.ms``) to indicate its liveness to the broker.
+
+            If no heartbeats are received by the broker for a group member within
+            the session timeout, the broker will remove the consumer from the
+            group and trigger a rebalance.
+
+            The allowed range is configured with the **broker** configuration properties
+            ``group.min.session.timeout.ms`` and ``group.max.session.timeout.ms``.
+            """,
+        ),
+    )
+    heartbeat_interval_ms: int = Field(
+        default=3 * 1000,
+        description=textwrap.dedent(
+            """
+            The expected time in milliseconds
+            between heartbeats to the consumer coordinator when using
+            Kafka's group management feature. Heartbeats are used to ensure
+            that the consumer's session stays active and to facilitate
+            rebalancing when new consumers join or leave the group.
+
+            The value must be set lower than :obj:`~session_timeout_ms`, but typically
+            should be set no higher than 1/3 of that value. It can be
+            adjusted even lower to control the expected time for normal
+            rebalances.
+            """,
+        ),
+    )
+    consumer_timeout_ms: int = Field(
+        default=200,
+        description=textwrap.dedent(
+            """
+            Maximum wait timeout for the background fetching routine.
+            Mostly defines how fast the system will see a rebalance and
+            request new data for new partitions.
+            """,
+        ),
+    )
+    isolation_level: Literal["read_uncommitted", "read_committed"] = Field(
+        default="read_uncommitted",
+        description=textwrap.dedent(
+            """
+            Controls how to read messages written
+            transactionally.
+
+            * ``read_committed`` - batch consumer will only return
+              transactional messages which have been committed.
+
+            * ``read_uncommitted`` (the default) - batch consumer will
+              return all messages, even transactional messages which have been
+              aborted.
+
+            Non-transactional messages will be returned unconditionally in
+            either mode.
+
+            Messages will always be returned in offset order. Hence, in
+            ``read_committed`` mode, the batch consumer will only return
+            messages up to the last stable offset (LSO), which is the one less
+            than the offset of the first open transaction. In particular, any
+            messages appearing after messages belonging to ongoing transactions
+            will be withheld until the relevant transaction has been completed.
+            As a result, ``read_committed`` consumers will not be able to read up
+            to the high watermark when there are in-flight transactions.
+            Further, when in ``read_committed``, the ``seek_to_end`` method will
+            return the LSO.
+            """,
+        ),
+    )
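
A small sketch (not from the commit) of how these settings behave when constructed directly: the `_check_topics` validator rejects mixing a topic list with a pattern, and pydantic's `ByteSize` accepts human-readable strings like the `5MiB` from the example above:

    from pydantic import ValidationError

    from data_rentgen.consumer.settings.consumer import ConsumerSettings

    ConsumerSettings()                                             # ok: defaults to topics_list=["input.runs"]
    ConsumerSettings(topics_list=[], topics_pattern=r"input\..+")  # ok: pattern instead of an explicit list

    try:
        ConsumerSettings(topics_list=["input.runs"], topics_pattern=r"input\..+")
    except ValidationError:
        pass  # rejected: both 'topics_list' and 'topics_pattern' are set

    settings = ConsumerSettings(max_partition_fetch_bytes="5MiB")
    assert int(settings.max_partition_fetch_bytes) == 5 * 1024 * 1024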
data_rentgen/consumer/settings/kafka.py

Lines changed: 50 additions & 4 deletions
@@ -1,33 +1,79 @@
 # SPDX-FileCopyrightText: 2024 MTS PJSC
 # SPDX-License-Identifier: Apache-2.0

+import textwrap
+from enum import Enum
+
 from pydantic import BaseModel, Field

-from data_rentgen.consumer.settings.compression import KafkaCompression
 from data_rentgen.consumer.settings.security import KafkaSecuritySettings


+class KafkaCompression(str, Enum):
+    GZIP = "gzip"
+    SNAPPY = "snappy"
+    LZ4 = "lz4"
+    ZSTD = "zstd"
+
+    def __str__(self):
+        return self.value
+
+
 class KafkaSettings(BaseModel):
     """Data.Rentgen consumer Kafka-specific settings.

+    These options are passed directly to
+    `AIOKafkaConsumer <https://aiokafka.readthedocs.io/en/stable/api.html#aiokafka.AIOKafkaConsumer>`_.
+
     Examples
     --------

     .. code-block:: bash

         DATA_RENTGEN__KAFKA__BOOTSTRAP_SERVERS=localhost:9092
         DATA_RENTGEN__KAFKA__SECURITY__TYPE=scram-256
+        DATA_RENTGEN__KAFKA__REQUEST_TIMEOUT_MS=5000
+        DATA_RENTGEN__KAFKA__CONNECTIONS_MAX_IDLE_MS=540000
     """

     bootstrap_servers: str = Field(
-        description="List of Kafka bootstrap servers",
+        description="List of Kafka bootstrap servers.",
         min_length=1,
     )
     security: KafkaSecuritySettings = Field(
         default_factory=KafkaSecuritySettings,
-        description="Kafka security settings",
+        description="Kafka security settings.",
     )
     compression: KafkaCompression | None = Field(
         default=None,
-        description="Kafka message compression type",
+        description="Kafka message compression type.",
+    )
+    # Defaults are copied from FastStream: https://github.com/airtai/faststream/blob/0.5.33/faststream/kafka/fastapi/fastapi.py#L78
+    # But only the options related to consuming messages
+    request_timeout_ms: int = Field(
+        default=40 * 1000,
+        description="Client request timeout in milliseconds.",
+    )
+    retry_backoff_ms: int = Field(
+        default=100,
+        description="Milliseconds to back off when retrying on errors.",
+    )
+    metadata_max_age_ms: int = Field(
+        default=5 * 60 * 1000,
+        description=textwrap.dedent(
+            """
+            The period of time in milliseconds after which we force a refresh of metadata,
+            even if we haven't seen any partition leadership changes,
+            to proactively discover any new brokers or partitions.
+            """,
+        ),
+    )
+    connections_max_idle_ms: int = Field(
+        default=9 * 60 * 1000,
+        description=textwrap.dedent(
+            """
+            Close idle connections after the number of milliseconds specified by this config.
+            Specifying ``None`` will disable idle checks.
+            """,
+        ),
     )
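
Because `KafkaCompression` now subclasses `str` and overrides `__str__`, members validate from plain strings and render as their values, which is what `consumer_factory` passes as `compression_type`. A brief sketch:

    from data_rentgen.consumer.settings.kafka import KafkaCompression, KafkaSettings

    settings = KafkaSettings(bootstrap_servers="localhost:9092", compression="gzip")
    assert settings.compression is KafkaCompression.GZIP
    assert str(settings.compression) == settings.compression.value == "gzip"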
