Skip to content

Commit 7db84af

Browse files
committed
Use mock_message_context
1 parent 892938e commit 7db84af

File tree

4 files changed

+14
-25
lines changed

4 files changed

+14
-25
lines changed

quixstreams/dataframe/windows/time_based.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -581,17 +581,7 @@ def _on_expired_session(
581581
timestamp_ms: int,
582582
late_by_ms: int,
583583
) -> None:
584-
try:
585-
ctx = message_context()
586-
topic = ctx.topic
587-
partition = ctx.partition
588-
offset = ctx.offset
589-
except:
590-
# In test environments, message context might not be available
591-
topic = "unknown"
592-
partition = -1
593-
offset = -1
594-
584+
ctx = message_context()
595585
to_log = True
596586

597587
# Trigger the "on_late" callback if provided
@@ -604,9 +594,9 @@ def _on_expired_session(
604594
start,
605595
end,
606596
self._name,
607-
topic,
608-
partition,
609-
offset,
597+
ctx.topic,
598+
ctx.partition,
599+
ctx.offset,
610600
)
611601
if to_log:
612602
logger.warning(
@@ -615,8 +605,8 @@ def _on_expired_session(
615605
f"session={(start, end)} "
616606
f"late_by_ms={late_by_ms} "
617607
f"store_name={self._name} "
618-
f"partition={topic}[{partition}] "
619-
f"offset={offset}"
608+
f"partition={ctx.topic}[{ctx.partition}] "
609+
f"offset={ctx.offset}"
620610
)
621611

622612

tests/test_quixstreams/test_dataframe/fixtures.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Optional
2-
from unittest.mock import MagicMock
2+
from unittest.mock import MagicMock, patch
33

44
import pytest
55

@@ -59,3 +59,9 @@ def factory(
5959
return sdf
6060

6161
return factory
62+
63+
64+
@pytest.fixture
65+
def mock_message_context():
66+
with patch("quixstreams.dataframe.windows.time_based.message_context"):
67+
yield

tests/test_quixstreams/test_dataframe/test_windows/test_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ def test_session_partition_expiration(
549549
assert key2 in expired_keys
550550

551551
def test_session_window_late_events(
552-
self, session_window_definition_factory, state_manager
552+
self, session_window_definition_factory, state_manager, mock_message_context
553553
):
554554
"""Test handling of late events that arrive after session closure"""
555555
window_def = session_window_definition_factory(

tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from dataclasses import dataclass, field
33
from itertools import chain
44
from typing import Any
5-
from unittest import mock
65

76
import pytest
87

@@ -741,12 +740,6 @@ def expected_windows_in_state(self) -> set[tuple[int, int]]:
741740
]
742741

743742

744-
@pytest.fixture
745-
def mock_message_context():
746-
with mock.patch("quixstreams.dataframe.windows.time_based.message_context"):
747-
yield
748-
749-
750743
@pytest.fixture
751744
def sliding_window_definition_factory(
752745
state_manager, dataframe_factory, topic_manager_topic_factory

0 commit comments

Comments
 (0)