Skip to content

Commit 4e191e2

Browse files
committed
Add timeseries.Source and Sink types
All tools provided by this package should be steps in a pipeline that takes Source as input and produces values in a Sink. Signed-off-by: Leandro Lucarella <[email protected]>
1 parent a9ef990 commit 4e191e2

File tree

2 files changed

+33
-4
lines changed

2 files changed

+33
-4
lines changed

src/frequenz/sdk/timeseries/__init__.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,22 @@
55
Handling of timeseries streams.
66
77
A timeseries is a stream (normally an async iterator) of
8-
[samples][frequenz.sdk.timeseries.Sample].
8+
[`Sample`][frequenz.sdk.timeseries.Sample]s.
99
10-
This module provides tools to operate on timeseries.
10+
Timeseries can be consumed from
11+
[`Source`][frequenz.sdk.timeseries.Source]s and new timeseries can be
12+
generated by sending samples to
13+
a [`Sink`][frequenz.sdk.timeseries.Sink]
1114
"""
1215

13-
from ._base_types import Sample
16+
from ._base_types import Sample, Sink, Source
1417
from ._resampler import GroupResampler, Resampler, ResamplingFunction
1518

1619
__all__ = [
1720
"GroupResampler",
1821
"Resampler",
1922
"ResamplingFunction",
2023
"Sample",
24+
"Sink",
25+
"Source",
2126
]

src/frequenz/sdk/timeseries/_base_types.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55

66
from dataclasses import dataclass
77
from datetime import datetime
8-
from typing import Optional
8+
from typing import AsyncIterator, Callable, Coroutine, Optional
99

1010

1111
@dataclass(frozen=True)
@@ -19,3 +19,27 @@ class Sample:
1919

2020
timestamp: datetime
2121
value: Optional[float] = None
22+
23+
24+
Source = AsyncIterator[Sample]
25+
"""A source for a timeseries.
26+
27+
A timeseries can be received sample by sample in a streaming way
28+
using a source.
29+
"""
30+
31+
Sink = Callable[[Sample], Coroutine[None, None, None]]
32+
"""A sink for a timeseries.
33+
34+
A new timeseries can be generated by sending samples to a sink.
35+
36+
This should be an `async` callable, for example:
37+
38+
``` python
39+
async some_sink(Sample) -> None:
40+
...
41+
```
42+
43+
Args:
44+
sample (Sample): A sample to be sent out.
45+
"""

0 commit comments

Comments
 (0)