Skip to content

Commit 628d4cf

Browse files
committed
Implements a Von Neumann cellular automaton as well as two new entropy coding data compression methods.
1 parent 7a0fee4 commit 628d4cf

File tree

3 files changed

+261
-0
lines changed

3 files changed

+261
-0
lines changed

cellular_automata/Von_Neumann_CA.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
"""
2+
Von Neumann CA with multi-state fading "heatmap" effect in the terminal.
3+
4+
Requirements: numpy
5+
6+
Rules:
7+
- Uses Von Neumann neighborhood (4 neighbors).
8+
- Alive cells follow BIRTH / SURVIVE rules.
9+
- Dead cells fade out gradually through colored stages.
10+
"""
11+
12+
import numpy as np
13+
import os
14+
import time
15+
16+
# ---------- Configuration ----------
17+
GRID_SIZE = (20, 40) # (rows, cols)
18+
PROB_ALIVE = 0.25 # initial alive probability
19+
WRAP = True # wrap edges (toroidal)
20+
BIRTH = {3} # birth neighbor counts
21+
SURVIVE = {1, 2} # survival neighbor counts
22+
SLEEP_TIME = 0.1 # seconds between frames
23+
MAX_AGE = 5 # how many steps before a cell fully disappears (0 = dead)
24+
COLORS = { # Characters & colors for different ages
25+
0: " ", # dead
26+
1: "\033[92m█\033[0m", # bright green (newborn)
27+
2: "\033[93m█\033[0m", # yellow
28+
3: "\033[91m█\033[0m", # red
29+
4: "\033[31m░\033[0m", # dim red fading
30+
5: "\033[90m·\033[0m", # grey dust
31+
}
32+
# -----------------------------------
33+
34+
def make_initial_grid(shape, prob_alive, seed=None):
35+
rng = np.random.default_rng(seed)
36+
alive = (rng.random(shape) < prob_alive).astype(np.uint8)
37+
return alive.astype(np.uint8) # age 1 for alive, 0 for dead
38+
39+
def count_von_neumann_neighbors(alive_mask, wrap=True):
40+
"""Count Von Neumann neighbors (4 directions)"""
41+
up = np.roll(alive_mask, -1, axis=0)
42+
down = np.roll(alive_mask, 1, axis=0)
43+
left = np.roll(alive_mask, -1, axis=1)
44+
right = np.roll(alive_mask, 1, axis=1)
45+
counts = up + down + left + right
46+
47+
if not wrap:
48+
counts[ 0, :] -= alive_mask[-1, :]
49+
counts[-1, :] -= alive_mask[ 0, :]
50+
counts[ :, 0] -= alive_mask[ :, -1]
51+
counts[ :, -1] -= alive_mask[ :, 0]
52+
counts = np.clip(counts, 0, 4)
53+
54+
return counts
55+
56+
def step(age_grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE):
57+
alive_mask = age_grid > 0
58+
neighbor_counts = count_von_neumann_neighbors(alive_mask.astype(np.uint8), wrap=wrap)
59+
60+
born_mask = (~alive_mask) & np.isin(neighbor_counts, list(birth))
61+
survive_mask = alive_mask & np.isin(neighbor_counts, list(survive))
62+
63+
new_age_grid = age_grid.copy()
64+
65+
# Alive cells that survive get age reset to 1 if born, else age increment
66+
new_age_grid[born_mask] = 1
67+
new_age_grid[survive_mask & alive_mask] = 1 # reset alive age for fresh color
68+
69+
# Fade out dead cells
70+
fade_mask = (~born_mask) & (~survive_mask)
71+
new_age_grid[fade_mask & (new_age_grid > 0)] += 1
72+
new_age_grid[new_age_grid > max_age] = 0 # fully dead
73+
74+
return new_age_grid
75+
76+
def display(age_grid):
77+
"""Render grid with colors for each age stage"""
78+
os.system("cls" if os.name == "nt" else "clear")
79+
for row in age_grid:
80+
print("".join(COLORS.get(age, COLORS[MAX_AGE]) for age in row))
81+
82+
def main():
83+
grid = make_initial_grid(GRID_SIZE, prob_alive=PROB_ALIVE)
84+
85+
try:
86+
while True:
87+
display(grid)
88+
grid = step(grid, birth=BIRTH, survive=SURVIVE, wrap=WRAP, max_age=MAX_AGE)
89+
time.sleep(SLEEP_TIME)
90+
except KeyboardInterrupt:
91+
print("\nStopped.")
92+
93+
if __name__ == "__main__":
94+
main()

data_compression/Arithmetic_Coding.py

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
from collections_extended import bag
2+
from decimal import Decimal, getcontext
3+
4+
# Set high precision for decimal calculations
5+
getcontext().prec = 50
6+
7+
def build_probability_table(data):
8+
"""Returns a dictionary int the form (symbol: probability)"""
9+
freq = bag(data) # A bag is like a multiset
10+
return {char: Decimal(freq.count(char)) / Decimal(len(data)) for char in set(freq)}
11+
12+
def arithmetic_encode(data, prob_table):
13+
"""Preforms arithmetic coding compression"""
14+
symbols = sorted(prob_table.keys())
15+
cumulative = {}
16+
cumulative_sum = Decimal('0.0')
17+
for sym in symbols:
18+
cumulative[sym] = cumulative_sum
19+
cumulative_sum += prob_table[sym]
20+
21+
low, high = Decimal('0.0'), Decimal('1.0')
22+
for symbol in data:
23+
range_ = high - low
24+
high = low + range_ * (cumulative[symbol] + prob_table[symbol])
25+
low = low + range_ * cumulative[symbol]
26+
27+
return (low + high) / 2, len(data)
28+
29+
def arithmetic_decode(encoded_value, length, prob_table):
30+
"""Decodes an arithmetic-coded value"""
31+
symbols = sorted(prob_table.keys())
32+
cumulative = {}
33+
cumulative_sum = Decimal('0.0')
34+
for sym in symbols:
35+
cumulative[sym] = cumulative_sum
36+
cumulative_sum += prob_table[sym]
37+
38+
result = []
39+
low, high = Decimal('0.0'), Decimal('1.0')
40+
value = Decimal(str(encoded_value))
41+
42+
for _ in range(length):
43+
range_ = high - low
44+
for sym in symbols:
45+
sym_low = low + range_ * cumulative[sym]
46+
sym_high = sym_low + range_ * prob_table[sym]
47+
if sym_low <= value < sym_high:
48+
result.append(sym)
49+
low, high = sym_low, sym_high
50+
break
51+
52+
return ''.join(result)
53+
54+
if __name__ == "__main__":
55+
text = "this is text used for testing"
56+
print(f"Original: {text}")
57+
58+
prob_table = build_probability_table(text)
59+
encoded_value, length = arithmetic_encode(text, prob_table)
60+
print(f"Encoded value: {encoded_value}")
61+
62+
decoded_text = arithmetic_decode(encoded_value, length, prob_table)
63+
print(f"Decoded: {decoded_text}")
64+
65+
# Show compression ratio
66+
import sys
67+
original_size = sys.getsizeof(text)
68+
encoded_size = sys.getsizeof(str(encoded_value))
69+
print(f"Compression ratio: {original_size / encoded_size:.2f}")

data_compression/CABAC.py

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
class CABAC:
2+
def __init__(self):
3+
self.low = 0
4+
self.high = (1 << 32) - 1
5+
self.context = [0.5] * 256 # probability model for 256 contexts
6+
7+
def _update_context(self, ctx, bit):
8+
# Simple adaptation: move probability toward observed bit
9+
alpha = 0.05
10+
self.context[ctx] = (1 - alpha) * self.context[ctx] + alpha * bit
11+
12+
def encode_bit(self, bit, ctx, output):
13+
prob = self.context[ctx]
14+
range_ = self.high - self.low + 1
15+
split = self.low + int(range_ * prob)
16+
17+
if bit == 0:
18+
self.high = split
19+
else:
20+
self.low = split + 1
21+
22+
while (self.high ^ self.low) < (1 << 24):
23+
output.append((self.high >> 24) & 0xFF)
24+
self.low = (self.low << 8) & 0xFFFFFFFF
25+
self.high = ((self.high << 8) & 0xFFFFFFFF) | 0xFF
26+
27+
self._update_context(ctx, bit)
28+
29+
def finish_encoding(self, output):
30+
for _ in range(4):
31+
output.append((self.low >> 24) & 0xFF)
32+
self.low = (self.low << 8) & 0xFFFFFFFF
33+
34+
def decode_bit(self, ctx, input_bits):
35+
prob = self.context[ctx]
36+
range_ = self.high - self.low + 1
37+
split = self.low + int(range_ * prob)
38+
39+
if self.code <= split:
40+
self.high = split
41+
bit = 0
42+
else:
43+
self.low = split + 1
44+
bit = 1
45+
46+
while (self.high ^ self.low) < (1 << 24):
47+
self.code = ((self.code << 8) & 0xFFFFFFFF) | next(input_bits)
48+
self.low = (self.low << 8) & 0xFFFFFFFF
49+
self.high = ((self.high << 8) & 0xFFFFFFFF) | 0xFF
50+
51+
self._update_context(ctx, bit)
52+
return bit
53+
54+
def start_decoding(self, encoded_bytes):
55+
self.low = 0
56+
self.high = (1 << 32) - 1
57+
self.code = 0
58+
input_bits = iter(encoded_bytes)
59+
for _ in range(4):
60+
self.code = (self.code << 8) | next(input_bits)
61+
return input_bits
62+
63+
64+
def string_to_bits(s):
65+
return [(byte >> i) & 1 for byte in s.encode('utf-8') for i in range(7, -1, -1)]
66+
67+
def bits_to_string(bits):
68+
b = bytearray()
69+
for i in range(0, len(bits), 8):
70+
byte = 0
71+
for bit in bits[i:i+8]:
72+
byte = (byte << 1) | bit
73+
b.append(byte)
74+
return b.decode('utf-8')
75+
76+
def main(text: str):
77+
encoder = CABAC()
78+
output_bytes = []
79+
bits = string_to_bits(text)
80+
81+
for i, bit in enumerate(bits):
82+
ctx = i % 256 # simple positional context
83+
encoder.encode_bit(bit, ctx, output_bytes)
84+
encoder.finish_encoding(output_bytes)
85+
86+
# Decode
87+
decoder = CABAC()
88+
bitstream = decoder.start_decoding(iter(output_bytes))
89+
decoded_bits = [decoder.decode_bit(i % 256, bitstream) for i in range(len(bits))]
90+
decoded_text = bits_to_string(decoded_bits)
91+
92+
print("Original:", text)
93+
print("Decoded :", decoded_text)
94+
print("Compressed size (bytes):", len(output_bytes))
95+
96+
if __name__ == "__main__":
97+
# Example usage
98+
main("Hello CABAC!")

0 commit comments

Comments
 (0)