4 changes: 2 additions & 2 deletions backend/app/core/providers.py
@@ -46,7 +46,7 @@
from app.events.core import ProducerConfig, UnifiedProducer
from app.events.event_store import EventStore, create_event_store
from app.events.event_store_consumer import EventStoreConsumer, create_event_store_consumer
from app.events.schema.schema_registry import SchemaRegistryManager, create_schema_registry_manager
from app.events.schema.schema_registry import SchemaRegistryManager
from app.infrastructure.kafka.topics import get_all_topics
from app.services.admin import AdminEventsService, AdminSettingsService, AdminUserService
from app.services.auth_service import AuthService
@@ -192,7 +192,7 @@ class EventProvider(Provider):

@provide
def get_schema_registry(self, settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
return create_schema_registry_manager(settings, logger)
return SchemaRegistryManager(settings, logger)

@provide
async def get_event_store(self, schema_registry: SchemaRegistryManager, logger: logging.Logger) -> EventStore:
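For reference, this provider change swaps the removed `create_schema_registry_manager` factory for direct construction; any other call site still using the factory would change the same way. A minimal sketch, assuming the constructor takes the same `(settings, logger)` pair the factory accepted and an app-internal `Settings` import path:

```python
import logging

from app.core.config import Settings  # assumed location of the Settings class
from app.events.schema.schema_registry import SchemaRegistryManager


def build_schema_registry(settings: Settings, logger: logging.Logger) -> SchemaRegistryManager:
    # Before: create_schema_registry_manager(settings, logger)
    # After: construct the manager directly, as the provider in this diff now does.
    return SchemaRegistryManager(settings, logger)
```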
115 changes: 52 additions & 63 deletions backend/app/dlq/manager.py
@@ -4,7 +4,8 @@
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

from confluent_kafka import Consumer, KafkaError, Message, Producer
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
from opentelemetry.trace import SpanKind

from app.core.lifecycle import LifecycleEnabled
@@ -30,8 +31,8 @@ class DLQManager(LifecycleEnabled):
def __init__(
self,
settings: Settings,
consumer: Consumer,
producer: Producer,
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
@@ -48,8 +49,8 @@ def __init__(
self.default_retry_policy = default_retry_policy or RetryPolicy(
topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF
)
self.consumer: Consumer = consumer
self.producer: Producer = producer
self.consumer: AIOKafkaConsumer = consumer
self.producer: AIOKafkaProducer = producer

self._process_task: asyncio.Task[None] | None = None
self._monitor_task: asyncio.Task[None] | None = None
@@ -116,13 +117,13 @@ def _message_to_doc(self, message: DLQMessage) -> DLQMessageDocument:
headers=message.headers,
)

def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
"""Parse Kafka message into DLQMessage."""
raw_bytes = msg.value()
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage:
"""Parse Kafka ConsumerRecord into DLQMessage."""
raw_bytes = msg.value
raw: str = raw_bytes.decode("utf-8") if isinstance(raw_bytes, (bytes, bytearray)) else str(raw_bytes or "")
data: dict[str, Any] = json.loads(raw) if raw else {}

headers_list = msg.headers() or []
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
@@ -141,15 +142,16 @@ def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
else datetime.now(timezone.utc),
status=DLQMessageStatus(data.get("status", DLQMessageStatus.PENDING)),
producer_id=data.get("producer_id", headers.get("producer_id", "unknown")),
dlq_offset=msg.offset(),
dlq_partition=msg.partition(),
dlq_offset=msg.offset,
dlq_partition=msg.partition,
headers=headers,
)

async def _on_start(self) -> None:
"""Start DLQ manager."""
topic_name = f"{self.settings.KAFKA_TOPIC_PREFIX}{self.dlq_topic}"
self.consumer.subscribe([topic_name])
# Start producer and consumer
await self.producer.start()
await self.consumer.start()

# Start processing tasks
self._process_task = asyncio.create_task(self._process_messages())
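One detail the rewritten `_kafka_msg_to_message` depends on: confluent-kafka's `Message` exposes accessor methods (`msg.value()`, `msg.headers()`, `msg.offset()`, `msg.partition()`), whereas aiokafka's `ConsumerRecord` exposes plain attributes. A small sketch of reading the same fields from a `ConsumerRecord`, mirroring the decoding done in the diff:

```python
from typing import Any

from aiokafka.structs import ConsumerRecord


def summarize_record(msg: ConsumerRecord) -> dict[str, Any]:
    """Read the fields _kafka_msg_to_message uses -- attributes, not method calls."""
    raw = msg.value  # bytes | None; with confluent-kafka this was msg.value()
    payload = raw.decode("utf-8") if isinstance(raw, (bytes, bytearray)) else str(raw or "")
    headers = {
        str(k): v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
        for k, v in (msg.headers or [])
    }
    return {
        "payload": payload,
        "headers": headers,
        "offset": msg.offset,        # was msg.offset()
        "partition": msg.partition,  # was msg.partition()
    }
```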
@@ -169,8 +171,8 @@ async def _on_stop(self) -> None:
pass

# Stop Kafka clients
self.consumer.close()
self.producer.flush(10)
await self.consumer.stop()
await self.producer.stop()

self.logger.info("DLQ Manager stopped")

@@ -181,9 +183,6 @@ async def _process_messages(self) -> None:
if msg is None:
continue

if not await self._validate_message(msg):
continue

start_time = asyncio.get_running_loop().time()
dlq_message = self._kafka_msg_to_message(msg)

@@ -195,23 +194,19 @@ async def _process_messages(self) -> None:
self.logger.error(f"Error in DLQ processing loop: {e}")
await asyncio.sleep(5)

async def _poll_message(self) -> Message | None:
"""Poll for a message from Kafka."""
return await asyncio.to_thread(self.consumer.poll, timeout=1.0)

async def _validate_message(self, msg: Message) -> bool:
"""Validate the Kafka message."""
if msg.error():
error = msg.error()
if error and error.code() == KafkaError._PARTITION_EOF:
return False
self.logger.error(f"Consumer error: {error}")
return False
return True

def _extract_headers(self, msg: Message) -> dict[str, str]:
"""Extract headers from Kafka message."""
headers_list = msg.headers() or []
async def _poll_message(self) -> Any | None:
"""Poll for a message from Kafka using async getone()."""
try:
return await asyncio.wait_for(self.consumer.getone(), timeout=1.0)
except asyncio.TimeoutError:
return None
except KafkaError as e:
self.logger.error(f"Consumer error: {e}")
return None

def _extract_headers(self, msg: Any) -> dict[str, str]:
"""Extract headers from Kafka ConsumerRecord."""
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
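The polling change is the heart of the rewrite: confluent-kafka's `poll(timeout=1.0)` returned None on timeout and delivered errors (including `_PARTITION_EOF`) on the Message itself, which is why `_validate_message` existed; aiokafka's `getone()` awaits the next record and raises on failure, so the timeout is reinstated with `asyncio.wait_for` and the validation helper is dropped. A standalone sketch of that pattern, assuming an already-started consumer:

```python
import asyncio
import logging

from aiokafka import AIOKafkaConsumer
from aiokafka.errors import KafkaError
from aiokafka.structs import ConsumerRecord

logger = logging.getLogger("dlq")


async def poll_one(consumer: AIOKafkaConsumer, timeout: float = 1.0) -> ConsumerRecord | None:
    """Return the next record, or None on timeout or consumer error."""
    try:
        # getone() waits indefinitely; wait_for bounds it so the caller's loop
        # stays responsive to cancellation and periodic work.
        return await asyncio.wait_for(consumer.getone(), timeout=timeout)
    except asyncio.TimeoutError:
        return None  # no message in the window, equivalent to poll() returning None
    except KafkaError as e:
        logger.error(f"Consumer error: {e}")
        return None
```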
@@ -222,7 +217,7 @@ async def _record_message_metrics(self, dlq_message: DLQMessage) -> None:
self.metrics.record_dlq_message_received(dlq_message.original_topic, dlq_message.event_type)
self.metrics.record_dlq_message_age(dlq_message.age_seconds)

async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMessage) -> None:
async def _process_message_with_tracing(self, msg: Any, dlq_message: DLQMessage) -> None:
"""Process message with distributed tracing."""
headers = self._extract_headers(msg)
ctx = extract_trace_context(headers)
@@ -242,7 +237,7 @@ async def _commit_and_record_duration(self, start_time: float) -> None:

async def _commit_and_record_duration(self, start_time: float) -> None:
"""Commit offset and record processing duration."""
await asyncio.to_thread(self.consumer.commit, asynchronous=False)
await self.consumer.commit()
duration = asyncio.get_running_loop().time() - start_time
self.metrics.record_dlq_processing_duration(duration, "process")
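Because the consumer is created with `enable_auto_commit=False`, offsets are still committed manually after each message; the difference is that aiokafka's `commit()` is a coroutine, so the `asyncio.to_thread` wrapper around the blocking confluent-kafka commit is no longer needed. A minimal sketch, assuming a started group consumer and an application-supplied handler:

```python
import asyncio
from typing import Awaitable, Callable

from aiokafka import AIOKafkaConsumer
from aiokafka.structs import ConsumerRecord


async def handle_and_commit(
    consumer: AIOKafkaConsumer,
    handler: Callable[[ConsumerRecord], Awaitable[None]],
) -> float:
    """Process one record, commit the consumer's position, and return the duration."""
    start = asyncio.get_running_loop().time()
    record = await consumer.getone()
    await handler(record)      # application-specific processing
    await consumer.commit()    # commit current offsets for this consumer's assignment
    return asyncio.get_running_loop().time() - start
```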

@@ -324,31 +319,27 @@ async def _retry_message(self, message: DLQMessage) -> None:
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)
kafka_headers: list[tuple[str, str | bytes]] = [(k, v.encode()) for k, v in hdrs.items()]
kafka_headers: list[tuple[str, bytes]] = [(k, v.encode()) for k, v in hdrs.items()]

# Get the original event
event = message.event

await asyncio.to_thread(
self.producer.produce,
# Send to retry topic
await self.producer.send_and_wait(
topic=retry_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Send to original topic
await asyncio.to_thread(
self.producer.produce,
await self.producer.send_and_wait(
topic=message.original_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Flush to ensure messages are sent
await asyncio.to_thread(self.producer.flush, timeout=5)

# Update metrics
self.metrics.record_dlq_message_retried(message.original_topic, message.event_type, "success")
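confluent-kafka's `produce()` only enqueues a message in a local buffer, which is why the old retry path had to call `flush()` afterwards; aiokafka's `send_and_wait()` returns only after the broker acknowledges the write (`acks="all"` in this setup), so the explicit flush disappears. A sketch of the double publish under those assumptions, with a stand-in dict for the serialized event:

```python
import json

from aiokafka import AIOKafkaProducer


async def publish_retry(
    producer: AIOKafkaProducer,
    retry_topic: str,
    original_topic: str,
    event_id: str,
    event: dict,
    headers: dict[str, str],
) -> None:
    """Publish the event to the retry topic and the original topic, awaiting acks."""
    kafka_headers: list[tuple[str, bytes]] = [(k, v.encode()) for k, v in headers.items()]
    payload = json.dumps(event).encode()

    # Each call completes once the broker has acknowledged the record,
    # so no separate flush step is required.
    await producer.send_and_wait(retry_topic, value=payload, key=event_id.encode(), headers=kafka_headers)
    await producer.send_and_wait(original_topic, value=payload, key=event_id.encode(), headers=kafka_headers)
```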

@@ -521,25 +512,23 @@ def create_dlq_manager(
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
) -> DLQManager:
consumer = Consumer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"group.id": f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"client.id": "dlq-manager-consumer",
}
topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{dlq_topic}"
consumer = AIOKafkaConsumer(
topic_name,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
enable_auto_commit=False,
auto_offset_reset="earliest",
client_id="dlq-manager-consumer",
)
producer = Producer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"client.id": "dlq-manager-producer",
"acks": "all",
"enable.idempotence": True,
"compression.type": "gzip",
"batch.size": 16384,
"linger.ms": 10,
}
producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
client_id="dlq-manager-producer",
acks="all",
compression_type="gzip",
max_batch_size=16384,
linger_ms=10,
enable_idempotence=True,
)
if default_retry_policy is None:
default_retry_policy = RetryPolicy(topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF)
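The factory now maps the old dotted confluent-kafka config keys onto aiokafka constructor keywords (`bootstrap.servers` → `bootstrap_servers`, `group.id` → `group_id`, `enable.auto.commit` → `enable_auto_commit`, `auto.offset.reset` → `auto_offset_reset`, `batch.size` → `max_batch_size`, `linger.ms` → `linger_ms`, `enable.idempotence` → `enable_idempotence`), and passes the DLQ topic straight to the consumer constructor. A compact before/after sketch of the consumer half, with the old form shown in comments:

```python
from aiokafka import AIOKafkaConsumer

# Before (confluent-kafka): dict config with dotted keys, subscribe() called later.
# consumer = Consumer({
#     "bootstrap.servers": servers,
#     "group.id": group,
#     "enable.auto.commit": False,
#     "auto.offset.reset": "earliest",
# })


def make_dlq_consumer(servers: str, group: str, topic_name: str) -> AIOKafkaConsumer:
    # After (aiokafka): keyword arguments and the topic passed at construction time.
    return AIOKafkaConsumer(
        topic_name,
        bootstrap_servers=servers,
        group_id=group,
        enable_auto_commit=False,
        auto_offset_reset="earliest",
        client_id="dlq-manager-consumer",
    )
```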
69 changes: 0 additions & 69 deletions backend/app/events/admin_utils.py

This file was deleted.
