Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 51 additions & 63 deletions backend/app/dlq/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
from datetime import datetime, timezone
from typing import Any, Awaitable, Callable

from confluent_kafka import Consumer, KafkaError, Message, Producer
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from aiokafka.errors import KafkaError
from opentelemetry.trace import SpanKind

from app.core.lifecycle import LifecycleEnabled
Expand All @@ -30,8 +31,8 @@ class DLQManager(LifecycleEnabled):
def __init__(
self,
settings: Settings,
consumer: Consumer,
producer: Producer,
consumer: AIOKafkaConsumer,
producer: AIOKafkaProducer,
schema_registry: SchemaRegistryManager,
logger: logging.Logger,
dlq_topic: KafkaTopic = KafkaTopic.DEAD_LETTER_QUEUE,
Expand All @@ -48,8 +49,8 @@ def __init__(
self.default_retry_policy = default_retry_policy or RetryPolicy(
topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF
)
self.consumer: Consumer = consumer
self.producer: Producer = producer
self.consumer: AIOKafkaConsumer = consumer
self.producer: AIOKafkaProducer = producer

self._process_task: asyncio.Task[None] | None = None
self._monitor_task: asyncio.Task[None] | None = None
Expand Down Expand Up @@ -116,13 +117,13 @@ def _message_to_doc(self, message: DLQMessage) -> DLQMessageDocument:
headers=message.headers,
)

def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
"""Parse Kafka message into DLQMessage."""
raw_bytes = msg.value()
def _kafka_msg_to_message(self, msg: Any) -> DLQMessage:
"""Parse Kafka ConsumerRecord into DLQMessage."""
raw_bytes = msg.value
raw: str = raw_bytes.decode("utf-8") if isinstance(raw_bytes, (bytes, bytearray)) else str(raw_bytes or "")
data: dict[str, Any] = json.loads(raw) if raw else {}

headers_list = msg.headers() or []
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
Expand All @@ -141,15 +142,16 @@ def _kafka_msg_to_message(self, msg: Message) -> DLQMessage:
else datetime.now(timezone.utc),
status=DLQMessageStatus(data.get("status", DLQMessageStatus.PENDING)),
producer_id=data.get("producer_id", headers.get("producer_id", "unknown")),
dlq_offset=msg.offset(),
dlq_partition=msg.partition(),
dlq_offset=msg.offset,
dlq_partition=msg.partition,
headers=headers,
)

async def _on_start(self) -> None:
"""Start DLQ manager."""
topic_name = f"{self.settings.KAFKA_TOPIC_PREFIX}{self.dlq_topic}"
self.consumer.subscribe([topic_name])
# Start producer and consumer
await self.producer.start()
await self.consumer.start()

# Start processing tasks
self._process_task = asyncio.create_task(self._process_messages())
Expand All @@ -169,8 +171,8 @@ async def _on_stop(self) -> None:
pass

# Stop Kafka clients
self.consumer.close()
self.producer.flush(10)
await self.consumer.stop()
await self.producer.stop()

self.logger.info("DLQ Manager stopped")

Expand All @@ -181,9 +183,6 @@ async def _process_messages(self) -> None:
if msg is None:
continue

if not await self._validate_message(msg):
continue

start_time = asyncio.get_running_loop().time()
dlq_message = self._kafka_msg_to_message(msg)

Expand All @@ -195,23 +194,19 @@ async def _process_messages(self) -> None:
self.logger.error(f"Error in DLQ processing loop: {e}")
await asyncio.sleep(5)

async def _poll_message(self) -> Message | None:
"""Poll for a message from Kafka."""
return await asyncio.to_thread(self.consumer.poll, timeout=1.0)

async def _validate_message(self, msg: Message) -> bool:
"""Validate the Kafka message."""
if msg.error():
error = msg.error()
if error and error.code() == KafkaError._PARTITION_EOF:
return False
self.logger.error(f"Consumer error: {error}")
return False
return True

def _extract_headers(self, msg: Message) -> dict[str, str]:
"""Extract headers from Kafka message."""
headers_list = msg.headers() or []
async def _poll_message(self) -> Any | None:
"""Poll for a message from Kafka using async getone()."""
try:
return await asyncio.wait_for(self.consumer.getone(), timeout=1.0)
except asyncio.TimeoutError:
return None
except KafkaError as e:
self.logger.error(f"Consumer error: {e}")
return None

def _extract_headers(self, msg: Any) -> dict[str, str]:
"""Extract headers from Kafka ConsumerRecord."""
headers_list = msg.headers or []
headers: dict[str, str] = {}
for k, v in headers_list:
headers[str(k)] = v.decode("utf-8") if isinstance(v, (bytes, bytearray)) else (v or "")
Expand All @@ -222,7 +217,7 @@ async def _record_message_metrics(self, dlq_message: DLQMessage) -> None:
self.metrics.record_dlq_message_received(dlq_message.original_topic, dlq_message.event_type)
self.metrics.record_dlq_message_age(dlq_message.age_seconds)

async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMessage) -> None:
async def _process_message_with_tracing(self, msg: Any, dlq_message: DLQMessage) -> None:
"""Process message with distributed tracing."""
headers = self._extract_headers(msg)
ctx = extract_trace_context(headers)
Expand All @@ -242,7 +237,7 @@ async def _process_message_with_tracing(self, msg: Message, dlq_message: DLQMess

async def _commit_and_record_duration(self, start_time: float) -> None:
"""Commit offset and record processing duration."""
await asyncio.to_thread(self.consumer.commit, asynchronous=False)
await self.consumer.commit()
duration = asyncio.get_running_loop().time() - start_time
self.metrics.record_dlq_processing_duration(duration, "process")

Expand Down Expand Up @@ -324,31 +319,27 @@ async def _retry_message(self, message: DLQMessage) -> None:
"dlq_retry_timestamp": datetime.now(timezone.utc).isoformat(),
}
hdrs = inject_trace_context(hdrs)
kafka_headers: list[tuple[str, str | bytes]] = [(k, v.encode()) for k, v in hdrs.items()]
kafka_headers: list[tuple[str, bytes]] = [(k, v.encode()) for k, v in hdrs.items()]

# Get the original event
event = message.event

await asyncio.to_thread(
self.producer.produce,
# Send to retry topic
await self.producer.send_and_wait(
topic=retry_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Send to original topic
await asyncio.to_thread(
self.producer.produce,
await self.producer.send_and_wait(
topic=message.original_topic,
value=json.dumps(event.to_dict()).encode(),
key=message.event_id.encode(),
headers=kafka_headers,
)

# Flush to ensure messages are sent
await asyncio.to_thread(self.producer.flush, timeout=5)

# Update metrics
self.metrics.record_dlq_message_retried(message.original_topic, message.event_type, "success")

Expand Down Expand Up @@ -521,25 +512,22 @@ def create_dlq_manager(
retry_topic_suffix: str = "-retry",
default_retry_policy: RetryPolicy | None = None,
) -> DLQManager:
consumer = Consumer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"group.id": f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
"enable.auto.commit": False,
"auto.offset.reset": "earliest",
"client.id": "dlq-manager-consumer",
}
topic_name = f"{settings.KAFKA_TOPIC_PREFIX}{dlq_topic}"
consumer = AIOKafkaConsumer(
topic_name,
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
group_id=f"{GroupId.DLQ_MANAGER}.{settings.KAFKA_GROUP_SUFFIX}",
enable_auto_commit=False,
auto_offset_reset="earliest",
client_id="dlq-manager-consumer",
)
producer = Producer(
{
"bootstrap.servers": settings.KAFKA_BOOTSTRAP_SERVERS,
"client.id": "dlq-manager-producer",
"acks": "all",
"enable.idempotence": True,
"compression.type": "gzip",
"batch.size": 16384,
"linger.ms": 10,
}
producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
client_id="dlq-manager-producer",
acks="all",
compression_type="gzip",
max_batch_size=16384,
linger_ms=10,
)
Comment on lines +524 to 531
Copy link

@cubic-dev-ai cubic-dev-ai bot Jan 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1: Missing enable_idempotence=True parameter. The original confluent_kafka producer had idempotence enabled for exactly-once delivery semantics. This should be preserved in the aiokafka migration to prevent duplicate messages during retries.

Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/dlq/manager.py, line 524:

<comment>Missing `enable_idempotence=True` parameter. The original confluent_kafka producer had idempotence enabled for exactly-once delivery semantics. This should be preserved in the aiokafka migration to prevent duplicate messages during retries.</comment>

<file context>
@@ -521,25 +512,22 @@ def create_dlq_manager(
-            "batch.size": 16384,
-            "linger.ms": 10,
-        }
+    producer = AIOKafkaProducer(
+        bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
+        client_id="dlq-manager-producer",
</file context>
Suggested change
producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
client_id="dlq-manager-producer",
acks="all",
compression_type="gzip",
max_batch_size=16384,
linger_ms=10,
)
producer = AIOKafkaProducer(
bootstrap_servers=settings.KAFKA_BOOTSTRAP_SERVERS,
client_id="dlq-manager-producer",
acks="all",
enable_idempotence=True,
compression_type="gzip",
max_batch_size=16384,
linger_ms=10,
)
Fix with Cubic

if default_retry_policy is None:
default_retry_policy = RetryPolicy(topic="default", strategy=RetryStrategy.EXPONENTIAL_BACKOFF)
Expand Down
69 changes: 0 additions & 69 deletions backend/app/events/admin_utils.py

This file was deleted.

Loading
Loading