Skip to content
Open
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions financial/streaming_moving_average.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""This module contains a class that
calculates the moving average in real-time.

Reference
---------
https://en.wikipedia.org/wiki/Moving_average
"""

from collections import deque


class StreamingMovingAverage:
"""Streaming Moving Average calculator that updates
the average in real-time.

Attributes
----------
window_size : int
The size of the moving average window.

Methods
-------
update(new_value: float) -> None
Update the moving average with a new data point.
mov_avg() -> float
Return the current moving average value.

Raises
------
ValueError
If the window size is less than 1.


Examples
--------
>>> mov_avg_calculator = StreamingMovingAverage(3)
>>> mov_avg_calculator.update(10.)
>>> mov_avg_calculator.update(12.)
>>> mov_avg_calculator.update(2.)
>>> mov_avg_calculator.mov_avg
8.0
>>> mov_avg_calculator = StreamingMovingAverage(5)
>>> mov_avg_calculator.update(2.)
>>> mov_avg_calculator.update(3.)
>>> mov_avg_calculator.mov_avg
2.5
"""

def __init__(self, window_size: int):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please provide return type hint for the function: __init__. If the function does not return a value, please provide the type hint as: def function() -> None:

if window_size < 1:
raise ValueError("Window size must be a positive integer")
self.window_size = window_size
self.window: deque[float] = deque(maxlen=window_size)
self.total: float = 0.0 # Placeholder for total sum.
self._mov_avg: float = 0.0 # Placeholder for online moving average.

def update(self, new_value: float) -> None:
"""Update the moving average with a new data point.

Parameters
----------
new_value : float
The new data point to update the moving average.

Raises
------
TypeError
If the type of new value is not float.
"""
if not isinstance(new_value, float):
raise TypeError("Type of new_value must be either float.")
self.window.append(new_value)
self.total += new_value
self._mov_avg = self.total / len(self.window)

@property
def mov_avg(self) -> float:
"""Return the current moving average value."""
return self._mov_avg


if __name__ == "__main__":
import doctest
import random

doctest.testmod()
moving_average_calculator = StreamingMovingAverage(window_size=3)
example_data = [float(random.randint(a=1, b=10)) for _ in range(10)]
for value in example_data:
print(f"adding {value=} to streaming data.")
moving_average_calculator.update(new_value=value)
print(f"updated moving average={moving_average_calculator.mov_avg}")