Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion DIRECTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,10 @@
* [Test Non Overlapping Intervals](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/arrays/non_overlapping_intervals/test_non_overlapping_intervals.py)
* Subarrays With Fixed Bounds
* [Test Subarrays With Fixed Bounds](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/arrays/subarrays_with_fixed_bounds/test_subarrays_with_fixed_bounds.py)
* Circular Buffer
* [Circular Buffer](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/circular_buffer/circular_buffer.py)
* [Exceptions](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/circular_buffer/exceptions.py)
* [Test Circular Buffer](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/circular_buffer/test_circular_buffer.py)
* Dicts
* [Default Dicts](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/dicts/default_dicts.py)
* [Ordered Dict](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/dicts/ordered_dict.py)
Expand Down Expand Up @@ -325,6 +329,12 @@
* [Test Min Stack](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/stacks/minstack/test_min_stack.py)
* [Test Stacks](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/stacks/test_stacks.py)
* Streams
* Moving Average
* [Exponential Moving Average](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/moving_average/exponential_moving_average.py)
* [Moving Average](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/moving_average/moving_average.py)
* [Moving Average With Buffer](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/moving_average/moving_average_with_buffer.py)
* [Test Moving Average](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/moving_average/test_moving_average.py)
* [Weighted Moving Average](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/moving_average/weighted_moving_average.py)
* Stream Checker
* [Test Stream Checker](https://github.com/BrianLusina/PythonSnips/blob/master/datastructures/streams/stream_checker/test_stream_checker.py)
* Timemap
Expand Down Expand Up @@ -930,7 +940,6 @@
* Linked List
* [Test Reorder List](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/linked_list/test_reorder_list.py)
* [Test Build One 2 3](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/test_build_one_2_3.py)
* [Test Circular Buffer](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/test_circular_buffer.py)
* [Test Consecutive](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/test_consecutive.py)
* [Test Data Reverse](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/test_data_reverse.py)
* [Test Dominator](https://github.com/BrianLusina/PythonSnips/blob/master/tests/datastructures/test_dominator.py)
Expand Down
56 changes: 11 additions & 45 deletions datastructures/circular_buffer/__init__.py
Original file line number Diff line number Diff line change
@@ -1,45 +1,11 @@
class BufferFullException(Exception):
pass


class BufferEmptyException(Exception):
pass


class CircularBuffer:
def __init__(self, size):
self.buffer = bytearray(size)
self.read_point = 0
self.write_point = 0

def _update_buffer(self, data):
"""
Protected helper method for Python 2/3
"""
try:
self.buffer[self.write_point] = data
except TypeError:
self.buffer[self.write_point] = ord(data)

def clear(self):
self.buffer = bytearray(len(self.buffer))

def write(self, data):
if all(self.buffer):
raise BufferFullException
self._update_buffer(data)
self.write_point = (self.write_point + 1) % len(self.buffer)

def read(self):
if not any(self.buffer):
raise BufferEmptyException
data = chr(self.buffer[self.read_point])
self.buffer[self.read_point] = 0
self.read_point = (self.read_point + 1) % len(self.buffer)
return data

def overwrite(self, data):
self._update_buffer(data)
if all(self.buffer) and self.read_point == self.write_point:
self.read_point = (self.read_point + 1) % len(self.buffer)
self.write_point = (self.write_point + 1) % len(self.buffer)
from datastructures.circular_buffer.circular_buffer import CircularBuffer
from datastructures.circular_buffer.exceptions import (
BufferFullException,
BufferEmptyException,
)

__all__ = [
"CircularBuffer",
"BufferFullException",
"BufferEmptyException",
]
43 changes: 43 additions & 0 deletions datastructures/circular_buffer/circular_buffer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from datastructures.circular_buffer.exceptions import (
BufferFullException,
BufferEmptyException,
)


class CircularBuffer:
def __init__(self, size):
self.buffer = bytearray(size)
self.read_point = 0
self.write_point = 0

def _update_buffer(self, data):
"""
Protected helper method for Python 2/3
"""
try:
self.buffer[self.write_point] = data
except TypeError:
self.buffer[self.write_point] = ord(data)

def clear(self):
self.buffer = bytearray(len(self.buffer))

def write(self, data):
if all(self.buffer):
raise BufferFullException
self._update_buffer(data)
self.write_point = (self.write_point + 1) % len(self.buffer)

def read(self):
if not any(self.buffer):
raise BufferEmptyException
data = chr(self.buffer[self.read_point])
self.buffer[self.read_point] = 0
self.read_point = (self.read_point + 1) % len(self.buffer)
return data

def overwrite(self, data):
self._update_buffer(data)
if all(self.buffer) and self.read_point == self.write_point:
self.read_point = (self.read_point + 1) % len(self.buffer)
self.write_point = (self.write_point + 1) % len(self.buffer)
6 changes: 6 additions & 0 deletions datastructures/circular_buffer/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
class BufferFullException(Exception):
pass


class BufferEmptyException(Exception):
pass
62 changes: 62 additions & 0 deletions datastructures/streams/moving_average/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# Moving Average from Data Stream

Given a stream of integers and a window size, calculate the moving average of all integers in the sliding window.
Implement a class called MovingAverage that has the following methods:

Constructor (int size): This constructor initializes the object with the specified window size.

double next (int val): This method takes an integer value as input and returns the moving average of the last size
values from the stream.

## Constraints

- 1 <= size <= 100
- -10^3 <= val <= 10^3
- At most 10^2 calls will be made to next

## Examples

![Example 1](./images/examples/moving_average_example_1.png)

## Solution

The algorithm calculates the moving average of the most recent values within a fixed-size window. It employs a queue to
store these values and maintains a running sum for efficient computation. Each time a new value is added to the window,
it is appended to the queue, and the running sum is updated accordingly. If the queue exceeds the specified size (i.e.,
the window is full), the oldest value is removed from the queue, and the sum is adjusted by subtracting this value. The
moving average is then calculated by dividing the running sum by the number of values in the queue. This approach ensures
that the moving average is updated, achieving constant time complexity for each operation.

1. **Constructor**: The constructor initializes the following variables:
- _size_: This represents the window size, the maximum number of recent values for calculating the moving average.
- _queue_: A queue data structure stores the most recent values up to the specified window size. The queue supports
efficient operations for adding new values to the end and removing the oldest value from the front, essential for
maintaining the sliding window.
- _window_sum_: This keeps a running sum of the values currently in the window. This allows the moving average to be
calculated efficiently without the need to sum all values repeatedly.

2. The **next** method: The next method calculates the moving average by following these steps:
- Enqueue the new value (val) to queue and add it to window_sum. This effectively extends the current sliding window
to include the new value.
- If the number of elements in queue exceeds size (indicating the window is full), remove the oldest value from queue
and update window_sum by subtracting this value. This ensures the queue size stays within the specified window size.
- Compute the moving average as window_sum / len(queue). Use floating-point division to ensure the result is a float.

### Time Complexity

1. **Constructor**: Initializing the MovingAverage instance with a given size takes O(1) time. This is because the operation
involves only setting the size, initializing an empty queue, and initializing the sum.
2. **Next method**:
- Appending a value: Adding a value to the queue takes O(1) time as operations like append and popleft are optimized
for constant time execution in a queue.
- Removing the oldest value: If the window is full, removing the oldest value also takes O(1) time, as popleft is a
constant time operation in a queue.
- Calculating the moving average: This step involves dividing window_sum by the number of elements in the queue,
which is an O(1) operation.

Thus the time complexity of the next method is O(1).

### Space Complexity

The queue holds up to _size_ elements at any given time (i.e., the number of elements in the sliding window). Therefore,
the space complexity of the queue is O(size), where size is the maximum window size.
6 changes: 6 additions & 0 deletions datastructures/streams/moving_average/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from datastructures.streams.moving_average.moving_average_with_buffer import (
MovingAverageWithBuffer,
)
from datastructures.streams.moving_average.moving_average import MovingAverage

__all__ = ["MovingAverage", "MovingAverageWithBuffer"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
class ExponentialMovingAverage:
"""
The Exponential Moving Average (EMA) is widely used in finance and signal processing because it reacts faster to
recent price changes than a simple moving average.

The beauty of the EMA is its efficiency: it does not require a buffer or a window of previous values. You only need
to store the previous EMA result. This makes the time and space complexity both O(1).

Why use EMA?
1. Reduced Lag: Because it weights the most recent data most heavily, it catches trend reversals much sooner than an SMA.
2. Memory Efficiency: You don't need to store a list of numbers; you only need to store one variable (self.ema).
3. Smoothness: It creates a smooth curve that isn't as sensitive to an old "outlier" dropping out of the window (a common issue with SMA).
"""

def __init__(self, size: int):
self.size = size
self.alpha = 2 / (size + 1)
self.ema = None # Initialized with the first value received

def next(self, val: int) -> float:
if self.ema is None:
# The first value acts as the starting point
self.ema = float(val)
else:
# Apply the EMA formula
self.ema = (val * self.alpha) + (self.ema * (1 - self.alpha))

return self.ema
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
34 changes: 34 additions & 0 deletions datastructures/streams/moving_average/moving_average.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
from typing import Deque
from collections import deque


class MovingAverage:
def __init__(self, size: int):
"""
Initializes the moving average object
Args:
size (int): The size of the moving average
"""
self.queue: Deque[int] = deque()
self.size: int = size
self.window_sum: float = 0.0

def next(self, val: int) -> float:
"""
Adds a value to the stream and returns the moving average of the stream
Args:
val (int): The value to add to the stream
Returns:
float: The moving average of the stream
"""
if len(self.queue) == self.size:
# remove oldest value
oldest_value = self.queue.popleft()
self.window_sum -= oldest_value

# add new value to queue
self.queue.append(val)
self.window_sum += val

# calculate average
return self.window_sum / len(self.queue)
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
class MovingAverageWithBuffer:
"""
Using a Circular Buffer (implemented with a fixed-size list) is an excellent way to optimize memory. Instead of
dynamically resizing or shifting elements, we use a fixed array and a pointer that "wraps around" using the modulo
operator (index(modsize)).

This approach is often preferred in embedded systems or high-performance scenarios because it avoids the overhead of
frequent memory allocations.

The expression self.head = (self.head + 1) % self.size ensures that if our size is 3, the pointer sequence will be:
0 → 1 → 2 → 0 → 1...

This effectively "recycles" the array positions, making it behave like a continuous loop.
"""

def __init__(self, size: int):
# Pre-allocate a list of zeros
self.size = size
self.buffer = [0] * size
self.head = 0 # Pointer to the next position to overwrite
self.count = 0 # Track how many elements we've actually added
self.current_sum = 0.0

def next(self, val: int) -> float:
# If the buffer is full, subtract the value we are about to overwrite
if self.count == self.size:
self.current_sum -= self.buffer[self.head]
else:
self.count += 1

# Overwrite the old value at the head pointer
self.buffer[self.head] = val
self.current_sum += val

# Move the pointer to the next index, wrapping around if at the end
self.head = (self.head + 1) % self.size

return self.current_sum / self.count
40 changes: 40 additions & 0 deletions datastructures/streams/moving_average/test_moving_average.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import unittest
from typing import List, Tuple
from parameterized import parameterized
from datastructures.streams.moving_average import MovingAverage
from datastructures.streams.moving_average.moving_average_with_buffer import (
MovingAverageWithBuffer,
)


TEST_CASES = [
(3, [(1, 1.00000), (10, 5.50000), (3, 4.66667), (5, 6.00000)]),
(2, [(7, 7.00000), (14, 10.50000), (21, 17.50000), (28, 24.50000), (35, 31.50000)]),
(3, [(5, 5.00000), (10, 7.50000), (15, 10.00000), (20, 15.00000)]),
(10, [(1, 1.00000), (2, 1.50000)]),
(100, [(-100, -100.00000)]),
]


class MyTestCase(unittest.TestCase):
@parameterized.expand(TEST_CASES)
def test_moving_average(self, size: int, data_to_expected: List[Tuple[int, float]]):
moving_average = MovingAverage(size)
for data, expected in data_to_expected:
actual = moving_average.next(data)
round(actual, 5)
self.assertEqual(expected, round(actual, 5))

@parameterized.expand(TEST_CASES)
def test_moving_average_with_buffer(
self, size: int, data_to_expected: List[Tuple[int, float]]
):
moving_average = MovingAverageWithBuffer(size)
for data, expected in data_to_expected:
actual = moving_average.next(data)
round(actual, 5)
self.assertEqual(expected, round(actual, 5))


if __name__ == "__main__":
unittest.main()
Loading