4 changes: 2 additions & 2 deletions backend/app/core/providers.py
@@ -46,7 +46,7 @@
from app.events.core import ProducerConfig, UnifiedProducer
from app.events.event_store import EventStore, create_event_store
from app.events.event_store_consumer import EventStoreConsumer, create_event_store_consumer
from app.events.schema.schema_registry import SchemaRegistryManager, create_schema_registry_manager
from app.events.schema.schema_registry import SchemaRegistryManager
from app.infrastructure.kafka.topics import get_all_topics
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
@@ -192,7 +192,7 @@ class EventProvider(Provider):

@provide
def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
return create_schema_registry_manager(settings, logger)
return SchemaRegistryManager(settings, logger)

@provide
async def get_event_store(self, schema_registry: SchemaRegistryManager, logger: logging.Logger) -> EventStore:
114 changes: 51 additions & 63 deletions backend/app/dlq/manager.py
@@ -4,7 +4,8 @@
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

from confluent_kafka import Consumer, KafkaError, Message, Producer
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
from opentelemetry.trace import SpanKind

from app.core.lifecycle import LifecycleEnabled
@@ -30,8 +31,8 @@ class DLQManager(LifecycleEnabled):
def __init__(
self,
settings: Settings,
consumer: Consumer,
producer: Producer,
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
@@ -48,8 +49,8 @@ def __init__(
self.default_retry_policy = default_retry_policy or RetryPolicy(
topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF
)
self.consumer: Consumer = consumer
self.producer: Producer = producer
self.consumer: AIOKafkaConsumer = consumer
self.producer: AIOKafkaProducer = producer

self._process_task: asyncio.Task[None] | None = None
self._monitor_task: asyncio.Task[None] | None = None
@@ -116,13 +117,13 @@ def _message_to_doc(self, message: DLQMessage) -> DLQMessageDocument:
headers=message.headers,
)

def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
"""Parse Kafka message into DLQMessage."""
raw_bytes = msg.value()
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage:
"""Parse Kafka ConsumerRecord into DLQMessage."""
raw_bytes = msg.value
raw: str = raw_bytes.decode("utf-8") if isinstance(raw_bytes, (bytes, bytearray)) else str(raw_bytes or "")
data: dict[str, Any] = json.loads(raw) if raw else {}

headers_list = msg.headers() or []
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
Expand All @@ -141,15 +142,16 @@ def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
else datetime.now(timezone.utc),
status=DLQMessageStatus(data.get("status", DLQMessageStatus.PENDING)),
producer_id=data.get("producer_id", headers.get("producer_id", "unknown")),
dlq_offset=msg.offset(),
dlq_partition=msg.partition(),
dlq_offset=msg.offset,
dlq_partition=msg.partition,
headers=headers,
)
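A note on the accessor change above: confluent_kafka's Message exposes value(), headers(), offset() and partition() as methods, while aiokafka's ConsumerRecord exposes them as plain attributes, with headers delivered as (str, bytes) tuples. A minimal standalone sketch of that decoding pattern — the decode_record helper is hypothetical and not part of this PR:

import json
from typing import Any

def decode_record(record: Any) -> dict[str, Any]:
    # ConsumerRecord fields are attributes, not methods as in confluent_kafka.
    raw = record.value
    text = raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else str(raw or "")
    payload: dict[str, Any] = json.loads(text) if text else {}
    # headers is a sequence of (str, bytes) tuples.
    headers = {
        str(k): v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else str(v or "")
        for k, v in (record.headers or [])
    }
    return {
        "payload": payload,
        "headers": headers,
        "offset": record.offset,
        "partition": record.partition,
    }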

async def _on_start(self) -> None:
"""Start DLQ manager."""
topic_name = f"{self.settings.KAFKA_TOPIC_PREFIX}{self.dlq_topic}"
self.consumer.subscribe([topic_name])
# Start producer and consumer
await self.producer.start()
await self.consumer.start()

# Start processing tasks
self._process_task = asyncio.create_task(self._process_messages())
@@ -169,8 +171,8 @@ async def _on_stop(self) -> None:
pass

# Stop Kafka clients
self.consumer.close()
self.producer.flush(10)
await self.consumer.stop()
await self.producer.stop()

self.logger.info("DLQ Manager stopped")
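The lifecycle hooks above trade confluent_kafka's blocking subscribe/close/flush calls for aiokafka's awaitable start/stop. A minimal sketch of that lifecycle in isolation, assuming a local broker at localhost:9092 and a hypothetical topic name:

import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

async def run_clients() -> None:
    # Hypothetical broker address and topic; the real values come from Settings.
    consumer = AIOKafkaConsumer("dlq-topic", bootstrap_servers="localhost:9092", group_id="dlq-manager")
    producer = AIOKafkaProducer(bootstrap_servers="localhost:9092")
    # aiokafka clients must be started before use...
    await producer.start()
    await consumer.start()
    try:
        pass  # consume / produce here
    finally:
        # ...and stopped on shutdown; producer.stop() flushes pending batches,
        # which is why the explicit flush(10) call could be dropped.
        await consumer.stop()
        await producer.stop()

if __name__ == "__main__":
    asyncio.run(run_clients())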

@@ -181,9 +183,6 @@ async def _process_messages(self) -> None:
if msg is None:
continue

if not await self._validate_message(msg):
continue

start_time = asyncio.get_running_loop().time()
dlq_message = self._kafka_msg_to_message(msg)

Expand All @@ -195,23 +194,19 @@ async def _process_messages(self) -> None:
self.logger.error(f"Error in DLQ processing loop: {e}")
await asyncio.sleep(5)

async def _poll_message(self) -> Message | None:
"""Poll for a message from Kafka."""
return await asyncio.to_thread(self.consumer.poll, timeout=1.0)

async def _validate_message(self, msg: Message) -> bool:
"""Validate the Kafka message."""
if msg.error():
error = msg.error()
if error and error.code() == KafkaError._PARTITION_EOF:
return False
self.logger.error(f"Consumer error: {error}")
return False
return True

def _extract_headers(self, msg: Message) -> dict[str, str]:
"""Extract headers from Kafka message."""
headers_list = msg.headers() or []
async def _poll_message(self) -> Any | None:
"""Poll for a message from Kafka using async getone()."""
try:
return await asyncio.wait_for(self.consumer.getone(), timeout=1.0)
except asyncio.TimeoutError:
return None
except KafkaError as e:
self.logger.error(f"Consumer error: {e}")
return None

def _extract_headers(self, msg: Any) -> dict[str, str]:
"""Extract headers from Kafka ConsumerRecord."""
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
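aiokafka has no poll(timeout=...) call that returns None on expiry, so the code above bounds getone() with asyncio.wait_for; broker problems surface as raised exceptions rather than error-flagged messages, which is also why the old _validate_message helper could be removed. A small sketch of the pattern as a standalone helper (hypothetical name):

import asyncio
from typing import Any
from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError

async def poll_once(consumer: AIOKafkaConsumer, timeout: float = 1.0) -> Any | None:
    """Return the next ConsumerRecord, or None if nothing arrives within `timeout`."""
    try:
        # getone() waits indefinitely for the next record, so bound it externally.
        return await asyncio.wait_for(consumer.getone(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
    except KafkaError as exc:
        # Log-and-continue, mirroring the processing loop above.
        print(f"Consumer error: {exc}")
        return None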
@@ -222,7 +217,7 @@ async def _record_message_metrics(self, dlq_message: DLQMessage) -> None:
self.metrics.record_dlq_message_received(dlq_message.original_topic, dlq_message.event_type)
self.metrics.record_dlq_message_age(dlq_message.age_seconds)

async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMessage) -> None:
async def _process_message_with_tracing(self, msg: Any, dlq_message: DLQMessage) -> None:
"""Process message with distributed tracing."""
headers = self._extract_headers(msg)
ctx = extract_trace_context(headers)
@@ -242,7 +237,7 @@

async def _commit_and_record_duration(self, start_time: float) -> None:
"""Commit offset and record processing duration."""
await asyncio.to_thread(self.consumer.commit, asynchronous=False)
await self.consumer.commit()
duration = asyncio.get_running_loop().time() - start_time
self.metrics.record_dlq_processing_duration(duration, "process")
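Because the consumer is created with enable_auto_commit=False, offsets only advance when the commit() coroutine above is awaited after successful processing; the asyncio.to_thread wrapper is no longer needed. A short sketch of a consume-then-commit loop (topic and group id are hypothetical):

from aiokafka import AIOKafkaConsumer

async def consume_with_manual_commit() -> None:
    consumer = AIOKafkaConsumer(
        "dlq-topic",                        # hypothetical topic
        bootstrap_servers="localhost:9092",
        group_id="dlq-manager",             # commit() requires a consumer group
        enable_auto_commit=False,
    )
    await consumer.start()
    try:
        async for record in consumer:
            print(record.offset, record.value)  # placeholder for real handling
            await consumer.commit()             # commit only after handling succeeds
    finally:
        await consumer.stop()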

@@ -324,31 +319,27 @@ async def _retry_message(self, message: DLQMessage) -> None:
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)
kafka_headers: list[tuple[str, str | bytes]] = [(k, v.encode()) for k, v in hdrs.items()]
kafka_headers: list[tuple[str, bytes]] = [(k, v.encode()) for k, v in hdrs.items()]

# Get the original event
event = message.event

await asyncio.to_thread(
self.producer.produce,
# Send to retry topic
await self.producer.send_and_wait(
topic=retry_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Send to original topic
await asyncio.to_thread(
self.producer.produce,
await self.producer.send_and_wait(
topic=message.original_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Flush to ensure messages are sent
await asyncio.to_thread(self.producer.flush, timeout=5)

# Update metrics
self.metrics.record_dlq_message_retried(message.original_topic, message.event_type, "success")
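send_and_wait() resolves only after the broker acknowledges the record (with acks="all", once the in-sync replicas have it), so delivery is confirmed per message and the trailing flush step disappears. A minimal publishing sketch with string/bytes headers; the topic names and payload here are hypothetical:

import json
from aiokafka import AIOKafkaProducer

async def republish(producer: AIOKafkaProducer) -> None:
    event = {"event_id": "evt-123", "event_type": "example"}   # hypothetical payload
    headers: list[tuple[str, bytes]] = [
        ("dlq_retry_count", b"1"),
        ("dlq_original_topic", b"orders"),                     # hypothetical topic
    ]
    await producer.send_and_wait(
        topic="orders-retry",                                  # hypothetical retry topic
        value=json.dumps(event).encode(),
        key=event["event_id"].encode(),
        headers=headers,
    )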

@@ -521,25 +512,22 @@ def create_dlq_manager(
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
) -> DLQManager:
consumer = Consumer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"group.id": f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"client.id": "dlq-manager-consumer",
}
topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{dlq_topic}"
consumer = AIOKafkaConsumer(
topic_name,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
enable_auto_commit=False,
auto_offset_reset="earliest",
client_id="dlq-manager-consumer",
)
producer = Producer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"client.id": "dlq-manager-producer",
"acks": "all",
"enable.idempotence": True,
"compression.type": "gzip",
"batch.size": 16384,
"linger.ms": 10,
}
producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
client_id="dlq-manager-producer",
acks="all",
compression_type="gzip",
max_batch_size=16384,
linger_ms=10,
)
if default_retry_policy is None:
default_retry_policy = RetryPolicy(topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF)
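For reference, the old confluent_kafka config dict keys correspond to the aiokafka keyword arguments used above roughly as sketched below; subscription also moves from calling consumer.subscribe([topic]) after construction to passing the topic name straight to the AIOKafkaConsumer constructor. The mapping is illustrative, not exhaustive, and the old enable.idempotence flag would map to aiokafka's enable_idempotence, which this factory does not set.

# Illustrative mapping only; names on the right are aiokafka keyword arguments.
CONFLUENT_TO_AIOKAFKA = {
    "bootstrap.servers": "bootstrap_servers",
    "group.id": "group_id",
    "enable.auto.commit": "enable_auto_commit",
    "auto.offset.reset": "auto_offset_reset",
    "client.id": "client_id",
    "acks": "acks",
    "compression.type": "compression_type",
    "batch.size": "max_batch_size",
    "linger.ms": "linger_ms",
}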
69 changes: 0 additions & 69 deletions backend/app/events/admin_utils.py

This file was deleted.
