File tree Expand file tree Collapse file tree 1 file changed +8
-1
lines changed
Expand file tree Collapse file tree 1 file changed +8
-1
lines changed Original file line number Diff line number Diff line change 11import asyncio
22import json
33import socket
4+ import threading
45from datetime import datetime , timezone
56from typing import Any , Callable , TypeAlias
67
78from confluent_kafka import Message , Producer
89from confluent_kafka .error import KafkaError
910
11+ # Global lock to serialize Producer initialization (workaround for librdkafka race condition)
12+ # See: https://github.com/confluentinc/confluent-kafka-python/issues/1797
13+ _producer_init_lock = threading .Lock ()
14+
1015from app .core .lifecycle import LifecycleEnabled
1116from app .core .logging import logger
1217from app .core .metrics .context import get_event_metrics
@@ -113,7 +118,9 @@ async def start(self) -> None:
113118 producer_config ["stats_cb" ] = self ._handle_stats
114119 producer_config ["statistics.interval.ms" ] = 30000
115120
116- self ._producer = Producer (producer_config )
121+ # Serialize Producer initialization to prevent librdkafka race condition
122+ with _producer_init_lock :
123+ self ._producer = Producer (producer_config )
117124 self ._running = True
118125 self ._poll_task = asyncio .create_task (self ._poll_loop ())
119126 self ._state = ProducerState .RUNNING
You can’t perform that action at this time.
0 commit comments