
Commit 36bead9

ruff fixes, + extra doc about lock in producer
1 parent a4c9fa1 commit 36bead9

File tree: 6 files changed, +97 -31 lines

backend/app/db/repositories/event_repository.py

Lines changed: 4 additions & 18 deletions
```diff
@@ -125,18 +125,11 @@ async def get_events_by_aggregate(
         docs = await cursor.to_list(length=limit)
         return [self.mapper.from_mongo_document(doc) for doc in docs]

-    async def get_events_by_correlation(
-        self, correlation_id: str, limit: int = 100, skip: int = 0
-    ) -> EventListResult:
+    async def get_events_by_correlation(self, correlation_id: str, limit: int = 100, skip: int = 0) -> EventListResult:
         query: dict[str, Any] = {EventFields.METADATA_CORRELATION_ID: correlation_id}
         total_count = await self._collection.count_documents(query)

-        cursor = (
-            self._collection.find(query)
-            .sort(EventFields.TIMESTAMP, ASCENDING)
-            .skip(skip)
-            .limit(limit)
-        )
+        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).skip(skip).limit(limit)
         docs = await cursor.to_list(length=limit)
         return EventListResult(
             events=[self.mapper.from_mongo_document(doc) for doc in docs],
@@ -166,18 +159,11 @@ async def get_events_by_user(
         docs = await cursor.to_list(length=limit)
         return [self.mapper.from_mongo_document(doc) for doc in docs]

-    async def get_execution_events(
-        self, execution_id: str, limit: int = 100, skip: int = 0
-    ) -> EventListResult:
+    async def get_execution_events(self, execution_id: str, limit: int = 100, skip: int = 0) -> EventListResult:
         query = {"$or": [{EventFields.PAYLOAD_EXECUTION_ID: execution_id}, {EventFields.AGGREGATE_ID: execution_id}]}
         total_count = await self._collection.count_documents(query)

-        cursor = (
-            self._collection.find(query)
-            .sort(EventFields.TIMESTAMP, ASCENDING)
-            .skip(skip)
-            .limit(limit)
-        )
+        cursor = self._collection.find(query).sort(EventFields.TIMESTAMP, ASCENDING).skip(skip).limit(limit)
         docs = await cursor.to_list(length=limit)
         return EventListResult(
             events=[self.mapper.from_mongo_document(doc) for doc in docs],
```
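
The collapsed one-liners keep the same pagination contract: `count_documents` supplies `total`, and the chained cursor returns a single page. As a sketch of how a caller might drain every page (the `repo` handle is illustrative; the method signature and the `has_more` field on `EventListResult` come from this diff):

```python
# Sketch: walk all pages of correlated events (repo handle is illustrative)
async def collect_correlated_events(repo, correlation_id: str, page_size: int = 100):
    events, skip = [], 0
    while True:
        result = await repo.get_events_by_correlation(correlation_id, limit=page_size, skip=skip)
        events.extend(result.events)
        if not result.has_more:
            return events
        skip += page_size
```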

backend/app/events/core/producer.py

Lines changed: 4 additions & 4 deletions
```diff
@@ -8,10 +8,6 @@
 from confluent_kafka import Message, Producer
 from confluent_kafka.error import KafkaError

-# Global lock to serialize Producer initialization (workaround for librdkafka race condition)
-# See: https://github.com/confluentinc/confluent-kafka-python/issues/1797
-_producer_init_lock = threading.Lock()
-
 from app.core.lifecycle import LifecycleEnabled
 from app.core.logging import logger
 from app.core.metrics.context import get_event_metrics
@@ -23,6 +19,10 @@

 from .types import ProducerConfig, ProducerMetrics, ProducerState

+# Global lock to serialize Producer initialization (workaround for librdkafka race condition)
+# See: https://github.com/confluentinc/confluent-kafka-python/issues/1797
+_producer_init_lock = threading.Lock()
+
 DeliveryCallback: TypeAlias = Callable[[KafkaError | None, Message], None]
 StatsCallback: TypeAlias = Callable[[dict[str, Any]], None]
```

backend/app/infrastructure/mappers/event_mapper.py

Lines changed: 0 additions & 3 deletions
```diff
@@ -78,7 +78,6 @@ def from_mongo_document(document: dict[str, Any]) -> Event:
         )


-
 class EventSummaryMapper:
     """Handles EventSummary serialization."""

@@ -211,5 +210,3 @@ def from_admin_pydantic(pflt: AdminEventFilter) -> EventFilter:
         search_text=pflt.search_text,
         text_search=pflt.search_text,
     )
-
-
```

backend/app/services/event_service.py

Lines changed: 2 additions & 6 deletions
```diff
@@ -37,9 +37,7 @@ async def get_execution_events(
         limit: int = 1000,
         skip: int = 0,
     ) -> EventListResult | None:
-        result = await self.repository.get_execution_events(
-            execution_id=execution_id, limit=limit, skip=skip
-        )
+        result = await self.repository.get_execution_events(execution_id=execution_id, limit=limit, skip=skip)
         if not result.events:
             return EventListResult(events=[], total=0, skip=skip, limit=limit, has_more=False)

@@ -132,9 +130,7 @@ async def get_events_by_correlation(
         limit: int = 100,
         skip: int = 0,
     ) -> EventListResult:
-        result = await self.repository.get_events_by_correlation(
-            correlation_id=correlation_id, limit=limit, skip=skip
-        )
+        result = await self.repository.get_events_by_correlation(correlation_id=correlation_id, limit=limit, skip=skip)
         if not include_all_users or user_role != UserRole.ADMIN:
             filtered = [e for e in result.events if (e.metadata and e.metadata.user_id == user_id)]
             return EventListResult(
```
docs/testing/kafka-test-stability.md (new file)

Lines changed: 86 additions & 0 deletions
# Kafka test stability

## The problem

When running tests in parallel (e.g., with `pytest-xdist`), you might encounter sporadic crashes with messages like:

```
Fatal Python error: Aborted
```

The stack trace typically points to `confluent_kafka` operations, often during producer initialization in fixtures or test setup. This isn't a bug in the application code - it's a known race condition in the underlying `librdkafka` C library.

## Why it happens

The `confluent-kafka-python` library is a thin wrapper around `librdkafka`, a high-performance C library. When multiple Python processes or threads try to create Kafka `Producer` instances simultaneously, they can trigger a race condition in `librdkafka`'s internal initialization routines.

This manifests as:

- Random `SIGABRT` signals during test runs
- Crashes in `rd_kafka_broker_destroy_final` or similar internal functions
- Flaky CI failures that pass on retry

The issue is particularly common in CI environments where tests run in parallel across multiple workers.
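
A minimal sketch of the kind of pattern that can trigger the crash; this is illustrative rather than taken from the codebase, and the broker address is an assumption:

```python
import threading

from confluent_kafka import Producer

def build_producer() -> None:
    # Concurrent Producer construction is the step that races inside librdkafka.
    p = Producer({"bootstrap.servers": "localhost:9092"})  # address is illustrative
    p.flush(0)

# Many threads constructing producers at once, as parallel test setup does.
threads = [threading.Thread(target=build_producer) for _ in range(16)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```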
## The fix

The solution is to serialize `Producer` initialization using a global threading lock. This prevents multiple threads from entering `librdkafka`'s initialization code simultaneously.

In `app/events/core/producer.py`:

```python
import threading

# Global lock to serialize Producer initialization (workaround for librdkafka race condition)
# See: https://github.com/confluentinc/confluent-kafka-python/issues/1797
_producer_init_lock = threading.Lock()

class UnifiedProducer:
    async def start(self) -> None:
        # ... config setup ...

        # Serialize Producer initialization to prevent librdkafka race condition
        with _producer_init_lock:
            self._producer = Producer(producer_config)

        # ... rest of startup ...
```

The lock is process-global, so all `UnifiedProducer` instances in the same process will serialize their initialization. This adds negligible overhead in production (producers are typically created once at startup) while eliminating the race condition in tests.
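
To see the cost model in isolation: only the construction step holds the lock, so concurrent starts queue briefly and then run in parallel. A self-contained toy sketch (the `time.sleep` stands in for the `Producer(...)` call; nothing here is from the codebase):

```python
import threading
import time

_init_lock = threading.Lock()

def start_one(name: str) -> None:
    with _init_lock:
        time.sleep(0.01)  # stand-in for constructing the underlying Producer
    # Once the lock is released, each producer operates fully concurrently.
    print(f"{name} started")

threads = [threading.Thread(target=start_one, args=(f"producer-{i}",)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```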
## Related issues

These GitHub issues document the underlying problem:

| Issue | Description |
|-------|-------------|
| [confluent-kafka-python#1797](https://github.com/confluentinc/confluent-kafka-python/issues/1797) | Segfaults in multithreaded/asyncio pytest environments |
| [confluent-kafka-python#1761](https://github.com/confluentinc/confluent-kafka-python/issues/1761) | Segfault on garbage collection in multithreaded context |
| [librdkafka#3608](https://github.com/confluentinc/librdkafka/issues/3608) | Crash in `rd_kafka_broker_destroy_final` |

## Alternative approaches

If you still encounter issues:

1. **Reduce parallelism** - Run Kafka-dependent tests with fewer workers: `pytest -n 2` instead of `-n auto`

2. **Isolate Kafka tests** - Mark Kafka tests and run them separately (the marker also needs registering; see the sketch after this list):

   ```python
   @pytest.mark.kafka
   def test_producer_sends_message():
       ...
   ```

   ```bash
   pytest -m "not kafka" -n auto  # parallel
   pytest -m kafka -n 1           # sequential
   ```

3. **Use fixtures carefully** - Ensure producer fixtures are properly scoped and cleaned up:

   ```python
   @pytest.fixture(scope="function")  # assumes pytest-asyncio's auto mode for async fixtures
   async def producer():
       p = UnifiedProducer(config, schema_registry)
       await p.start()
       yield p
       await p.stop()  # Always clean up
   ```
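
For the marker in approach 2, pytest warns on unknown markers unless they are registered. A minimal sketch, assuming registration lives in `conftest.py` (an equivalent `markers` entry in `pytest.ini` or `pyproject.toml` also works):

```python
# conftest.py: register the custom "kafka" marker so pytest doesn't warn on it
def pytest_configure(config):
    config.addinivalue_line("markers", "kafka: tests that require a Kafka broker")
```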

mkdocs.yml

Lines changed: 1 addition & 0 deletions
```diff
@@ -139,3 +139,4 @@ nav:
   - Testing:
       - Load Testing: testing/load-testing.md
       - Frontend Testing: testing/frontend-testing.md
+      - Kafka Test Stability: testing/kafka-test-stability.md
```
