|
| 1 | +from typing import List |
| 2 | +import struct |
| 3 | +import numpy as np |
| 4 | +from copy import deepcopy as dp |
| 5 | +__all__ = ['ChaCha20'] |
| 6 | +class ChaCha20: |
| 7 | + """ |
| 8 | + Implementation of the ChaCha20 stream cipher. |
| 9 | +
|
| 10 | + Attributes |
| 11 | + ---------- |
| 12 | + key : bytes |
| 13 | + 32-byte (256-bit) encryption key. |
| 14 | + nonce : bytes |
| 15 | + 12-byte (96-bit) nonce. |
| 16 | + counter : int |
| 17 | + 32-bit counter, typically starts at 0. |
| 18 | + """ |
| 19 | + def __new__(cls, key: bytes, nonce: bytes, counter: int = 0): |
| 20 | + if not isinstance(key, bytes) or len(key) != 32: |
| 21 | + raise ValueError("Key must be exactly 32 bytes (256 bits).") |
| 22 | + if not isinstance(nonce, bytes) or len(nonce) != 12: |
| 23 | + raise ValueError("Nonce must be exactly 12 bytes (96 bits).") |
| 24 | + if not isinstance(counter, int) or counter < 0: |
| 25 | + raise ValueError("Counter must be a non-negative integer.") |
| 26 | + instance = super().__new__(cls) |
| 27 | + instance.key = key |
| 28 | + instance.nonce = nonce |
| 29 | + instance.counter = counter |
| 30 | + return instance |
| 31 | + |
| 32 | + def __init__(self, key: bytes, nonce: bytes, counter: int = 0): |
| 33 | + """Initializes the ChaCha20 object.""" |
| 34 | + # Guard against multiple initializations |
| 35 | + if hasattr(self, "_initialized") and self._initialized: |
| 36 | + return |
| 37 | + self._initialized = True |
| 38 | + |
| 39 | + def __repr__(self): |
| 40 | + """Returns a string representation of the object for debugging.""" |
| 41 | + return f"<ChaCha20(key={self.key[:4].hex()}..., nonce={self.nonce.hex()}, counter={self.counter})>" |
| 42 | + |
| 43 | + |
| 44 | + def _quarter_round(self, state: np.ndarray, a: tuple, b: tuple, c: tuple, d: tuple): |
| 45 | + |
| 46 | + """ |
| 47 | + Performs the ChaCha20 quarter-round operation on the 4x4 state matrix. |
| 48 | +
|
| 49 | + The quarter-round consists of four operations (Add, XOR, and Rotate) performed on |
| 50 | + four elements of the state. It is a core component of the ChaCha20 algorithm, ensuring |
| 51 | + diffusion of bits for cryptographic security. |
| 52 | +
|
| 53 | + Parameters: |
| 54 | + ----------- |
| 55 | + state : np.ndarray |
| 56 | + A 4x4 matrix (NumPy array) representing the ChaCha20 state. |
| 57 | +
|
| 58 | + a, b, c, d : tuple |
| 59 | + Each tuple represents the (row, column) indices of four elements in the state matrix |
| 60 | + to be processed in the quarter-round. |
| 61 | +
|
| 62 | + Operations: |
| 63 | + ----------- |
| 64 | + - Add: Adds two values modulo 2^32. |
| 65 | + - XOR: Performs a bitwise XOR operation. |
| 66 | + - Rotate: Rotates bits (circular shift) to the left. |
| 67 | +
|
| 68 | + Formula for the quarter-round (performed four times): |
| 69 | + ----------------------------------------------------- |
| 70 | + 1. a += b; d ^= a; d <<<= 16 |
| 71 | + 2. c += d; b ^= c; b <<<= 12 |
| 72 | + 3. a += b; d ^= a; d <<<= 8 |
| 73 | + 4. c += d; b ^= c; b <<<= 7 |
| 74 | +
|
| 75 | + """ |
| 76 | + ax, ay = a |
| 77 | + bx, by = b |
| 78 | + cx, cy = c |
| 79 | + dx, dy = d |
| 80 | + |
| 81 | + state[ax, ay] = ((state[ax, ay].astype(np.uint32) + state[bx, by].astype(np.uint32)) & 0xFFFFFFFF).astype(np.uint32) |
| 82 | + state[dx, dy] ^= state[ax, ay] |
| 83 | + state[dx, dy] = np.bitwise_or( |
| 84 | + np.left_shift(state[dx, dy].astype(np.uint32), 16) & 0xFFFFFFFF, |
| 85 | + np.right_shift(state[dx, dy].astype(np.uint32), 16) |
| 86 | +) |
| 87 | + |
| 88 | + state[cx, cy] = ((state[cx, cy].astype(np.uint32) + state[dx, dy].astype(np.uint32)) & 0xFFFFFFFF).astype(np.uint32) |
| 89 | + state[bx, by] ^= state[cx, cy] |
| 90 | + state[bx, by] = np.bitwise_or( |
| 91 | + np.left_shift(state[bx, by].astype(np.uint32), 12) & 0xFFFFFFFF, |
| 92 | + np.right_shift(state[bx, by].astype(np.uint32), 20) |
| 93 | +) |
| 94 | + |
| 95 | + state[ax, ay] = ((state[ax, ay].astype(np.uint32) + state[bx, by].astype(np.uint32)) & 0xFFFFFFFF).astype(np.uint32) |
| 96 | + state[dx, dy] ^= state[ax, ay] |
| 97 | + state[dx, dy] = np.bitwise_or( |
| 98 | + np.left_shift(state[dx, dy].astype(np.uint32), 8) & 0xFFFFFFFF, |
| 99 | + np.right_shift(state[dx, dy].astype(np.uint32), 24) |
| 100 | +) |
| 101 | + |
| 102 | + state[cx, cy] = ((state[cx, cy].astype(np.uint32) + state[dx, dy].astype(np.uint32)) & 0xFFFFFFFF).astype(np.uint32) |
| 103 | + state[bx, by] ^= state[cx, cy] |
| 104 | + state[bx, by] = np.bitwise_or( |
| 105 | + np.left_shift(state[bx, by].astype(np.uint32), 7) & 0xFFFFFFFF, |
| 106 | + np.right_shift(state[bx, by].astype(np.uint32), 25) |
| 107 | +) |
| 108 | + def _double_round(self, state: np.ndarray): |
| 109 | + |
| 110 | + self._quarter_round(state, (0, 0), (1, 0), (2, 0), (3, 0)) |
| 111 | + self._quarter_round(state, (0, 1), (1, 1), (2, 1), (3, 1)) |
| 112 | + self._quarter_round(state, (0, 2), (1, 2), (2, 2), (3, 2)) |
| 113 | + self._quarter_round(state, (0, 3), (1, 3), (2, 3), (3, 3)) |
| 114 | + |
| 115 | + self._quarter_round(state, (0, 0), (1, 1), (2, 2), (3, 3)) |
| 116 | + self._quarter_round(state, (0, 1), (1, 2), (2, 3), (3, 0)) |
| 117 | + self._quarter_round(state, (0, 2), (1, 3), (2, 0), (3, 1)) |
| 118 | + self._quarter_round(state, (0, 3), (1, 0), (2, 1), (3, 2)) |
| 119 | + |
| 120 | + |
| 121 | + def _chacha20_block(self, counter: int) -> bytes: |
| 122 | + """ |
| 123 | + Generates a 64-byte keystream block from 16-word (512-bit) state |
| 124 | + The initial state is copied to preserve the original. |
| 125 | + 20 rounds (10 double rounds) are performed using quarter-round operations. |
| 126 | + The modified working state is combined with the original state using modular addition (mod 2^32). |
| 127 | + The result is returned as a 64-byte keystream block. |
| 128 | + """ |
| 129 | + constants = b"expand 32-byte k" |
| 130 | + state_values = struct.unpack( |
| 131 | + '<16I', |
| 132 | + constants + self.key + struct.pack('<I', counter) + self.nonce |
| 133 | + ) |
| 134 | + state = np.array(state_values, dtype=np.uint32).reshape(4, 4) |
| 135 | + working_state = dp(state) |
| 136 | + for _ in range(10): |
| 137 | + self._double_round(working_state) |
| 138 | + final_state = np.bitwise_and(working_state + state, np.uint32(0xFFFFFFFF)) |
| 139 | + return struct.pack('<16I', *final_state.flatten()) |
| 140 | + |
| 141 | + def _apply_keystream(self, data: bytes) -> bytes: |
| 142 | + """ |
| 143 | + Applies the ChaCha20 keystream to the input data (plaintext or ciphertext) |
| 144 | + to perform encryption or decryption. |
| 145 | +
|
| 146 | + This method processes the input data in 64-byte blocks. For each block: |
| 147 | + - A 64-byte keystream is generated using the `_chacha20_block()` function. |
| 148 | + - Each byte of the input block is XORed with the corresponding keystream byte. |
| 149 | + - The XORed result is appended to the output. |
| 150 | +
|
| 151 | + The same function is used for both encryption and decryption because |
| 152 | + XORing the ciphertext with the same keystream returns the original plaintext. |
| 153 | +
|
| 154 | + Args: |
| 155 | + data (bytes): The input data to be encrypted or decrypted (plaintext or ciphertext). |
| 156 | +
|
| 157 | + Returns: |
| 158 | + bytes: The result of XORing the input data with the ChaCha20 keystream |
| 159 | + (ciphertext if plaintext was provided, plaintext if ciphertext was provided). |
| 160 | + """ |
| 161 | + if len(data) == 0: |
| 162 | + return b"" |
| 163 | + result = b"" |
| 164 | + chunk_size = 64 |
| 165 | + start = 0 |
| 166 | + while start < len(data): |
| 167 | + chunk = data[start:start + chunk_size] |
| 168 | + start += chunk_size |
| 169 | + keystream = self._chacha20_block(self.counter) |
| 170 | + |
| 171 | + self.counter += 1 |
| 172 | + xor_block = [] |
| 173 | + for idx in range(len(chunk)): |
| 174 | + input_byte = chunk[idx] |
| 175 | + keystream_byte = keystream[idx] |
| 176 | + xor_block.append(input_byte ^ keystream_byte) |
| 177 | + result += bytes(xor_block) |
| 178 | + return result |
| 179 | + def encrypt(self, plaintext: bytes) -> bytes: |
| 180 | + """ |
| 181 | + Encrypts the given plaintext using the ChaCha20 stream cipher. |
| 182 | +
|
| 183 | + This method uses the ChaCha20 keystream generated from the |
| 184 | + key, nonce, and counter to XOR with the plaintext, producing ciphertext. |
| 185 | +
|
| 186 | + Args: |
| 187 | + plaintext (bytes): The plaintext data to be encrypted. |
| 188 | +
|
| 189 | + Returns: |
| 190 | + bytes: The resulting ciphertext. |
| 191 | + """ |
| 192 | + self.reset(counter=0) |
| 193 | + return self._apply_keystream(plaintext) |
| 194 | + |
| 195 | + def decrypt(self, ciphertext: bytes) -> bytes: |
| 196 | + """ |
| 197 | + Decrypts the given ciphertext using the ChaCha20 stream cipher. |
| 198 | +
|
| 199 | + Since ChaCha20 uses XOR for encryption, decryption is performed |
| 200 | + using the same keystream and XOR operation. |
| 201 | +
|
| 202 | + Args: |
| 203 | + ciphertext (bytes): The ciphertext data to be decrypted. |
| 204 | +
|
| 205 | + Returns: |
| 206 | + bytes: The resulting plaintext. |
| 207 | + """ |
| 208 | + self.reset(counter=0) |
| 209 | + return self._apply_keystream(ciphertext) |
| 210 | + |
| 211 | + def reset(self, counter: int = 0): |
| 212 | + """Resets the ChaCha20 counter to the specified value (default is 0).""" |
| 213 | + if not isinstance(counter, int) or counter < 0: |
| 214 | + raise ValueError("Counter must be a non-negative integer.") |
| 215 | + self.counter = counter |
0 commit comments