From 628d4cf92f91bc41c02cc58cc12ed60c6cc33412 Mon Sep 17 00:00:00 2001 From: Marioman2023 Date: Sun, 10 Aug 2025 12:11:57 +1200 Subject: [PATCH 1/6] Implements a Von Neumann cellular automaton as well as two new entropy coding data compression methods. --- cellular_automata/Von_Neumann_CA.py | 94 +++++++++++++++++++++++++ data_compression/Arithmetic_Coding.py | 69 +++++++++++++++++++ data_compression/CABAC.py | 98 +++++++++++++++++++++++++++ 3 files changed, 261 insertions(+) create mode 100644 cellular_automata/Von_Neumann_CA.py create mode 100644 data_compression/Arithmetic_Coding.py create mode 100644 data_compression/CABAC.py diff --git a/cellular_automata/Von_Neumann_CA.py b/cellular_automata/Von_Neumann_CA.py new file mode 100644 index 000000000000..cc4e93e10cc4 --- /dev/null +++ b/cellular_automata/Von_Neumann_CA.py @@ -0,0 +1,94 @@ +""" +Von Neumann CA with multi-state fading "heatmap" effect in the terminal. + +Requirements: numpy + +Rules: + - Uses Von Neumann neighborhood (4 neighbors). + - Alive cells follow BIRTH / SURVIVE rules. + - Dead cells fade out gradually through colored stages. 
+""" + +import numpy as np +import os +import time + +# ---------- Configuration ---------- +GRID_SIZE = (20, 40) # (rows, cols) +PROB_ALIVE = 0.25 # initial alive probability +WRAP = True # wrap edges (toroidal) +BIRTH = {3} # birth neighbor counts +SURVIVE = {1, 2} # survival neighbor counts +SLEEP_TIME = 0.1 # seconds between frames +MAX_AGE = 5 # how many steps before a cell fully disappears (0 = dead) +COLORS = { # Characters & colors for different ages + 0: " ", # dead + 1: "\033[92m█\033[0m", # bright green (newborn) + 2: "\033[93m█\033[0m", # yellow + 3: "\033[91m█\033[0m", # red + 4: "\033[31m░\033[0m", # dim red fading + 5: "\033[90m·\033[0m", # grey dust +} +# ----------------------------------- + +def make_initial_grid(shape, prob_alive, seed=None): + rng = np.random.default_rng(seed) + alive = (rng.random(shape) < prob_alive).astype(np.uint8) + return alive.astype(np.uint8) # age 1 for alive, 0 for dead + +def count_von_neumann_neighbors(alive_mask, wrap=True): + """Count Von Neumann neighbors (4 directions)""" + up = np.roll(alive_mask, -1, axis=0) + down = np.roll(alive_mask, 1, axis=0) + left = np.roll(alive_mask, -1, axis=1) + right = np.roll(alive_mask, 1, axis=1) + counts = up + down + left + right + + if not wrap: + counts[ 0, :] -= alive_mask[-1, :] + counts[-1, :] -= alive_mask[ 0, :] + counts[ :, 0] -= alive_mask[ :, -1] + counts[ :, -1] -= alive_mask[ :, 0] + counts = np.clip(counts, 0, 4) + + return counts + +def step(age_grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE): + alive_mask = age_grid > 0 + neighbor_counts = count_von_neumann_neighbors(alive_mask.astype(np.uint8), wrap=wrap) + + born_mask = (~alive_mask) & np.isin(neighbor_counts, list(birth)) + survive_mask = alive_mask & np.isin(neighbor_counts, list(survive)) + + new_age_grid = age_grid.copy() + + # Alive cells that survive get age reset to 1 if born, else age increment + new_age_grid[born_mask] = 1 + new_age_grid[survive_mask & alive_mask] = 1 # reset alive age 
for fresh color + + # Fade out dead cells + fade_mask = (~born_mask) & (~survive_mask) + new_age_grid[fade_mask & (new_age_grid > 0)] += 1 + new_age_grid[new_age_grid > max_age] = 0 # fully dead + + return new_age_grid + +def display(age_grid): + """Render grid with colors for each age stage""" + os.system("cls" if os.name == "nt" else "clear") + for row in age_grid: + print("".join(COLORS.get(age, COLORS[MAX_AGE]) for age in row)) + +def main(): + grid = make_initial_grid(GRID_SIZE, prob_alive=PROB_ALIVE) + + try: + while True: + display(grid) + grid = step(grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE) + time.sleep(SLEEP_TIME) + except KeyboardInterrupt: + print("\nStopped.") + +if __name__ == "__main__": + main() diff --git a/data_compression/Arithmetic_Coding.py b/data_compression/Arithmetic_Coding.py new file mode 100644 index 000000000000..5c3dece757c0 --- /dev/null +++ b/data_compression/Arithmetic_Coding.py @@ -0,0 +1,69 @@ +from collections_extended import bag +from decimal import Decimal, getcontext + +# Set high precision for decimal calculations +getcontext().prec = 50 + +def build_probability_table(data): + """Returns a dictionary int the form (symbol: probability)""" + freq = bag(data) # A bag is like a multiset + return {char: Decimal(freq.count(char)) / Decimal(len(data)) for char in set(freq)} + +def arithmetic_encode(data, prob_table): + """Preforms arithmetic coding compression""" + symbols = sorted(prob_table.keys()) + cumulative = {} + cumulative_sum = Decimal('0.0') + for sym in symbols: + cumulative[sym] = cumulative_sum + cumulative_sum += prob_table[sym] + + low, high = Decimal('0.0'), Decimal('1.0') + for symbol in data: + range_ = high - low + high = low + range_ * (cumulative[symbol] + prob_table[symbol]) + low = low + range_ * cumulative[symbol] + + return (low + high) / 2, len(data) + +def arithmetic_decode(encoded_value, length, prob_table): + """Decodes an arithmetic-coded value""" + symbols = 
sorted(prob_table.keys()) + cumulative = {} + cumulative_sum = Decimal('0.0') + for sym in symbols: + cumulative[sym] = cumulative_sum + cumulative_sum += prob_table[sym] + + result = [] + low, high = Decimal('0.0'), Decimal('1.0') + value = Decimal(str(encoded_value)) + + for _ in range(length): + range_ = high - low + for sym in symbols: + sym_low = low + range_ * cumulative[sym] + sym_high = sym_low + range_ * prob_table[sym] + if sym_low <= value < sym_high: + result.append(sym) + low, high = sym_low, sym_high + break + + return ''.join(result) + +if __name__ == "__main__": + text = "this is text used for testing" + print(f"Original: {text}") + + prob_table = build_probability_table(text) + encoded_value, length = arithmetic_encode(text, prob_table) + print(f"Encoded value: {encoded_value}") + + decoded_text = arithmetic_decode(encoded_value, length, prob_table) + print(f"Decoded: {decoded_text}") + + # Show compression ratio + import sys + original_size = sys.getsizeof(text) + encoded_size = sys.getsizeof(str(encoded_value)) + print(f"Compression ratio: {original_size / encoded_size:.2f}") diff --git a/data_compression/CABAC.py b/data_compression/CABAC.py new file mode 100644 index 000000000000..ab92bfaa7097 --- /dev/null +++ b/data_compression/CABAC.py @@ -0,0 +1,98 @@ +class CABAC: + def __init__(self): + self.low = 0 + self.high = (1 << 32) - 1 + self.context = [0.5] * 256 # probability model for 256 contexts + + def _update_context(self, ctx, bit): + # Simple adaptation: move probability toward observed bit + alpha = 0.05 + self.context[ctx] = (1 - alpha) * self.context[ctx] + alpha * bit + + def encode_bit(self, bit, ctx, output): + prob = self.context[ctx] + range_ = self.high - self.low + 1 + split = self.low + int(range_ * prob) + + if bit == 0: + self.high = split + else: + self.low = split + 1 + + while (self.high ^ self.low) < (1 << 24): + output.append((self.high >> 24) & 0xFF) + self.low = (self.low << 8) & 0xFFFFFFFF + self.high = ((self.high << 
8) & 0xFFFFFFFF) | 0xFF + + self._update_context(ctx, bit) + + def finish_encoding(self, output): + for _ in range(4): + output.append((self.low >> 24) & 0xFF) + self.low = (self.low << 8) & 0xFFFFFFFF + + def decode_bit(self, ctx, input_bits): + prob = self.context[ctx] + range_ = self.high - self.low + 1 + split = self.low + int(range_ * prob) + + if self.code <= split: + self.high = split + bit = 0 + else: + self.low = split + 1 + bit = 1 + + while (self.high ^ self.low) < (1 << 24): + self.code = ((self.code << 8) & 0xFFFFFFFF) | next(input_bits) + self.low = (self.low << 8) & 0xFFFFFFFF + self.high = ((self.high << 8) & 0xFFFFFFFF) | 0xFF + + self._update_context(ctx, bit) + return bit + + def start_decoding(self, encoded_bytes): + self.low = 0 + self.high = (1 << 32) - 1 + self.code = 0 + input_bits = iter(encoded_bytes) + for _ in range(4): + self.code = (self.code << 8) | next(input_bits) + return input_bits + + +def string_to_bits(s): + return [(byte >> i) & 1 for byte in s.encode('utf-8') for i in range(7, -1, -1)] + +def bits_to_string(bits): + b = bytearray() + for i in range(0, len(bits), 8): + byte = 0 + for bit in bits[i:i+8]: + byte = (byte << 1) | bit + b.append(byte) + return b.decode('utf-8') + +def main(text: str): + encoder = CABAC() + output_bytes = [] + bits = string_to_bits(text) + + for i, bit in enumerate(bits): + ctx = i % 256 # simple positional context + encoder.encode_bit(bit, ctx, output_bytes) + encoder.finish_encoding(output_bytes) + + # Decode + decoder = CABAC() + bitstream = decoder.start_decoding(iter(output_bytes)) + decoded_bits = [decoder.decode_bit(i % 256, bitstream) for i in range(len(bits))] + decoded_text = bits_to_string(decoded_bits) + + print("Original:", text) + print("Decoded :", decoded_text) + print("Compressed size (bytes):", len(output_bytes)) + +if __name__ == "__main__": + # Example usage + main("Hello CABAC!") From 1a1a462fe4cc624d9505368b3a9e32d28d349330 Mon Sep 17 00:00:00 2001 From: Marioman2023 Date: Sun, 
10 Aug 2025 13:04:27 +1200 Subject: [PATCH 2/6] Improvements --- cellular_automata/Von_Neumann_CA.py | 94 ---- cellular_automata/von_neumann.py | 302 ++++++++++++ data_compression/Arithmetic_Coding.py | 344 ++++++++++--- data_compression/CABAC.py | 98 ---- ...ontext_adaptive_binary_arithmetic_coder.py | 460 ++++++++++++++++++ 5 files changed, 1050 insertions(+), 248 deletions(-) delete mode 100644 cellular_automata/Von_Neumann_CA.py create mode 100644 cellular_automata/von_neumann.py delete mode 100644 data_compression/CABAC.py create mode 100644 data_compression/context_adaptive_binary_arithmetic_coder.py diff --git a/cellular_automata/Von_Neumann_CA.py b/cellular_automata/Von_Neumann_CA.py deleted file mode 100644 index cc4e93e10cc4..000000000000 --- a/cellular_automata/Von_Neumann_CA.py +++ /dev/null @@ -1,94 +0,0 @@ -""" -Von Neumann CA with multi-state fading "heatmap" effect in the terminal. - -Requirements: numpy - -Rules: - - Uses Von Neumann neighborhood (4 neighbors). - - Alive cells follow BIRTH / SURVIVE rules. - - Dead cells fade out gradually through colored stages. 
-""" - -import numpy as np -import os -import time - -# ---------- Configuration ---------- -GRID_SIZE = (20, 40) # (rows, cols) -PROB_ALIVE = 0.25 # initial alive probability -WRAP = True # wrap edges (toroidal) -BIRTH = {3} # birth neighbor counts -SURVIVE = {1, 2} # survival neighbor counts -SLEEP_TIME = 0.1 # seconds between frames -MAX_AGE = 5 # how many steps before a cell fully disappears (0 = dead) -COLORS = { # Characters & colors for different ages - 0: " ", # dead - 1: "\033[92m█\033[0m", # bright green (newborn) - 2: "\033[93m█\033[0m", # yellow - 3: "\033[91m█\033[0m", # red - 4: "\033[31m░\033[0m", # dim red fading - 5: "\033[90m·\033[0m", # grey dust -} -# ----------------------------------- - -def make_initial_grid(shape, prob_alive, seed=None): - rng = np.random.default_rng(seed) - alive = (rng.random(shape) < prob_alive).astype(np.uint8) - return alive.astype(np.uint8) # age 1 for alive, 0 for dead - -def count_von_neumann_neighbors(alive_mask, wrap=True): - """Count Von Neumann neighbors (4 directions)""" - up = np.roll(alive_mask, -1, axis=0) - down = np.roll(alive_mask, 1, axis=0) - left = np.roll(alive_mask, -1, axis=1) - right = np.roll(alive_mask, 1, axis=1) - counts = up + down + left + right - - if not wrap: - counts[ 0, :] -= alive_mask[-1, :] - counts[-1, :] -= alive_mask[ 0, :] - counts[ :, 0] -= alive_mask[ :, -1] - counts[ :, -1] -= alive_mask[ :, 0] - counts = np.clip(counts, 0, 4) - - return counts - -def step(age_grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE): - alive_mask = age_grid > 0 - neighbor_counts = count_von_neumann_neighbors(alive_mask.astype(np.uint8), wrap=wrap) - - born_mask = (~alive_mask) & np.isin(neighbor_counts, list(birth)) - survive_mask = alive_mask & np.isin(neighbor_counts, list(survive)) - - new_age_grid = age_grid.copy() - - # Alive cells that survive get age reset to 1 if born, else age increment - new_age_grid[born_mask] = 1 - new_age_grid[survive_mask & alive_mask] = 1 # reset alive age 
for fresh color - - # Fade out dead cells - fade_mask = (~born_mask) & (~survive_mask) - new_age_grid[fade_mask & (new_age_grid > 0)] += 1 - new_age_grid[new_age_grid > max_age] = 0 # fully dead - - return new_age_grid - -def display(age_grid): - """Render grid with colors for each age stage""" - os.system("cls" if os.name == "nt" else "clear") - for row in age_grid: - print("".join(COLORS.get(age, COLORS[MAX_AGE]) for age in row)) - -def main(): - grid = make_initial_grid(GRID_SIZE, prob_alive=PROB_ALIVE) - - try: - while True: - display(grid) - grid = step(grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE) - time.sleep(SLEEP_TIME) - except KeyboardInterrupt: - print("\nStopped.") - -if __name__ == "__main__": - main() diff --git a/cellular_automata/von_neumann.py b/cellular_automata/von_neumann.py new file mode 100644 index 000000000000..585273274600 --- /dev/null +++ b/cellular_automata/von_neumann.py @@ -0,0 +1,302 @@ +""" +Von Neumann cellular automaton with multi-state fading "heatmap" effect. + +This implementation demonstrates a Von Neumann cellular automaton where cells +follow custom birth/survive rules and dead cells fade gradually through multiple +visual states, creating a heatmap-like effect. + +Based on Von Neumann cellular automata architecture: +https://en.wikipedia.org/wiki/Von_Neumann_cellular_automaton + +Von Neumann neighborhood reference: +https://en.wikipedia.org/wiki/Von_Neumann_neighborhood + +Requirements: numpy +""" + +import numpy as np +from typing import Set, Tuple, Dict, Optional + + +def create_random_grid( + rows: int, columns: int, alive_probability: float, seed: Optional[int] = None +) -> np.ndarray: + """ + Create initial grid with randomly distributed alive cells. 
+ + Args: + rows: Number of grid rows + columns: Number of grid columns + alive_probability: Probability (0.0-1.0) of each cell being initially alive + seed: Random seed for reproducibility + + Returns: + 2D numpy array where 1 represents alive cells, 0 represents dead cells + + Raises: + ValueError: If alive_probability is not between 0 and 1 + ValueError: If rows or columns are not positive integers + + Examples: + >>> grid = create_random_grid(3, 3, 0.5, seed=42) + >>> grid.shape + (3, 3) + >>> bool(np.all((grid == 0) | (grid == 1))) + True + >>> grid.dtype + dtype('uint8') + + >>> create_random_grid(0, 3, 0.5) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Rows and columns must be positive integers + + >>> create_random_grid(3, 3, 1.5) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: alive_probability must be between 0.0 and 1.0 + """ + if rows <= 0 or columns <= 0: + raise ValueError("Rows and columns must be positive integers") + if not 0.0 <= alive_probability <= 1.0: + raise ValueError("alive_probability must be between 0.0 and 1.0") + + rng = np.random.default_rng(seed) + alive_cells = (rng.random((rows, columns)) < alive_probability).astype(np.uint8) + return alive_cells + + +def count_von_neumann_neighbors( + alive_mask: np.ndarray, use_wraparound: bool = True +) -> np.ndarray: + """ + Count Von Neumann neighbors for each cell (4-directional neighborhood). + + The Von Neumann neighborhood consists of the four orthogonally adjacent cells + (up, down, left, right) but excludes diagonal neighbors. 
+ + Args: + alive_mask: Binary 2D array where 1 represents alive cells + use_wraparound: If True, edges wrap around (toroidal topology) + + Returns: + 2D array with neighbor counts (0-4) for each cell + + Raises: + ValueError: If alive_mask is not 2D or contains invalid values + + Examples: + >>> mask = np.array([[1, 0, 1], [0, 1, 0], [1, 0, 1]], dtype=np.uint8) + >>> counts = count_von_neumann_neighbors(mask, use_wraparound=False) + >>> int(counts[1, 1]) # center cell has 0 neighbors (all adjacent are 0) + 0 + >>> int(counts[0, 1]) # top middle has 3 neighbors (down, left, right are 1) + 3 + + >>> mask_simple = np.array([[1, 1], [1, 0]], dtype=np.uint8) + >>> counts_simple = count_von_neumann_neighbors(mask_simple, use_wraparound=False) + >>> int(counts_simple[0, 0]) # top-left has 2 neighbors (right and down) + 2 + + >>> invalid_mask = np.array([1, 2, 3]) # doctest: +IGNORE_EXCEPTION_DETAIL + >>> count_von_neumann_neighbors(invalid_mask) + Traceback (most recent call last): + ValueError: alive_mask must be a 2D array + """ + if alive_mask.ndim != 2: + raise ValueError("alive_mask must be a 2D array") + if not np.all((alive_mask == 0) | (alive_mask == 1)): + raise ValueError("alive_mask must contain only 0s and 1s") + + rows, cols = alive_mask.shape + neighbor_counts = np.zeros((rows, cols), dtype=np.uint8) + + if use_wraparound: + # Use rolling for wraparound + up_neighbors = np.roll(alive_mask, -1, axis=0) + down_neighbors = np.roll(alive_mask, 1, axis=0) + left_neighbors = np.roll(alive_mask, -1, axis=1) + right_neighbors = np.roll(alive_mask, 1, axis=1) + neighbor_counts = up_neighbors + down_neighbors + left_neighbors + right_neighbors + else: + # Manually count neighbors without wraparound + for r in range(rows): + for c in range(cols): + count = 0 + # Check up + if r > 0 and alive_mask[r-1, c]: + count += 1 + # Check down + if r < rows-1 and alive_mask[r+1, c]: + count += 1 + # Check left + if c > 0 and alive_mask[r, c-1]: + count += 1 + # Check right + if 
c < cols-1 and alive_mask[r, c+1]: + count += 1 + neighbor_counts[r, c] = count + + return neighbor_counts + + +def apply_cellular_automaton_rules( + current_ages: np.ndarray, + birth_neighbor_counts: Set[int], + survival_neighbor_counts: Set[int], + maximum_age: int = 5, + use_wraparound: bool = True, +) -> np.ndarray: + """ + Apply cellular automaton rules to advance the grid by one generation. + + Cells are born when they have a neighbor count in birth_neighbor_counts. + Living cells survive when they have a neighbor count in survival_neighbor_counts. + Dead cells age and eventually disappear completely. + + Args: + current_ages: 2D array where values represent cell ages (0 = dead, >0 = alive) + birth_neighbor_counts: Set of neighbor counts that cause cell birth + survival_neighbor_counts: Set of neighbor counts that allow cell survival + maximum_age: Maximum age before cell disappears completely + use_wraparound: Whether to use wraparound boundaries + + Returns: + New 2D array with updated cell ages after applying rules + + Raises: + ValueError: If inputs are invalid + + Examples: + >>> ages = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=np.uint8) + >>> new_ages = apply_cellular_automaton_rules( + ... ages, birth_neighbor_counts={2}, + ... survival_neighbor_counts={2, 3}, use_wraparound=False + ... ) + >>> bool(new_ages[0, 0] > 0) # corner should be born (2 neighbors: right and down) + True + + >>> # Test aging of dead cells + >>> dead_aging = np.array([[2, 0, 0]], dtype=np.uint8) # age 2, no survival + >>> result = apply_cellular_automaton_rules( + ... dead_aging, birth_neighbor_counts=set(), + ... survival_neighbor_counts=set(), maximum_age=3 + ... 
) + >>> bool(result[0, 0] == 3) # should age from 2 to 3 + True + + >>> apply_cellular_automaton_rules(np.array([1, 2]), {1}, {1}) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: current_ages must be a 2D array + """ + if current_ages.ndim != 2: + raise ValueError("current_ages must be a 2D array") + if maximum_age < 1: + raise ValueError("maximum_age must be at least 1") + + alive_cells_mask = current_ages > 0 + neighbor_counts = count_von_neumann_neighbors( + alive_cells_mask.astype(np.uint8), use_wraparound + ) + + # Determine which cells are born or survive + birth_mask = (~alive_cells_mask) & np.isin(neighbor_counts, list(birth_neighbor_counts)) + survival_mask = alive_cells_mask & np.isin(neighbor_counts, list(survival_neighbor_counts)) + + new_ages = current_ages.copy() + + # Set ages for newly born cells + new_ages[birth_mask] = 1 + + # Reset age for surviving cells (keeps them visually fresh) + new_ages[survival_mask] = 1 + + # Age cells that neither survive nor get born + fade_mask = (~birth_mask) & (~survival_mask) + new_ages[fade_mask & (new_ages > 0)] += 1 + + # Remove cells that have exceeded maximum age + new_ages[new_ages > maximum_age] = 0 + + return new_ages + + +def simulate_von_neumann_cellular_automaton( + grid_rows: int = 20, + grid_columns: int = 40, + initial_alive_probability: float = 0.25, + birth_rules: Set[int] = None, + survival_rules: Set[int] = None, + maximum_cell_age: int = 5, + generations: int = 100, + random_seed: Optional[int] = None, + use_wraparound_edges: bool = True, +) -> list[np.ndarray]: + """ + Run a complete Von Neumann cellular automaton simulation. + + This function creates an initial random grid and evolves it through multiple + generations according to the specified birth and survival rules. 
+ + Args: + grid_rows: Number of rows in the grid + grid_columns: Number of columns in the grid + initial_alive_probability: Initial probability of cells being alive + birth_rules: Set of neighbor counts that cause birth (default: {3}) + survival_rules: Set of neighbor counts that allow survival (default: {1, 2}) + maximum_cell_age: Maximum age before cells disappear (default: 5) + generations: Number of generations to simulate + random_seed: Seed for random number generation + use_wraparound_edges: Whether to use toroidal topology + + Returns: + List of 2D numpy arrays representing each generation + + Raises: + ValueError: If parameters are invalid + + Examples: + >>> result = simulate_von_neumann_cellular_automaton( + ... grid_rows=5, grid_columns=5, generations=3, random_seed=42 + ... ) + >>> len(result) == 3 + True + >>> all(grid.shape == (5, 5) for grid in result) + True + + >>> simulate_von_neumann_cellular_automaton(generations=0) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: generations must be positive + """ + if birth_rules is None: + birth_rules = {3} + if survival_rules is None: + survival_rules = {1, 2} + + if generations <= 0: + raise ValueError("generations must be positive") + if grid_rows <= 0 or grid_columns <= 0: + raise ValueError("grid dimensions must be positive") + + # Initialize grid + current_grid = create_random_grid( + grid_rows, grid_columns, initial_alive_probability, random_seed + ) + + generation_history = [] + + # Run simulation for specified number of generations + for _ in range(generations): + generation_history.append(current_grid.copy()) + current_grid = apply_cellular_automaton_rules( + current_grid, + birth_rules, + survival_rules, + maximum_cell_age, + use_wraparound_edges, + ) + + return generation_history + + +if __name__ == "__main__": + import doctest + doctest.testmod(verbose=True) diff --git a/data_compression/Arithmetic_Coding.py b/data_compression/Arithmetic_Coding.py index 
5c3dece757c0..490ebf86259c 100644 --- a/data_compression/Arithmetic_Coding.py +++ b/data_compression/Arithmetic_Coding.py @@ -1,69 +1,301 @@ -from collections_extended import bag +""" +Arithmetic coding compression algorithm implementation. + +Arithmetic coding is a form of entropy encoding used in lossless data compression. +It encodes the entire message into a single number, representing a fraction between 0 and 1. + +Algorithm reference: +https://en.wikipedia.org/wiki/Arithmetic_coding + +Data compression techniques: +https://en.wikipedia.org/wiki/Data_compression + +Requirements: None (uses only Python standard library) +""" + +from collections import Counter from decimal import Decimal, getcontext +from typing import Dict, Tuple, List, Union # Set high precision for decimal calculations getcontext().prec = 50 -def build_probability_table(data): - """Returns a dictionary int the form (symbol: probability)""" - freq = bag(data) # A bag is like a multiset - return {char: Decimal(freq.count(char)) / Decimal(len(data)) for char in set(freq)} -def arithmetic_encode(data, prob_table): - """Preforms arithmetic coding compression""" - symbols = sorted(prob_table.keys()) - cumulative = {} - cumulative_sum = Decimal('0.0') - for sym in symbols: - cumulative[sym] = cumulative_sum - cumulative_sum += prob_table[sym] - - low, high = Decimal('0.0'), Decimal('1.0') - for symbol in data: - range_ = high - low - high = low + range_ * (cumulative[symbol] + prob_table[symbol]) - low = low + range_ * cumulative[symbol] - - return (low + high) / 2, len(data) - -def arithmetic_decode(encoded_value, length, prob_table): - """Decodes an arithmetic-coded value""" - symbols = sorted(prob_table.keys()) - cumulative = {} +def calculate_symbol_probabilities(input_data: Union[str, List]) -> Dict[str, Decimal]: + """ + Calculate probability distribution for symbols in the input data. 
+ + Args: + input_data: Input string or list to analyze for symbol frequencies + + Returns: + Dictionary mapping each symbol to its probability as a Decimal + + Raises: + ValueError: If input_data is empty + TypeError: If input_data is not string or list + + Examples: + >>> probs = calculate_symbol_probabilities("aab") + >>> round(float(probs['a']), 10) + 0.6666666667 + >>> round(float(probs['b']), 10) + 0.3333333333 + >>> len(probs) + 2 + + >>> calculate_symbol_probabilities("") # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Input data cannot be empty + + >>> calculate_symbol_probabilities(123) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + TypeError: Input data must be string or list + """ + if not input_data: + raise ValueError("Input data cannot be empty") + if not isinstance(input_data, (str, list)): + raise TypeError("Input data must be string or list") + + symbol_frequencies = Counter(input_data) + total_symbols = len(input_data) + + probability_table = {} + for symbol, frequency in symbol_frequencies.items(): + probability_table[symbol] = Decimal(frequency) / Decimal(total_symbols) + + return probability_table + + +def create_cumulative_distribution(probability_table: Dict[str, Decimal]) -> Dict[str, Decimal]: + """ + Create cumulative distribution from probability table. 
+ + Args: + probability_table: Dictionary mapping symbols to their probabilities + + Returns: + Dictionary mapping symbols to their cumulative probability positions + + Raises: + ValueError: If probability_table is empty + + Examples: + >>> probs = {'a': Decimal('0.6'), 'b': Decimal('0.4')} + >>> cumulative = create_cumulative_distribution(probs) + >>> float(cumulative['a']) + 0.0 + >>> float(cumulative['b']) + 0.6 + + >>> create_cumulative_distribution({}) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Probability table cannot be empty + """ + if not probability_table: + raise ValueError("Probability table cannot be empty") + + sorted_symbols = sorted(probability_table.keys()) + cumulative_distribution = {} cumulative_sum = Decimal('0.0') - for sym in symbols: - cumulative[sym] = cumulative_sum - cumulative_sum += prob_table[sym] - - result = [] - low, high = Decimal('0.0'), Decimal('1.0') - value = Decimal(str(encoded_value)) - - for _ in range(length): - range_ = high - low - for sym in symbols: - sym_low = low + range_ * cumulative[sym] - sym_high = sym_low + range_ * prob_table[sym] - if sym_low <= value < sym_high: - result.append(sym) - low, high = sym_low, sym_high + + for symbol in sorted_symbols: + cumulative_distribution[symbol] = cumulative_sum + cumulative_sum += probability_table[symbol] + + return cumulative_distribution + + +def encode_arithmetic_sequence(input_data: Union[str, List], + probability_table: Dict[str, Decimal]) -> Tuple[Decimal, int]: + """ + Encode input data using arithmetic coding algorithm. + + The algorithm works by maintaining an interval [low, high) that gets + progressively narrowed based on the probability of each symbol. 
+ + Args: + input_data: Data to encode (string or list of symbols) + probability_table: Symbol probabilities as returned by calculate_symbol_probabilities + + Returns: + Tuple of (encoded_value, original_length) where encoded_value is the + arithmetic representation and original_length is needed for decoding + + Raises: + ValueError: If inputs are invalid + KeyError: If input contains symbols not in probability table + + Examples: + >>> probs = calculate_symbol_probabilities("aab") + >>> encoded_val, length = encode_arithmetic_sequence("aab", probs) + >>> length + 3 + >>> isinstance(encoded_val, Decimal) + True + + >>> encode_arithmetic_sequence("xyz", {'a': Decimal('1.0')}) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + KeyError: Symbol 'x' not found in probability table + """ + if not input_data: + raise ValueError("Input data cannot be empty") + if not probability_table: + raise ValueError("Probability table cannot be empty") + + cumulative_distribution = create_cumulative_distribution(probability_table) + + # Initialize interval bounds + low_bound = Decimal('0.0') + high_bound = Decimal('1.0') + + # Process each symbol in the input + for symbol in input_data: + if symbol not in probability_table: + raise KeyError(f"Symbol '{symbol}' not found in probability table") + + # Calculate current interval range + current_range = high_bound - low_bound + + # Update interval bounds based on symbol's probability range + symbol_cumulative_prob = cumulative_distribution[symbol] + symbol_probability = probability_table[symbol] + + new_high = low_bound + current_range * (symbol_cumulative_prob + symbol_probability) + new_low = low_bound + current_range * symbol_cumulative_prob + + low_bound = new_low + high_bound = new_high + + # Return midpoint of final interval and original length + encoded_value = (low_bound + high_bound) / 2 + return encoded_value, len(input_data) + + +def decode_arithmetic_sequence(encoded_value: Union[Decimal, float, str], + 
original_length: int, + probability_table: Dict[str, Decimal]) -> str: + """ + Decode an arithmetic-coded value back to original data. + + Args: + encoded_value: The encoded arithmetic value + original_length: Length of the original data sequence + probability_table: Symbol probabilities used during encoding + + Returns: + Decoded string matching the original input data + + Raises: + ValueError: If inputs are invalid + TypeError: If encoded_value cannot be converted to Decimal + + Examples: + >>> probs = calculate_symbol_probabilities("aab") + >>> encoded_val, length = encode_arithmetic_sequence("aab", probs) + >>> decoded = decode_arithmetic_sequence(encoded_val, length, probs) + >>> decoded + 'aab' + + >>> decode_arithmetic_sequence("invalid", 3, {}) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Probability table cannot be empty + """ + if original_length <= 0: + raise ValueError("Original length must be positive") + if not probability_table: + raise ValueError("Probability table cannot be empty") + + try: + value = Decimal(str(encoded_value)) + except (TypeError, ValueError) as e: + raise TypeError(f"Cannot convert encoded_value to Decimal: {e}") + + cumulative_distribution = create_cumulative_distribution(probability_table) + sorted_symbols = sorted(probability_table.keys()) + + decoded_sequence = [] + low_bound = Decimal('0.0') + high_bound = Decimal('1.0') + + # Decode each symbol position + for _ in range(original_length): + current_range = high_bound - low_bound + + # Find which symbol's interval contains the current value + for symbol in sorted_symbols: + symbol_low = low_bound + current_range * cumulative_distribution[symbol] + symbol_high = symbol_low + current_range * probability_table[symbol] + + if symbol_low <= value < symbol_high: + decoded_sequence.append(symbol) + # Update bounds to the symbol's interval + low_bound = symbol_low + high_bound = symbol_high break + + return ''.join(decoded_sequence) - return 
''.join(result) -if __name__ == "__main__": - text = "this is text used for testing" - print(f"Original: {text}") +def compress_with_arithmetic_coding(input_text: str) -> Tuple[Decimal, int, Dict[str, Decimal]]: + """ + Complete arithmetic coding compression pipeline. + + Args: + input_text: Text string to compress - prob_table = build_probability_table(text) - encoded_value, length = arithmetic_encode(text, prob_table) - print(f"Encoded value: {encoded_value}") + Returns: + Tuple of (compressed_value, original_length, probability_table) + All three components are needed for decompression - decoded_text = arithmetic_decode(encoded_value, length, prob_table) - print(f"Decoded: {decoded_text}") + Raises: + ValueError: If input_text is empty + + Examples: + >>> compressed_val, length, probs = compress_with_arithmetic_coding("hello") + >>> length + 5 + >>> len(probs) # Number of unique characters + 4 + >>> isinstance(compressed_val, Decimal) + True + + >>> compress_with_arithmetic_coding("") # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Input text cannot be empty + """ + if not input_text: + raise ValueError("Input text cannot be empty") - # Show compression ratio - import sys - original_size = sys.getsizeof(text) - encoded_size = sys.getsizeof(str(encoded_value)) - print(f"Compression ratio: {original_size / encoded_size:.2f}") + probability_table = calculate_symbol_probabilities(input_text) + compressed_value, original_length = encode_arithmetic_sequence(input_text, probability_table) + + return compressed_value, original_length, probability_table + + +def decompress_arithmetic_coding(compressed_value: Decimal, + original_length: int, + probability_table: Dict[str, Decimal]) -> str: + """ + Complete arithmetic coding decompression pipeline. 
+ + Args: + compressed_value: The arithmetic-coded value + original_length: Length of original uncompressed data + probability_table: Symbol probabilities from compression + + Returns: + Decompressed text string + + Examples: + >>> compressed_val, length, probs = compress_with_arithmetic_coding("test") + >>> decompressed = decompress_arithmetic_coding(compressed_val, length, probs) + >>> decompressed + 'test' + """ + return decode_arithmetic_sequence(compressed_value, original_length, probability_table) + + +if __name__ == "__main__": + import doctest + doctest.testmod(verbose=True) diff --git a/data_compression/CABAC.py b/data_compression/CABAC.py deleted file mode 100644 index ab92bfaa7097..000000000000 --- a/data_compression/CABAC.py +++ /dev/null @@ -1,98 +0,0 @@ -class CABAC: - def __init__(self): - self.low = 0 - self.high = (1 << 32) - 1 - self.context = [0.5] * 256 # probability model for 256 contexts - - def _update_context(self, ctx, bit): - # Simple adaptation: move probability toward observed bit - alpha = 0.05 - self.context[ctx] = (1 - alpha) * self.context[ctx] + alpha * bit - - def encode_bit(self, bit, ctx, output): - prob = self.context[ctx] - range_ = self.high - self.low + 1 - split = self.low + int(range_ * prob) - - if bit == 0: - self.high = split - else: - self.low = split + 1 - - while (self.high ^ self.low) < (1 << 24): - output.append((self.high >> 24) & 0xFF) - self.low = (self.low << 8) & 0xFFFFFFFF - self.high = ((self.high << 8) & 0xFFFFFFFF) | 0xFF - - self._update_context(ctx, bit) - - def finish_encoding(self, output): - for _ in range(4): - output.append((self.low >> 24) & 0xFF) - self.low = (self.low << 8) & 0xFFFFFFFF - - def decode_bit(self, ctx, input_bits): - prob = self.context[ctx] - range_ = self.high - self.low + 1 - split = self.low + int(range_ * prob) - - if self.code <= split: - self.high = split - bit = 0 - else: - self.low = split + 1 - bit = 1 - - while (self.high ^ self.low) < (1 << 24): - self.code = ((self.code 
<< 8) & 0xFFFFFFFF) | next(input_bits) - self.low = (self.low << 8) & 0xFFFFFFFF - self.high = ((self.high << 8) & 0xFFFFFFFF) | 0xFF - - self._update_context(ctx, bit) - return bit - - def start_decoding(self, encoded_bytes): - self.low = 0 - self.high = (1 << 32) - 1 - self.code = 0 - input_bits = iter(encoded_bytes) - for _ in range(4): - self.code = (self.code << 8) | next(input_bits) - return input_bits - - -def string_to_bits(s): - return [(byte >> i) & 1 for byte in s.encode('utf-8') for i in range(7, -1, -1)] - -def bits_to_string(bits): - b = bytearray() - for i in range(0, len(bits), 8): - byte = 0 - for bit in bits[i:i+8]: - byte = (byte << 1) | bit - b.append(byte) - return b.decode('utf-8') - -def main(text: str): - encoder = CABAC() - output_bytes = [] - bits = string_to_bits(text) - - for i, bit in enumerate(bits): - ctx = i % 256 # simple positional context - encoder.encode_bit(bit, ctx, output_bytes) - encoder.finish_encoding(output_bytes) - - # Decode - decoder = CABAC() - bitstream = decoder.start_decoding(iter(output_bytes)) - decoded_bits = [decoder.decode_bit(i % 256, bitstream) for i in range(len(bits))] - decoded_text = bits_to_string(decoded_bits) - - print("Original:", text) - print("Decoded :", decoded_text) - print("Compressed size (bytes):", len(output_bytes)) - -if __name__ == "__main__": - # Example usage - main("Hello CABAC!") diff --git a/data_compression/context_adaptive_binary_arithmetic_coder.py b/data_compression/context_adaptive_binary_arithmetic_coder.py new file mode 100644 index 000000000000..2feb45c8d07e --- /dev/null +++ b/data_compression/context_adaptive_binary_arithmetic_coder.py @@ -0,0 +1,460 @@ +""" +Context-Adaptive Binary Arithmetic Coding (CABAC) implementation. + +CABAC is an entropy encoding method used in video compression standards like H.264/AVC +and H.265/HEVC. It combines arithmetic coding with adaptive context modeling to achieve +high compression efficiency. 
+ +Algorithm references: +https://en.wikipedia.org/wiki/Context-adaptive_binary_arithmetic_coding +https://en.wikipedia.org/wiki/Arithmetic_coding + +Video compression standards: +https://en.wikipedia.org/wiki/Advanced_Video_Coding + +Requirements: None (uses only Python standard library) +""" + +from typing import List, Iterator, Tuple +import sys + + +class ContextAdaptiveBinaryArithmeticCoder: + """ + Context-Adaptive Binary Arithmetic Coder (CABAC) implementation. + + This class implements both encoding and decoding functionality for CABAC, + which uses adaptive probability models based on context to achieve efficient + binary arithmetic coding. + """ + + def __init__(self, num_contexts: int = 256): + """ + Initialize CABAC coder with default state. + + Args: + num_contexts: Number of context models to maintain + + Raises: + ValueError: If num_contexts is not positive + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder() + >>> coder.num_contexts + 256 + >>> len(coder.context_probabilities) + 256 + + >>> ContextAdaptiveBinaryArithmeticCoder(0) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Number of contexts must be positive + """ + if num_contexts <= 0: + raise ValueError("Number of contexts must be positive") + + self.num_contexts = num_contexts + self.reset_coder_state() + + def reset_coder_state(self) -> None: + """ + Reset the coder to initial state. + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder() + >>> coder.low_bound = 100 + >>> coder.reset_coder_state() + >>> coder.low_bound + 0 + """ + self.low_bound = 0 + self.high_bound = (1 << 32) - 1 + self.context_probabilities = [0.5] * self.num_contexts + self.code_value = 0 + + def update_context_probability(self, context_index: int, observed_bit: int, + learning_rate: float = 0.05) -> None: + """ + Update context probability based on observed bit value. + + Uses exponential moving average to adapt probability toward observed data. 
+ + Args: + context_index: Index of context to update + observed_bit: The bit value that was observed (0 or 1) + learning_rate: Adaptation speed (0 < learning_rate < 1) + + Raises: + ValueError: If parameters are out of valid ranges + IndexError: If context_index is invalid + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) + >>> coder.context_probabilities[0] + 0.5 + >>> coder.update_context_probability(0, 1) + >>> coder.context_probabilities[0] > 0.5 + True + + >>> coder.update_context_probability(-1, 1) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + IndexError: Context index out of range + + >>> coder.update_context_probability(0, 2) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Observed bit must be 0 or 1 + """ + if not 0 <= context_index < self.num_contexts: + raise IndexError("Context index out of range") + if observed_bit not in (0, 1): + raise ValueError("Observed bit must be 0 or 1") + if not 0 < learning_rate < 1: + raise ValueError("Learning rate must be between 0 and 1") + + current_prob = self.context_probabilities[context_index] + self.context_probabilities[context_index] = ( + (1 - learning_rate) * current_prob + learning_rate * observed_bit + ) + + def encode_binary_symbol(self, bit_value: int, context_index: int, + output_buffer: List[int]) -> None: + """ + Encode a single binary symbol using the specified context. 
+ + Args: + bit_value: Binary value to encode (0 or 1) + context_index: Context index for probability model + output_buffer: List to append output bytes to + + Raises: + ValueError: If bit_value is not 0 or 1 + IndexError: If context_index is invalid + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) + >>> output = [] + >>> coder.encode_binary_symbol(1, 0, output) + >>> isinstance(output, list) + True + + >>> coder.encode_binary_symbol(2, 0, output) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Bit value must be 0 or 1 + """ + if bit_value not in (0, 1): + raise ValueError("Bit value must be 0 or 1") + if not 0 <= context_index < self.num_contexts: + raise IndexError("Context index out of range") + + probability_zero = self.context_probabilities[context_index] + current_range = self.high_bound - self.low_bound + 1 + split_point = self.low_bound + int(current_range * probability_zero) + + if bit_value == 0: + self.high_bound = split_point + else: + self.low_bound = split_point + 1 + + # Renormalization: output bytes when range becomes too small + while (self.high_bound ^ self.low_bound) < (1 << 24): + output_buffer.append((self.high_bound >> 24) & 0xFF) + self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF + self.high_bound = ((self.high_bound << 8) & 0xFFFFFFFF) | 0xFF + + self.update_context_probability(context_index, bit_value) + + def finalize_encoding(self, output_buffer: List[int]) -> None: + """ + Finalize encoding by flushing remaining bits. 
+ + Args: + output_buffer: List to append final output bytes to + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder() + >>> output = [] + >>> coder.finalize_encoding(output) + >>> len(output) + 4 + """ + # Output remaining bits in low_bound + for _ in range(4): + output_buffer.append((self.low_bound >> 24) & 0xFF) + self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF + + def initialize_decoding(self, encoded_bytes: Iterator[int]) -> Iterator[int]: + """ + Initialize decoder state from encoded byte stream. + + Args: + encoded_bytes: Iterator over encoded bytes + + Returns: + Iterator over remaining encoded bytes + + Raises: + StopIteration: If encoded_bytes has fewer than 4 bytes + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder() + >>> data = iter([1, 2, 3, 4, 5, 6]) + >>> remaining = coder.initialize_decoding(data) + >>> list(remaining) + [5, 6] + """ + self.reset_coder_state() + + # Initialize code value from first 4 bytes + for _ in range(4): + try: + next_byte = next(encoded_bytes) + self.code_value = (self.code_value << 8) | next_byte + except StopIteration: + raise StopIteration("Not enough bytes to initialize decoder") + + return encoded_bytes + + def decode_binary_symbol(self, context_index: int, + input_stream: Iterator[int]) -> int: + """ + Decode a single binary symbol using the specified context. 
+ + Args: + context_index: Context index for probability model + input_stream: Iterator over input bytes + + Returns: + Decoded binary value (0 or 1) + + Raises: + IndexError: If context_index is invalid + StopIteration: If input_stream is exhausted during renormalization + + Examples: + >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) + >>> # This is a complex test requiring full encode/decode cycle + >>> output = [] + >>> coder.encode_binary_symbol(1, 0, output) + >>> coder.finalize_encoding(output) + >>> coder.reset_coder_state() + >>> input_iter = coder.initialize_decoding(iter(output)) + >>> decoded = coder.decode_binary_symbol(0, input_iter) + >>> decoded in (0, 1) + True + """ + if not 0 <= context_index < self.num_contexts: + raise IndexError("Context index out of range") + + probability_zero = self.context_probabilities[context_index] + current_range = self.high_bound - self.low_bound + 1 + split_point = self.low_bound + int(current_range * probability_zero) + + if self.code_value <= split_point: + self.high_bound = split_point + decoded_bit = 0 + else: + self.low_bound = split_point + 1 + decoded_bit = 1 + + # Renormalization: read new bytes when range becomes too small + while (self.high_bound ^ self.low_bound) < (1 << 24): + try: + next_byte = next(input_stream) + self.code_value = ((self.code_value << 8) & 0xFFFFFFFF) | next_byte + except StopIteration: + # Handle end of stream gracefully + self.code_value = (self.code_value << 8) & 0xFFFFFFFF + + self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF + self.high_bound = ((self.high_bound << 8) & 0xFFFFFFFF) | 0xFF + + self.update_context_probability(context_index, decoded_bit) + return decoded_bit + + +def convert_string_to_bit_sequence(input_string: str) -> List[int]: + """ + Convert string to sequence of bits using UTF-8 encoding. 
+ + Args: + input_string: String to convert + + Returns: + List of bits (0s and 1s) representing the string + + Raises: + UnicodeEncodeError: If string cannot be UTF-8 encoded + + Examples: + >>> bits = convert_string_to_bit_sequence("A") + >>> len(bits) + 8 + >>> all(bit in (0, 1) for bit in bits) + True + + >>> convert_string_to_bit_sequence("") + [] + """ + if not input_string: + return [] + + bit_sequence = [] + utf8_bytes = input_string.encode('utf-8') + + for byte_value in utf8_bytes: + # Convert each byte to 8 bits (MSB first) + for bit_position in range(7, -1, -1): + bit_sequence.append((byte_value >> bit_position) & 1) + + return bit_sequence + + +def convert_bit_sequence_to_string(bit_sequence: List[int]) -> str: + """ + Convert sequence of bits back to string using UTF-8 decoding. + + Args: + bit_sequence: List of bits (0s and 1s) + + Returns: + Decoded UTF-8 string + + Raises: + ValueError: If bit_sequence length is not multiple of 8 + UnicodeDecodeError: If resulting bytes are not valid UTF-8 + + Examples: + >>> bits = convert_string_to_bit_sequence("Hello") + >>> reconstructed = convert_bit_sequence_to_string(bits) + >>> reconstructed + 'Hello' + + >>> convert_bit_sequence_to_string([1, 0, 1]) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Bit sequence length must be multiple of 8 + """ + if len(bit_sequence) % 8 != 0: + raise ValueError("Bit sequence length must be multiple of 8") + + if not bit_sequence: + return "" + + byte_array = bytearray() + + # Convert every 8 bits to a byte + for byte_start in range(0, len(bit_sequence), 8): + byte_value = 0 + byte_bits = bit_sequence[byte_start:byte_start + 8] + + for bit in byte_bits: + if bit not in (0, 1): + raise ValueError(f"Invalid bit value: {bit}") + byte_value = (byte_value << 1) | bit + + byte_array.append(byte_value) + + return byte_array.decode('utf-8') + + +def compress_string_with_cabac(input_text: str, num_contexts: int = 256) -> Tuple[List[int], int]: + 
""" + Compress a string using CABAC algorithm. + + Args: + input_text: Text string to compress + num_contexts: Number of context models to use + + Returns: + Tuple of (compressed_bytes, original_bit_length) + + Raises: + ValueError: If num_contexts is not positive + + Examples: + >>> compressed, orig_len = compress_string_with_cabac("test") + >>> len(compressed) > 0 + True + >>> orig_len > 0 + True + >>> isinstance(compressed, list) + True + + >>> compress_string_with_cabac("", 0) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ValueError: Number of contexts must be positive + """ + if num_contexts <= 0: + raise ValueError("Number of contexts must be positive") + + if not input_text: + return [], 0 + + # Convert string to bit sequence + bit_sequence = convert_string_to_bit_sequence(input_text) + + # Initialize encoder + encoder = ContextAdaptiveBinaryArithmeticCoder(num_contexts) + compressed_bytes = [] + + # Encode each bit using positional context + for bit_position, bit_value in enumerate(bit_sequence): + context_index = bit_position % num_contexts + encoder.encode_binary_symbol(bit_value, context_index, compressed_bytes) + + # Finalize encoding + encoder.finalize_encoding(compressed_bytes) + + return compressed_bytes, len(bit_sequence) + + +def decompress_string_with_cabac(compressed_bytes: List[int], original_bit_length: int, + num_contexts: int = 256) -> str: + """ + Decompress a CABAC-compressed byte sequence back to original string. 
+ + Args: + compressed_bytes: List of compressed bytes + original_bit_length: Length of original bit sequence + num_contexts: Number of context models used during compression + + Returns: + Decompressed string + + Raises: + ValueError: If parameters are invalid + + Examples: + >>> compressed, orig_len = compress_string_with_cabac("hello") + >>> decompressed = decompress_string_with_cabac(compressed, orig_len) + >>> decompressed + 'hello' + + >>> decompress_string_with_cabac([], 8) + Traceback (most recent call last): + StopIteration: Not enough bytes to initialize decoder + """ + if num_contexts <= 0: + raise ValueError("Number of contexts must be positive") + if original_bit_length < 0: + raise ValueError("Original bit length must be non-negative") + + if original_bit_length == 0: + return "" + + # Initialize decoder + decoder = ContextAdaptiveBinaryArithmeticCoder(num_contexts) + input_stream = decoder.initialize_decoding(iter(compressed_bytes)) + + # Decode bits using same context pattern as encoding + decoded_bits = [] + for bit_position in range(original_bit_length): + context_index = bit_position % num_contexts + decoded_bit = decoder.decode_binary_symbol(context_index, input_stream) + decoded_bits.append(decoded_bit) + + # Convert bits back to string + return convert_bit_sequence_to_string(decoded_bits) + + +if __name__ == "__main__": + import doctest + doctest.testmod(verbose=True) From cd90f94b4bba0d1472aeb62c5f9a60a7c90d8cfc Mon Sep 17 00:00:00 2001 From: Marioman2023 Date: Sun, 10 Aug 2025 13:14:06 +1200 Subject: [PATCH 3/6] cabac is a super-set of ac so I removed it. I removed cabac because I can only push 1 file at a time. 
--- data_compression/Arithmetic_Coding.py | 301 ------------ ...ontext_adaptive_binary_arithmetic_coder.py | 460 ------------------ 2 files changed, 761 deletions(-) delete mode 100644 data_compression/Arithmetic_Coding.py delete mode 100644 data_compression/context_adaptive_binary_arithmetic_coder.py diff --git a/data_compression/Arithmetic_Coding.py b/data_compression/Arithmetic_Coding.py deleted file mode 100644 index 490ebf86259c..000000000000 --- a/data_compression/Arithmetic_Coding.py +++ /dev/null @@ -1,301 +0,0 @@ -""" -Arithmetic coding compression algorithm implementation. - -Arithmetic coding is a form of entropy encoding used in lossless data compression. -It encodes the entire message into a single number, representing a fraction between 0 and 1. - -Algorithm reference: -https://en.wikipedia.org/wiki/Arithmetic_coding - -Data compression techniques: -https://en.wikipedia.org/wiki/Data_compression - -Requirements: None (uses only Python standard library) -""" - -from collections import Counter -from decimal import Decimal, getcontext -from typing import Dict, Tuple, List, Union - -# Set high precision for decimal calculations -getcontext().prec = 50 - - -def calculate_symbol_probabilities(input_data: Union[str, List]) -> Dict[str, Decimal]: - """ - Calculate probability distribution for symbols in the input data. 
- - Args: - input_data: Input string or list to analyze for symbol frequencies - - Returns: - Dictionary mapping each symbol to its probability as a Decimal - - Raises: - ValueError: If input_data is empty - TypeError: If input_data is not string or list - - Examples: - >>> probs = calculate_symbol_probabilities("aab") - >>> round(float(probs['a']), 10) - 0.6666666667 - >>> round(float(probs['b']), 10) - 0.3333333333 - >>> len(probs) - 2 - - >>> calculate_symbol_probabilities("") # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Input data cannot be empty - - >>> calculate_symbol_probabilities(123) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - TypeError: Input data must be string or list - """ - if not input_data: - raise ValueError("Input data cannot be empty") - if not isinstance(input_data, (str, list)): - raise TypeError("Input data must be string or list") - - symbol_frequencies = Counter(input_data) - total_symbols = len(input_data) - - probability_table = {} - for symbol, frequency in symbol_frequencies.items(): - probability_table[symbol] = Decimal(frequency) / Decimal(total_symbols) - - return probability_table - - -def create_cumulative_distribution(probability_table: Dict[str, Decimal]) -> Dict[str, Decimal]: - """ - Create cumulative distribution from probability table. 
- - Args: - probability_table: Dictionary mapping symbols to their probabilities - - Returns: - Dictionary mapping symbols to their cumulative probability positions - - Raises: - ValueError: If probability_table is empty - - Examples: - >>> probs = {'a': Decimal('0.6'), 'b': Decimal('0.4')} - >>> cumulative = create_cumulative_distribution(probs) - >>> float(cumulative['a']) - 0.0 - >>> float(cumulative['b']) - 0.6 - - >>> create_cumulative_distribution({}) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Probability table cannot be empty - """ - if not probability_table: - raise ValueError("Probability table cannot be empty") - - sorted_symbols = sorted(probability_table.keys()) - cumulative_distribution = {} - cumulative_sum = Decimal('0.0') - - for symbol in sorted_symbols: - cumulative_distribution[symbol] = cumulative_sum - cumulative_sum += probability_table[symbol] - - return cumulative_distribution - - -def encode_arithmetic_sequence(input_data: Union[str, List], - probability_table: Dict[str, Decimal]) -> Tuple[Decimal, int]: - """ - Encode input data using arithmetic coding algorithm. - - The algorithm works by maintaining an interval [low, high) that gets - progressively narrowed based on the probability of each symbol. 
- - Args: - input_data: Data to encode (string or list of symbols) - probability_table: Symbol probabilities as returned by calculate_symbol_probabilities - - Returns: - Tuple of (encoded_value, original_length) where encoded_value is the - arithmetic representation and original_length is needed for decoding - - Raises: - ValueError: If inputs are invalid - KeyError: If input contains symbols not in probability table - - Examples: - >>> probs = calculate_symbol_probabilities("aab") - >>> encoded_val, length = encode_arithmetic_sequence("aab", probs) - >>> length - 3 - >>> isinstance(encoded_val, Decimal) - True - - >>> encode_arithmetic_sequence("xyz", {'a': Decimal('1.0')}) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - KeyError: Symbol 'x' not found in probability table - """ - if not input_data: - raise ValueError("Input data cannot be empty") - if not probability_table: - raise ValueError("Probability table cannot be empty") - - cumulative_distribution = create_cumulative_distribution(probability_table) - - # Initialize interval bounds - low_bound = Decimal('0.0') - high_bound = Decimal('1.0') - - # Process each symbol in the input - for symbol in input_data: - if symbol not in probability_table: - raise KeyError(f"Symbol '{symbol}' not found in probability table") - - # Calculate current interval range - current_range = high_bound - low_bound - - # Update interval bounds based on symbol's probability range - symbol_cumulative_prob = cumulative_distribution[symbol] - symbol_probability = probability_table[symbol] - - new_high = low_bound + current_range * (symbol_cumulative_prob + symbol_probability) - new_low = low_bound + current_range * symbol_cumulative_prob - - low_bound = new_low - high_bound = new_high - - # Return midpoint of final interval and original length - encoded_value = (low_bound + high_bound) / 2 - return encoded_value, len(input_data) - - -def decode_arithmetic_sequence(encoded_value: Union[Decimal, float, str], - 
original_length: int, - probability_table: Dict[str, Decimal]) -> str: - """ - Decode an arithmetic-coded value back to original data. - - Args: - encoded_value: The encoded arithmetic value - original_length: Length of the original data sequence - probability_table: Symbol probabilities used during encoding - - Returns: - Decoded string matching the original input data - - Raises: - ValueError: If inputs are invalid - TypeError: If encoded_value cannot be converted to Decimal - - Examples: - >>> probs = calculate_symbol_probabilities("aab") - >>> encoded_val, length = encode_arithmetic_sequence("aab", probs) - >>> decoded = decode_arithmetic_sequence(encoded_val, length, probs) - >>> decoded - 'aab' - - >>> decode_arithmetic_sequence("invalid", 3, {}) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Probability table cannot be empty - """ - if original_length <= 0: - raise ValueError("Original length must be positive") - if not probability_table: - raise ValueError("Probability table cannot be empty") - - try: - value = Decimal(str(encoded_value)) - except (TypeError, ValueError) as e: - raise TypeError(f"Cannot convert encoded_value to Decimal: {e}") - - cumulative_distribution = create_cumulative_distribution(probability_table) - sorted_symbols = sorted(probability_table.keys()) - - decoded_sequence = [] - low_bound = Decimal('0.0') - high_bound = Decimal('1.0') - - # Decode each symbol position - for _ in range(original_length): - current_range = high_bound - low_bound - - # Find which symbol's interval contains the current value - for symbol in sorted_symbols: - symbol_low = low_bound + current_range * cumulative_distribution[symbol] - symbol_high = symbol_low + current_range * probability_table[symbol] - - if symbol_low <= value < symbol_high: - decoded_sequence.append(symbol) - # Update bounds to the symbol's interval - low_bound = symbol_low - high_bound = symbol_high - break - - return ''.join(decoded_sequence) - - 
-def compress_with_arithmetic_coding(input_text: str) -> Tuple[Decimal, int, Dict[str, Decimal]]: - """ - Complete arithmetic coding compression pipeline. - - Args: - input_text: Text string to compress - - Returns: - Tuple of (compressed_value, original_length, probability_table) - All three components are needed for decompression - - Raises: - ValueError: If input_text is empty - - Examples: - >>> compressed_val, length, probs = compress_with_arithmetic_coding("hello") - >>> length - 5 - >>> len(probs) # Number of unique characters - 4 - >>> isinstance(compressed_val, Decimal) - True - - >>> compress_with_arithmetic_coding("") # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Input text cannot be empty - """ - if not input_text: - raise ValueError("Input text cannot be empty") - - probability_table = calculate_symbol_probabilities(input_text) - compressed_value, original_length = encode_arithmetic_sequence(input_text, probability_table) - - return compressed_value, original_length, probability_table - - -def decompress_arithmetic_coding(compressed_value: Decimal, - original_length: int, - probability_table: Dict[str, Decimal]) -> str: - """ - Complete arithmetic coding decompression pipeline. 
- - Args: - compressed_value: The arithmetic-coded value - original_length: Length of original uncompressed data - probability_table: Symbol probabilities from compression - - Returns: - Decompressed text string - - Examples: - >>> compressed_val, length, probs = compress_with_arithmetic_coding("test") - >>> decompressed = decompress_arithmetic_coding(compressed_val, length, probs) - >>> decompressed - 'test' - """ - return decode_arithmetic_sequence(compressed_value, original_length, probability_table) - - -if __name__ == "__main__": - import doctest - doctest.testmod(verbose=True) diff --git a/data_compression/context_adaptive_binary_arithmetic_coder.py b/data_compression/context_adaptive_binary_arithmetic_coder.py deleted file mode 100644 index 2feb45c8d07e..000000000000 --- a/data_compression/context_adaptive_binary_arithmetic_coder.py +++ /dev/null @@ -1,460 +0,0 @@ -""" -Context-Adaptive Binary Arithmetic Coding (CABAC) implementation. - -CABAC is an entropy encoding method used in video compression standards like H.264/AVC -and H.265/HEVC. It combines arithmetic coding with adaptive context modeling to achieve -high compression efficiency. - -Algorithm references: -https://en.wikipedia.org/wiki/Context-adaptive_binary_arithmetic_coding -https://en.wikipedia.org/wiki/Arithmetic_coding - -Video compression standards: -https://en.wikipedia.org/wiki/Advanced_Video_Coding - -Requirements: None (uses only Python standard library) -""" - -from typing import List, Iterator, Tuple -import sys - - -class ContextAdaptiveBinaryArithmeticCoder: - """ - Context-Adaptive Binary Arithmetic Coder (CABAC) implementation. - - This class implements both encoding and decoding functionality for CABAC, - which uses adaptive probability models based on context to achieve efficient - binary arithmetic coding. - """ - - def __init__(self, num_contexts: int = 256): - """ - Initialize CABAC coder with default state. 
- - Args: - num_contexts: Number of context models to maintain - - Raises: - ValueError: If num_contexts is not positive - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder() - >>> coder.num_contexts - 256 - >>> len(coder.context_probabilities) - 256 - - >>> ContextAdaptiveBinaryArithmeticCoder(0) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Number of contexts must be positive - """ - if num_contexts <= 0: - raise ValueError("Number of contexts must be positive") - - self.num_contexts = num_contexts - self.reset_coder_state() - - def reset_coder_state(self) -> None: - """ - Reset the coder to initial state. - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder() - >>> coder.low_bound = 100 - >>> coder.reset_coder_state() - >>> coder.low_bound - 0 - """ - self.low_bound = 0 - self.high_bound = (1 << 32) - 1 - self.context_probabilities = [0.5] * self.num_contexts - self.code_value = 0 - - def update_context_probability(self, context_index: int, observed_bit: int, - learning_rate: float = 0.05) -> None: - """ - Update context probability based on observed bit value. - - Uses exponential moving average to adapt probability toward observed data. 
- - Args: - context_index: Index of context to update - observed_bit: The bit value that was observed (0 or 1) - learning_rate: Adaptation speed (0 < learning_rate < 1) - - Raises: - ValueError: If parameters are out of valid ranges - IndexError: If context_index is invalid - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) - >>> coder.context_probabilities[0] - 0.5 - >>> coder.update_context_probability(0, 1) - >>> coder.context_probabilities[0] > 0.5 - True - - >>> coder.update_context_probability(-1, 1) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - IndexError: Context index out of range - - >>> coder.update_context_probability(0, 2) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Observed bit must be 0 or 1 - """ - if not 0 <= context_index < self.num_contexts: - raise IndexError("Context index out of range") - if observed_bit not in (0, 1): - raise ValueError("Observed bit must be 0 or 1") - if not 0 < learning_rate < 1: - raise ValueError("Learning rate must be between 0 and 1") - - current_prob = self.context_probabilities[context_index] - self.context_probabilities[context_index] = ( - (1 - learning_rate) * current_prob + learning_rate * observed_bit - ) - - def encode_binary_symbol(self, bit_value: int, context_index: int, - output_buffer: List[int]) -> None: - """ - Encode a single binary symbol using the specified context. 
- - Args: - bit_value: Binary value to encode (0 or 1) - context_index: Context index for probability model - output_buffer: List to append output bytes to - - Raises: - ValueError: If bit_value is not 0 or 1 - IndexError: If context_index is invalid - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) - >>> output = [] - >>> coder.encode_binary_symbol(1, 0, output) - >>> isinstance(output, list) - True - - >>> coder.encode_binary_symbol(2, 0, output) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Bit value must be 0 or 1 - """ - if bit_value not in (0, 1): - raise ValueError("Bit value must be 0 or 1") - if not 0 <= context_index < self.num_contexts: - raise IndexError("Context index out of range") - - probability_zero = self.context_probabilities[context_index] - current_range = self.high_bound - self.low_bound + 1 - split_point = self.low_bound + int(current_range * probability_zero) - - if bit_value == 0: - self.high_bound = split_point - else: - self.low_bound = split_point + 1 - - # Renormalization: output bytes when range becomes too small - while (self.high_bound ^ self.low_bound) < (1 << 24): - output_buffer.append((self.high_bound >> 24) & 0xFF) - self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF - self.high_bound = ((self.high_bound << 8) & 0xFFFFFFFF) | 0xFF - - self.update_context_probability(context_index, bit_value) - - def finalize_encoding(self, output_buffer: List[int]) -> None: - """ - Finalize encoding by flushing remaining bits. 
- - Args: - output_buffer: List to append final output bytes to - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder() - >>> output = [] - >>> coder.finalize_encoding(output) - >>> len(output) - 4 - """ - # Output remaining bits in low_bound - for _ in range(4): - output_buffer.append((self.low_bound >> 24) & 0xFF) - self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF - - def initialize_decoding(self, encoded_bytes: Iterator[int]) -> Iterator[int]: - """ - Initialize decoder state from encoded byte stream. - - Args: - encoded_bytes: Iterator over encoded bytes - - Returns: - Iterator over remaining encoded bytes - - Raises: - StopIteration: If encoded_bytes has fewer than 4 bytes - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder() - >>> data = iter([1, 2, 3, 4, 5, 6]) - >>> remaining = coder.initialize_decoding(data) - >>> list(remaining) - [5, 6] - """ - self.reset_coder_state() - - # Initialize code value from first 4 bytes - for _ in range(4): - try: - next_byte = next(encoded_bytes) - self.code_value = (self.code_value << 8) | next_byte - except StopIteration: - raise StopIteration("Not enough bytes to initialize decoder") - - return encoded_bytes - - def decode_binary_symbol(self, context_index: int, - input_stream: Iterator[int]) -> int: - """ - Decode a single binary symbol using the specified context. 
- - Args: - context_index: Context index for probability model - input_stream: Iterator over input bytes - - Returns: - Decoded binary value (0 or 1) - - Raises: - IndexError: If context_index is invalid - StopIteration: If input_stream is exhausted during renormalization - - Examples: - >>> coder = ContextAdaptiveBinaryArithmeticCoder(2) - >>> # This is a complex test requiring full encode/decode cycle - >>> output = [] - >>> coder.encode_binary_symbol(1, 0, output) - >>> coder.finalize_encoding(output) - >>> coder.reset_coder_state() - >>> input_iter = coder.initialize_decoding(iter(output)) - >>> decoded = coder.decode_binary_symbol(0, input_iter) - >>> decoded in (0, 1) - True - """ - if not 0 <= context_index < self.num_contexts: - raise IndexError("Context index out of range") - - probability_zero = self.context_probabilities[context_index] - current_range = self.high_bound - self.low_bound + 1 - split_point = self.low_bound + int(current_range * probability_zero) - - if self.code_value <= split_point: - self.high_bound = split_point - decoded_bit = 0 - else: - self.low_bound = split_point + 1 - decoded_bit = 1 - - # Renormalization: read new bytes when range becomes too small - while (self.high_bound ^ self.low_bound) < (1 << 24): - try: - next_byte = next(input_stream) - self.code_value = ((self.code_value << 8) & 0xFFFFFFFF) | next_byte - except StopIteration: - # Handle end of stream gracefully - self.code_value = (self.code_value << 8) & 0xFFFFFFFF - - self.low_bound = (self.low_bound << 8) & 0xFFFFFFFF - self.high_bound = ((self.high_bound << 8) & 0xFFFFFFFF) | 0xFF - - self.update_context_probability(context_index, decoded_bit) - return decoded_bit - - -def convert_string_to_bit_sequence(input_string: str) -> List[int]: - """ - Convert string to sequence of bits using UTF-8 encoding. 
- - Args: - input_string: String to convert - - Returns: - List of bits (0s and 1s) representing the string - - Raises: - UnicodeEncodeError: If string cannot be UTF-8 encoded - - Examples: - >>> bits = convert_string_to_bit_sequence("A") - >>> len(bits) - 8 - >>> all(bit in (0, 1) for bit in bits) - True - - >>> convert_string_to_bit_sequence("") - [] - """ - if not input_string: - return [] - - bit_sequence = [] - utf8_bytes = input_string.encode('utf-8') - - for byte_value in utf8_bytes: - # Convert each byte to 8 bits (MSB first) - for bit_position in range(7, -1, -1): - bit_sequence.append((byte_value >> bit_position) & 1) - - return bit_sequence - - -def convert_bit_sequence_to_string(bit_sequence: List[int]) -> str: - """ - Convert sequence of bits back to string using UTF-8 decoding. - - Args: - bit_sequence: List of bits (0s and 1s) - - Returns: - Decoded UTF-8 string - - Raises: - ValueError: If bit_sequence length is not multiple of 8 - UnicodeDecodeError: If resulting bytes are not valid UTF-8 - - Examples: - >>> bits = convert_string_to_bit_sequence("Hello") - >>> reconstructed = convert_bit_sequence_to_string(bits) - >>> reconstructed - 'Hello' - - >>> convert_bit_sequence_to_string([1, 0, 1]) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Bit sequence length must be multiple of 8 - """ - if len(bit_sequence) % 8 != 0: - raise ValueError("Bit sequence length must be multiple of 8") - - if not bit_sequence: - return "" - - byte_array = bytearray() - - # Convert every 8 bits to a byte - for byte_start in range(0, len(bit_sequence), 8): - byte_value = 0 - byte_bits = bit_sequence[byte_start:byte_start + 8] - - for bit in byte_bits: - if bit not in (0, 1): - raise ValueError(f"Invalid bit value: {bit}") - byte_value = (byte_value << 1) | bit - - byte_array.append(byte_value) - - return byte_array.decode('utf-8') - - -def compress_string_with_cabac(input_text: str, num_contexts: int = 256) -> Tuple[List[int], int]: - 
""" - Compress a string using CABAC algorithm. - - Args: - input_text: Text string to compress - num_contexts: Number of context models to use - - Returns: - Tuple of (compressed_bytes, original_bit_length) - - Raises: - ValueError: If num_contexts is not positive - - Examples: - >>> compressed, orig_len = compress_string_with_cabac("test") - >>> len(compressed) > 0 - True - >>> orig_len > 0 - True - >>> isinstance(compressed, list) - True - - >>> compress_string_with_cabac("", 0) # doctest: +IGNORE_EXCEPTION_DETAIL - Traceback (most recent call last): - ValueError: Number of contexts must be positive - """ - if num_contexts <= 0: - raise ValueError("Number of contexts must be positive") - - if not input_text: - return [], 0 - - # Convert string to bit sequence - bit_sequence = convert_string_to_bit_sequence(input_text) - - # Initialize encoder - encoder = ContextAdaptiveBinaryArithmeticCoder(num_contexts) - compressed_bytes = [] - - # Encode each bit using positional context - for bit_position, bit_value in enumerate(bit_sequence): - context_index = bit_position % num_contexts - encoder.encode_binary_symbol(bit_value, context_index, compressed_bytes) - - # Finalize encoding - encoder.finalize_encoding(compressed_bytes) - - return compressed_bytes, len(bit_sequence) - - -def decompress_string_with_cabac(compressed_bytes: List[int], original_bit_length: int, - num_contexts: int = 256) -> str: - """ - Decompress a CABAC-compressed byte sequence back to original string. 
- - Args: - compressed_bytes: List of compressed bytes - original_bit_length: Length of original bit sequence - num_contexts: Number of context models used during compression - - Returns: - Decompressed string - - Raises: - ValueError: If parameters are invalid - - Examples: - >>> compressed, orig_len = compress_string_with_cabac("hello") - >>> decompressed = decompress_string_with_cabac(compressed, orig_len) - >>> decompressed - 'hello' - - >>> decompress_string_with_cabac([], 8) - Traceback (most recent call last): - StopIteration: Not enough bytes to initialize decoder - """ - if num_contexts <= 0: - raise ValueError("Number of contexts must be positive") - if original_bit_length < 0: - raise ValueError("Original bit length must be non-negative") - - if original_bit_length == 0: - return "" - - # Initialize decoder - decoder = ContextAdaptiveBinaryArithmeticCoder(num_contexts) - input_stream = decoder.initialize_decoding(iter(compressed_bytes)) - - # Decode bits using same context pattern as encoding - decoded_bits = [] - for bit_position in range(original_bit_length): - context_index = bit_position % num_contexts - decoded_bit = decoder.decode_binary_symbol(context_index, input_stream) - decoded_bits.append(decoded_bit) - - # Convert bits back to string - return convert_bit_sequence_to_string(decoded_bits) - - -if __name__ == "__main__": - import doctest - doctest.testmod(verbose=True) From 54ac57406d0ecddc6ff2d33ad0b5d063b025262a Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 10 Aug 2025 01:17:20 +0000 Subject: [PATCH 4/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cellular_automata/von_neumann.py | 27 +++++++++++++++++---------- 1 file changed, 17 insertions(+), 10 deletions(-) diff --git a/cellular_automata/von_neumann.py b/cellular_automata/von_neumann.py index 585273274600..7bc983d5ee36 100644 --- 
a/cellular_automata/von_neumann.py +++ b/cellular_automata/von_neumann.py @@ -26,7 +26,7 @@ def create_random_grid( Args: rows: Number of grid rows - columns: Number of grid columns + columns: Number of grid columns alive_probability: Probability (0.0-1.0) of each cell being initially alive seed: Random seed for reproducibility @@ -115,23 +115,25 @@ def count_von_neumann_neighbors( down_neighbors = np.roll(alive_mask, 1, axis=0) left_neighbors = np.roll(alive_mask, -1, axis=1) right_neighbors = np.roll(alive_mask, 1, axis=1) - neighbor_counts = up_neighbors + down_neighbors + left_neighbors + right_neighbors + neighbor_counts = ( + up_neighbors + down_neighbors + left_neighbors + right_neighbors + ) else: # Manually count neighbors without wraparound for r in range(rows): for c in range(cols): count = 0 # Check up - if r > 0 and alive_mask[r-1, c]: + if r > 0 and alive_mask[r - 1, c]: count += 1 # Check down - if r < rows-1 and alive_mask[r+1, c]: + if r < rows - 1 and alive_mask[r + 1, c]: count += 1 # Check left - if c > 0 and alive_mask[r, c-1]: + if c > 0 and alive_mask[r, c - 1]: count += 1 # Check right - if c < cols-1 and alive_mask[r, c+1]: + if c < cols - 1 and alive_mask[r, c + 1]: count += 1 neighbor_counts[r, c] = count @@ -168,7 +170,7 @@ def apply_cellular_automaton_rules( Examples: >>> ages = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=np.uint8) >>> new_ages = apply_cellular_automaton_rules( - ... ages, birth_neighbor_counts={2}, + ... ages, birth_neighbor_counts={2}, ... survival_neighbor_counts={2, 3}, use_wraparound=False ... ) >>> bool(new_ages[0, 0] > 0) # corner should be born (2 neighbors: right and down) @@ -177,7 +179,7 @@ def apply_cellular_automaton_rules( >>> # Test aging of dead cells >>> dead_aging = np.array([[2, 0, 0]], dtype=np.uint8) # age 2, no survival >>> result = apply_cellular_automaton_rules( - ... dead_aging, birth_neighbor_counts=set(), + ... dead_aging, birth_neighbor_counts=set(), ... 
survival_neighbor_counts=set(), maximum_age=3 ... ) >>> bool(result[0, 0] == 3) # should age from 2 to 3 @@ -198,8 +200,12 @@ def apply_cellular_automaton_rules( ) # Determine which cells are born or survive - birth_mask = (~alive_cells_mask) & np.isin(neighbor_counts, list(birth_neighbor_counts)) - survival_mask = alive_cells_mask & np.isin(neighbor_counts, list(survival_neighbor_counts)) + birth_mask = (~alive_cells_mask) & np.isin( + neighbor_counts, list(birth_neighbor_counts) + ) + survival_mask = alive_cells_mask & np.isin( + neighbor_counts, list(survival_neighbor_counts) + ) new_ages = current_ages.copy() @@ -299,4 +305,5 @@ def simulate_von_neumann_cellular_automaton( if __name__ == "__main__": import doctest + doctest.testmod(verbose=True) From 46ceaf613f9ec06dc2f3cae7a7fba43cd98db3bd Mon Sep 17 00:00:00 2001 From: Marioman2023 Date: Sun, 10 Aug 2025 13:39:05 +1200 Subject: [PATCH 5/6] Fixed ruff errors --- cellular_automata/von_neumann.py | 60 ++++++++++++++++++++------------ 1 file changed, 37 insertions(+), 23 deletions(-) diff --git a/cellular_automata/von_neumann.py b/cellular_automata/von_neumann.py index 585273274600..5f8957418a46 100644 --- a/cellular_automata/von_neumann.py +++ b/cellular_automata/von_neumann.py @@ -15,18 +15,17 @@ """ import numpy as np -from typing import Set, Tuple, Dict, Optional def create_random_grid( - rows: int, columns: int, alive_probability: float, seed: Optional[int] = None + rows: int, columns: int, alive_probability: float, seed: int | None = None ) -> np.ndarray: """ Create initial grid with randomly distributed alive cells. 
Args: rows: Number of grid rows - columns: Number of grid columns + columns: Number of grid columns alive_probability: Probability (0.0-1.0) of each cell being initially alive seed: Random seed for reproducibility @@ -60,7 +59,9 @@ def create_random_grid( raise ValueError("alive_probability must be between 0.0 and 1.0") rng = np.random.default_rng(seed) - alive_cells = (rng.random((rows, columns)) < alive_probability).astype(np.uint8) + alive_cells = (rng.random((rows, columns)) < alive_probability).astype( + np.uint8 + ) return alive_cells @@ -88,11 +89,13 @@ def count_von_neumann_neighbors( >>> counts = count_von_neumann_neighbors(mask, use_wraparound=False) >>> int(counts[1, 1]) # center cell has 0 neighbors (all adjacent are 0) 0 - >>> int(counts[0, 1]) # top middle has 3 neighbors (down, left, right are 1) + >>> int(counts[0, 1]) # top middle has 3 neighbors (down, left, right) 3 >>> mask_simple = np.array([[1, 1], [1, 0]], dtype=np.uint8) - >>> counts_simple = count_von_neumann_neighbors(mask_simple, use_wraparound=False) + >>> counts_simple = count_von_neumann_neighbors( + ... mask_simple, use_wraparound=False + ... 
) >>> int(counts_simple[0, 0]) # top-left has 2 neighbors (right and down) 2 @@ -115,23 +118,25 @@ def count_von_neumann_neighbors( down_neighbors = np.roll(alive_mask, 1, axis=0) left_neighbors = np.roll(alive_mask, -1, axis=1) right_neighbors = np.roll(alive_mask, 1, axis=1) - neighbor_counts = up_neighbors + down_neighbors + left_neighbors + right_neighbors + neighbor_counts = ( + up_neighbors + down_neighbors + left_neighbors + right_neighbors + ) else: # Manually count neighbors without wraparound for r in range(rows): for c in range(cols): count = 0 # Check up - if r > 0 and alive_mask[r-1, c]: + if r > 0 and alive_mask[r - 1, c]: count += 1 # Check down - if r < rows-1 and alive_mask[r+1, c]: + if r < rows - 1 and alive_mask[r + 1, c]: count += 1 # Check left - if c > 0 and alive_mask[r, c-1]: + if c > 0 and alive_mask[r, c - 1]: count += 1 # Check right - if c < cols-1 and alive_mask[r, c+1]: + if c < cols - 1 and alive_mask[r, c + 1]: count += 1 neighbor_counts[r, c] = count @@ -140,8 +145,8 @@ def count_von_neumann_neighbors( def apply_cellular_automaton_rules( current_ages: np.ndarray, - birth_neighbor_counts: Set[int], - survival_neighbor_counts: Set[int], + birth_neighbor_counts: set[int], + survival_neighbor_counts: set[int], maximum_age: int = 5, use_wraparound: bool = True, ) -> np.ndarray: @@ -168,22 +173,25 @@ def apply_cellular_automaton_rules( Examples: >>> ages = np.array([[0, 1, 0], [1, 1, 1], [0, 1, 0]], dtype=np.uint8) >>> new_ages = apply_cellular_automaton_rules( - ... ages, birth_neighbor_counts={2}, + ... ages, birth_neighbor_counts={2}, ... survival_neighbor_counts={2, 3}, use_wraparound=False ... ) - >>> bool(new_ages[0, 0] > 0) # corner should be born (2 neighbors: right and down) + >>> # corner should be born (2 neighbors: right and down) + >>> bool(new_ages[0, 0] > 0) True >>> # Test aging of dead cells >>> dead_aging = np.array([[2, 0, 0]], dtype=np.uint8) # age 2, no survival >>> result = apply_cellular_automaton_rules( - ... 
dead_aging, birth_neighbor_counts=set(), + ... dead_aging, birth_neighbor_counts=set(), ... survival_neighbor_counts=set(), maximum_age=3 ... ) >>> bool(result[0, 0] == 3) # should age from 2 to 3 True - >>> apply_cellular_automaton_rules(np.array([1, 2]), {1}, {1}) # doctest: +IGNORE_EXCEPTION_DETAIL + >>> apply_cellular_automaton_rules( + ... np.array([1, 2]), {1}, {1} + ... ) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ValueError: current_ages must be a 2D array """ @@ -198,8 +206,12 @@ def apply_cellular_automaton_rules( ) # Determine which cells are born or survive - birth_mask = (~alive_cells_mask) & np.isin(neighbor_counts, list(birth_neighbor_counts)) - survival_mask = alive_cells_mask & np.isin(neighbor_counts, list(survival_neighbor_counts)) + birth_mask = (~alive_cells_mask) & np.isin( + neighbor_counts, list(birth_neighbor_counts) + ) + survival_mask = alive_cells_mask & np.isin( + neighbor_counts, list(survival_neighbor_counts) + ) new_ages = current_ages.copy() @@ -223,11 +235,11 @@ def simulate_von_neumann_cellular_automaton( grid_rows: int = 20, grid_columns: int = 40, initial_alive_probability: float = 0.25, - birth_rules: Set[int] = None, - survival_rules: Set[int] = None, + birth_rules: set[int] | None = None, + survival_rules: set[int] | None = None, maximum_cell_age: int = 5, generations: int = 100, - random_seed: Optional[int] = None, + random_seed: int | None = None, use_wraparound_edges: bool = True, ) -> list[np.ndarray]: """ @@ -262,7 +274,9 @@ def simulate_von_neumann_cellular_automaton( >>> all(grid.shape == (5, 5) for grid in result) True - >>> simulate_von_neumann_cellular_automaton(generations=0) # doctest: +IGNORE_EXCEPTION_DETAIL + >>> simulate_von_neumann_cellular_automaton( + ... generations=0 + ... 
) # doctest: +IGNORE_EXCEPTION_DETAIL Traceback (most recent call last): ValueError: generations must be positive """ From 00e52a7d7de856ad7df46133847eaafad794c319 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sun, 10 Aug 2025 01:39:45 +0000 Subject: [PATCH 6/6] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- cellular_automata/von_neumann.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cellular_automata/von_neumann.py b/cellular_automata/von_neumann.py index ad6ea5bf94ce..c277195e8c34 100644 --- a/cellular_automata/von_neumann.py +++ b/cellular_automata/von_neumann.py @@ -59,9 +59,7 @@ def create_random_grid( raise ValueError("alive_probability must be between 0.0 and 1.0") rng = np.random.default_rng(seed) - alive_cells = (rng.random((rows, columns)) < alive_probability).astype( - np.uint8 - ) + alive_cells = (rng.random((rows, columns)) < alive_probability).astype(np.uint8) return alive_cells