Skip to content

Commit ba16a93

Browse files
committed
Add stream filter function
Signed-off-by: Mathias L. Baumann <[email protected]>
1 parent 902efe3 commit ba16a93

File tree

3 files changed

+102
-1
lines changed

3 files changed

+102
-1
lines changed

RELEASE_NOTES.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
case int() as output:
2727
print(f"Received message: {output}")
2828
```
29+
* In the `streaming` module, the new function `filter_stream_events` can be used to filter out stream events and retain the old behavior.
2930

3031
## Bug Fixes
3132

src/frequenz/client/base/streaming.py

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from collections.abc import Callable
99
from dataclasses import dataclass
1010
from datetime import timedelta
11-
from typing import AsyncIterable, Generic, TypeAlias, TypeVar
11+
from typing import AsyncIterable, Generic, Tuple, Type, TypeAlias, TypeGuard, TypeVar
1212

1313
import grpc.aio
1414

@@ -58,6 +58,35 @@ class StreamFatalError:
5858
"""Type alias for the events that can be sent over the stream."""
5959

6060

61+
FilteredOutputT = TypeVar("FilteredOutputT")
62+
"""Type alias for the output type of the stream after filtering."""
63+
64+
65+
def filter_stream_events(
66+
receiver: channels.Receiver[StreamEvent | FilteredOutputT],
67+
ignore_events: Tuple[Type[StreamEvent], ...] = (
68+
StreamStarted,
69+
StreamRetrying,
70+
StreamFatalError,
71+
),
72+
) -> channels.Receiver[FilteredOutputT]:
73+
"""Filter the stream events to only return the transformed output type.
74+
75+
Args:
76+
receiver: The receiver to filter.
77+
ignore_events: A tuple of event types to filter out.
78+
79+
Returns:
80+
A new receiver that only returns the transformed output type.
81+
"""
82+
83+
def _filter(sample: FilteredOutputT | StreamEvent) -> TypeGuard[FilteredOutputT]:
84+
"""Check if the received message is of the output type."""
85+
return not isinstance(sample, ignore_events)
86+
87+
return receiver.filter(_filter)
88+
89+
6190
class GrpcStreamBroadcaster(Generic[InputT, OutputT]):
6291
"""Helper class to handle grpc streaming methods.
6392
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
# License: MIT
2+
# Copyright © 2025 Frequenz Energy-as-a-Service GmbH
3+
4+
"""Test filtering of stream events."""
5+
6+
import logging
7+
from datetime import timedelta
8+
from typing import Tuple, Type
9+
10+
import pytest
11+
from frequenz.channels import Broadcast
12+
13+
from frequenz.client.base.streaming import (
14+
StreamEvent,
15+
StreamFatalError,
16+
StreamRetrying,
17+
StreamStarted,
18+
filter_stream_events,
19+
)
20+
21+
22+
@pytest.mark.parametrize(
23+
"filter_events",
24+
(
25+
(StreamStarted, StreamRetrying, StreamFatalError),
26+
(StreamRetrying, StreamFatalError),
27+
(StreamFatalError),
28+
(),
29+
(StreamStarted, StreamRetrying),
30+
),
31+
)
32+
async def test_filter_stream_events(
33+
filter_events: Tuple[Type[StreamEvent], ...],
34+
) -> None:
35+
"""Test filtering all events."""
36+
channel = Broadcast[int | StreamEvent](name="FilterStreamEventsTestChannel")
37+
38+
receiver = filter_stream_events(channel.new_receiver(), filter_events)
39+
sender = channel.new_sender()
40+
41+
events = (
42+
StreamStarted(),
43+
1,
44+
2,
45+
3,
46+
StreamRetrying(delay=timedelta(seconds=1)),
47+
4,
48+
5,
49+
6,
50+
StreamFatalError(exception=Exception("Test error")),
51+
)
52+
53+
num_samples = 6
54+
num_received_samples = 0
55+
56+
for event in events:
57+
logging.info("Sending event: %s", event)
58+
await sender.send(event)
59+
60+
await channel.close()
61+
62+
async for event in receiver:
63+
logging.info("Received event: %s", event)
64+
if isinstance(event, int):
65+
num_received_samples += 1
66+
else:
67+
assert not isinstance(
68+
event, filter_events
69+
), "Received unexpected event type"
70+
71+
assert num_received_samples == num_samples, "Unexpected number of samples received"

0 commit comments

Comments
 (0)