Skip to content

Commit 77917da

Browse files
committed
Use mock_message_context
1 parent ac18eff commit 77917da

File tree

4 files changed

+14
-25
lines changed

4 files changed

+14
-25
lines changed

quixstreams/dataframe/windows/time_based.py

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -600,17 +600,7 @@ def _on_expired_session(
600600
timestamp_ms: int,
601601
late_by_ms: int,
602602
) -> None:
603-
try:
604-
ctx = message_context()
605-
topic = ctx.topic
606-
partition = ctx.partition
607-
offset = ctx.offset
608-
except:
609-
# In test environments, message context might not be available
610-
topic = "unknown"
611-
partition = -1
612-
offset = -1
613-
603+
ctx = message_context()
614604
to_log = True
615605

616606
# Trigger the "on_late" callback if provided
@@ -623,9 +613,9 @@ def _on_expired_session(
623613
start,
624614
end,
625615
self._name,
626-
topic,
627-
partition,
628-
offset,
616+
ctx.topic,
617+
ctx.partition,
618+
ctx.offset,
629619
)
630620
if to_log:
631621
logger.warning(
@@ -634,8 +624,8 @@ def _on_expired_session(
634624
f"session={(start, end)} "
635625
f"late_by_ms={late_by_ms} "
636626
f"store_name={self._name} "
637-
f"partition={topic}[{partition}] "
638-
f"offset={offset}"
627+
f"partition={ctx.topic}[{ctx.partition}] "
628+
f"offset={ctx.offset}"
639629
)
640630

641631

tests/test_quixstreams/test_dataframe/fixtures.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
from typing import Optional
2-
from unittest.mock import MagicMock
2+
from unittest.mock import MagicMock, patch
33

44
import pytest
55

@@ -57,3 +57,9 @@ def factory(
5757
return sdf
5858

5959
return factory
60+
61+
62+
@pytest.fixture
63+
def mock_message_context():
64+
with patch("quixstreams.dataframe.windows.time_based.message_context"):
65+
yield

tests/test_quixstreams/test_dataframe/test_windows/test_session.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,7 +549,7 @@ def test_session_partition_expiration(
549549
assert key2 in expired_keys
550550

551551
def test_session_window_late_events(
552-
self, session_window_definition_factory, state_manager
552+
self, session_window_definition_factory, state_manager, mock_message_context
553553
):
554554
"""Test handling of late events that arrive after session closure"""
555555
window_def = session_window_definition_factory(

tests/test_quixstreams/test_dataframe/test_windows/test_sliding.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from dataclasses import dataclass, field
33
from itertools import chain
44
from typing import Any
5-
from unittest import mock
65

76
import pytest
87

@@ -741,12 +740,6 @@ def expected_windows_in_state(self) -> set[tuple[int, int]]:
741740
]
742741

743742

744-
@pytest.fixture
745-
def mock_message_context():
746-
with mock.patch("quixstreams.dataframe.windows.time_based.message_context"):
747-
yield
748-
749-
750743
@pytest.fixture
751744
def sliding_window_definition_factory(
752745
state_manager, dataframe_factory, topic_manager_topic_factory

0 commit comments

Comments
 (0)