diff --git a/.github/workflows/build_tests.yml b/.github/workflows/build_tests.yml index c982751..526d1df 100644 --- a/.github/workflows/build_tests.yml +++ b/.github/workflows/build_tests.yml @@ -13,7 +13,7 @@ jobs: - name: Setup Python uses: actions/setup-python@v1 with: - python-version: 3.8 + python-version: "3.10" - name: Install Build Tools run: | python -m pip install build wheel diff --git a/.github/workflows/unit_tests.yml b/.github/workflows/unit_tests.yml new file mode 100644 index 0000000..cd4a460 --- /dev/null +++ b/.github/workflows/unit_tests.yml @@ -0,0 +1,58 @@ +name: Run UnitTests +on: + pull_request: + branches: + - dev + paths-ignore: + - 'hivemind_bus_client/version.py' + - '.github/**' + - '.gitignore' + - 'LICENSE' + - 'CHANGELOG.md' + - 'MANIFEST.in' + - 'README.md' + push: + branches: + - master + paths-ignore: + - 'hivemind_bus_client/version.py' + - '.github/**' + - '.gitignore' + - 'LICENSE' + - 'CHANGELOG.md' + - 'MANIFEST.in' + - 'README.md' + workflow_dispatch: + +jobs: + unit_tests: + strategy: + matrix: + python-version: ["3.10", "3.11" ] + runs-on: ubuntu-latest + timeout-minutes: 15 + steps: + - uses: actions/checkout@v2 + - name: Set up python ${{ matrix.python-version }} + uses: actions/setup-python@v2 + with: + python-version: ${{ matrix.python-version }} + - name: Install System Dependencies + run: | + sudo apt-get update + sudo apt install python3-dev swig + python -m pip install build wheel + - name: Install repo + run: | + pip install -e . 
class B91:
    """Base91 binary-to-text codec.

    The encoder consumes the input as a little-endian bit stream and emits
    two characters (one base-91 number below 91**2 = 8281) for every 13 or
    14 bits of input.
    """

    # Symbol table: the list index is the numeric value of the character.
    ALPHABET = [
        'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M',
        'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z',
        'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
        'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
        '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '!', '#', '$',
        '%', '&', '(', ')', '*', '+', ',', '.', '/', ':', ';', '<', '=',
        '>', '?', '@', '[', ']', '^', '_', '`', '{', '|', '}', '~', '"'
    ]

    # Reverse lookup (character -> numeric value) used while decoding.
    DECODE_TABLE = {char: idx for idx, char in enumerate(ALPHABET)}

    @classmethod
    def decode(cls, encoded_data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
        """
        Decode Base91 text back into the original binary data.

        Args:
            encoded_data (Union[str, bytes]): Base91 text. Bytes-like input
                (`bytes`, `bytearray`, `memoryview`) is first converted to `str`
                using `encoding`.
            encoding (str): Text encoding applied to bytes-like input. Default is 'utf-8'.

        Returns:
            bytes: The decoded binary data.

        Raises:
            ValueError: If the input contains a character outside `ALPHABET`.
        """
        if isinstance(encoded_data, (bytes, bytearray, memoryview)):
            encoded_data = bytes(encoded_data).decode(encoding)

        table = cls.DECODE_TABLE
        pending = -1  # value of the first character of a pair; -1 while none is waiting
        bit_buffer = 0
        bit_count = 0
        out = bytearray()

        for char in encoded_data:
            try:
                digit = table[char]
            except KeyError:
                raise ValueError(f"Invalid Base91 character: {char}") from None

            if pending < 0:
                pending = digit
                continue

            # Second character of the pair: rebuild the 13/14-bit group.
            pending += digit * 91
            bit_buffer |= pending << bit_count
            # Mirrors the encoder: groups whose low 13 bits are <= 88 carry 14 bits.
            bit_count += 13 if (pending & 8191) > 88 else 14
            while bit_count >= 8:
                out.append(bit_buffer & 255)
                bit_buffer >>= 8
                bit_count -= 8
            pending = -1

        # A trailing unpaired character holds the final partial byte.
        if pending >= 0:
            out.append((bit_buffer | pending << bit_count) & 255)

        return bytes(out)

    @classmethod
    def encode(cls, data: Union[bytes, str], encoding: str = "utf-8") -> bytes:
        """
        Encode binary data as Base91 text.

        Args:
            data (Union[bytes, str]): Data to encode. A `str` is first converted
                to bytes using `encoding`.
            encoding (str): Text encoding used for `str` input and for the
                returned Base91 text. Default is 'utf-8'.

        Returns:
            bytes: The Base91 text, encoded with `encoding`.
        """
        if isinstance(data, str):
            data = data.encode(encoding)

        alphabet = cls.ALPHABET
        bit_buffer = 0
        bit_count = 0
        out = []

        for byte in data:
            bit_buffer |= byte << bit_count
            bit_count += 8
            if bit_count > 13:
                value = bit_buffer & 8191
                if value > 88:
                    bit_buffer >>= 13
                    bit_count -= 13
                else:
                    # 91**2 = 8281 > 2**13 = 8192, so when the low 13 bits are
                    # <= 88 a 14th bit still fits below 8281 (88 + 8192 = 8280).
                    value = bit_buffer & 16383
                    bit_buffer >>= 14
                    bit_count -= 14
                out.append(alphabet[value % 91])
                out.append(alphabet[value // 91])

        # Flush the remaining bits; one character is enough when they fit below 91.
        if bit_count:
            out.append(alphabet[bit_buffer % 91])
            if bit_count > 7 or bit_buffer > 90:
                out.append(alphabet[bit_buffer // 91])

        return ''.join(out).encode(encoding)
class Z85B:
    """Z85b codec: a little-endian variant of ZMQ RFC 32 (Z85).

    Differences from plain Z85, as implemented below:
    1. 32-bit groups are packed little-endian, least significant digit first.
    2. Input length need not be a multiple of 4 (encode) or 5 (decode).
    3. `decode()` strips whitespace from the input.
    4. `decode()` raises `Z85DecodeError` on invalid input.
    """

    # Z85CHARS is the base 85 symbol table
    Z85CHARS = bytearray(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#")

    # Z85MAP maps each symbol's byte value to its integer value in [0, 84]
    Z85MAP = {char: idx for idx, char in enumerate(Z85CHARS)}

    # Powers of 85 for encoding/decoding, least significant first
    _85s = [85 ** i for i in range(5)]

    # Padding lengths indexed by len % 4 (encode) and len % 5 (decode)
    _E_PADDING = [0, 3, 2, 1]
    _D_PADDING = [0, 4, 3, 2, 1]

    @classmethod
    def encode(cls, data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
        """
        Encode raw bytes into Z85b format.

        Args:
            data (Union[str, bytes]): Input data to encode.
            encoding (str): The encoding to use if `data` is provided as a string. Default is 'utf-8'.

        Returns:
            bytes: Z85b-encoded bytes.
        """
        if isinstance(data, str):
            data = data.encode(encoding)
        # Work on a private copy so the caller's buffer is never modified.
        data = bytearray(data)
        padding = cls._E_PADDING[len(data) % 4]
        data += b'\x00' * padding

        # Pack the raw bytes into little-endian 32-bit integers
        values = struct.unpack(f'<{len(data) // 4}I', data)

        symbols = cls.Z85CHARS
        encoded = bytearray()
        for value in values:
            for weight in cls._85s:
                encoded.append(symbols[(value // weight) % 85])

        # Each zero pad byte produced exactly one trailing character; drop them.
        if padding:
            del encoded[-padding:]
        return bytes(encoded)

    @classmethod
    def decode(cls, encoded_data: Union[str, bytes], encoding: str = "utf-8") -> bytes:
        """
        Decode Z85b-encoded bytes into raw bytes.

        Args:
            encoded_data (Union[str, bytes]): Z85b-encoded data; any bytes-like
                object is accepted as well.
            encoding (str): The encoding to use if `encoded_data` is provided as a string. Default is 'utf-8'.

        Returns:
            bytes: Decoded raw bytes.

        Raises:
            Z85DecodeError: If an invalid character is encountered, or a
                5-character group encodes a value that does not fit in 32 bits.
        """
        if isinstance(encoded_data, str):
            encoded_data = encoded_data.encode(encoding)
        # Normalize input by removing whitespace
        payload = re.sub(rb'\s+', b'', bytes(encoded_data))
        padding = cls._D_PADDING[len(payload) % 5]

        symbol_values = cls.Z85MAP
        values = []
        for start in range(0, len(payload), 5):
            value = 0
            # zip() stops at the shorter operand, so a short final group is fine.
            for symbol, weight in zip(payload[start:start + 5], cls._85s):
                try:
                    value += symbol_values[symbol] * weight
                except KeyError:
                    raise Z85DecodeError(f"Invalid byte code: {symbol!r}") from None
            values.append(value)

        # Pack the values back into raw little-endian bytes
        try:
            decoded = struct.pack(f'<{len(values)}I', *values)
        except struct.error:
            # Five valid symbols can encode up to 85**5 - 1, which exceeds 2**32 - 1.
            raise Z85DecodeError("Invalid Z85b data: 5-character group exceeds 32 bits") from None

        # Remove padding from the decoded output
        if padding:
            decoded = decoded[:-padding]
        return decoded
class Z85P:
    """Z85 codec with an explicit padding prefix.

    The input is zero-padded to a multiple of 4 bytes, and each big-endian
    32-bit group is written as 5 base-85 characters, most significant first.
    The first byte of the encoded output is the raw number of pad bytes that
    were added (not a Z85 character), so `decode()` can strip them again.
    """
    Z85CHARS = b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#"
    Z85MAP = {c: idx for idx, c in enumerate(Z85CHARS)}

    # Powers of 85, most significant first.
    _85s = [85 ** i for i in range(5)][::-1]

    @classmethod
    def encode(cls, rawbytes: Union[str, bytes]) -> bytes:
        """
        Encode raw bytes as Z85 text prefixed with the pad-count byte.

        Args:
            rawbytes (Union[str, bytes]): The data to encode. A `str` is
                encoded as UTF-8 first.

        Returns:
            bytes: One byte holding the number of zero bytes appended to the
                input (0-3), followed by 5 Z85 characters per 4 input bytes.
        """
        if isinstance(rawbytes, str):
            rawbytes = rawbytes.encode("utf-8")

        padding = (4 - len(rawbytes) % 4) % 4  # Padding to make the length a multiple of 4
        # Build a new object: `+=` would mutate a caller-supplied bytearray in place.
        padded = bytes(rawbytes) + b'\x00' * padding

        values = struct.unpack('>%dI' % (len(padded) // 4), padded)

        # The first byte indicates how many padding bytes were added
        encoded = bytearray([padding])
        for value in values:
            for weight in cls._85s:
                encoded.append(cls.Z85CHARS[(value // weight) % 85])

        return bytes(encoded)

    @classmethod
    def decode(cls, z85bytes: Union[str, bytes]) -> bytes:
        """
        Decode the output of `encode()` back into raw bytes.

        Args:
            z85bytes (Union[str, bytes]): Pad-count byte followed by Z85 text.
                A `str` is encoded as UTF-8 first.

        Returns:
            bytes: The decoded data with padding removed. Empty input yields
                empty output.

        Raises:
            ValueError: If the length is not 1 + a multiple of 5, the pad
                count is out of range, a character is not in the Z85 alphabet,
                or a 5-character group does not fit in 32 bits.
        """
        if isinstance(z85bytes, str):
            z85bytes = z85bytes.encode("utf-8")
        z85bytes = bytes(z85bytes)

        if len(z85bytes) == 0:
            return z85bytes

        if len(z85bytes) % 5 != 1:
            raise ValueError('Invalid data length, should be divisible by 5 with 1 extra byte for padding indicator.')

        padding = z85bytes[0]  # Read the padding size from the first byte
        # encode() only emits 0-3; 4 is still tolerated as before.
        if not 0 <= padding <= 4:
            raise ValueError('Padding size must be between 0 and 4.')

        payload = z85bytes[1:]  # Everything after the pad-count byte

        values = []
        for start in range(0, len(payload), 5):
            value = 0
            for symbol, weight in zip(payload[start:start + 5], cls._85s):
                try:
                    value += cls.Z85MAP[symbol] * weight
                except KeyError:
                    raise ValueError(f"Invalid Z85 character: {bytes([symbol])!r}") from None
            values.append(value)

        try:
            decoded = struct.pack('>%dI' % len(values), *values)
        except struct.error:
            # Five valid symbols can encode up to 85**5 - 1, which exceeds 2**32 - 1.
            raise ValueError('Invalid Z85 data: 5-character group exceeds 32 bits.') from None
        return decoded[:-padding] if padding else decoded  # Remove padding
""" + JSON_B91 = "JSON-B91" # JSON text output with Base91 encoding JSON_Z85B = "JSON-Z85B" # JSON text output with Z85B encoding + JSON_Z85P = "JSON-Z85P" # JSON text output with Z85B encoding JSON_B64 = "JSON-B64" # JSON text output with Base64 encoding - JSON_HEX = "JSON-HEX" # JSON text output with Hex encoding + JSON_URLSAFE_B64 = "JSON-URLSAFE-B64" # JSON text output with url safe Base64 encoding + JSON_B32 = "JSON-B32" # JSON text output with Base32 encoding + JSON_HEX = "JSON-HEX" # JSON text output with Base16 (Hex) encoding def get_encoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: encoding = _norm_encoding(encoding) if encoding == SupportedEncodings.JSON_B64: return pybase64.b64encode + if encoding == SupportedEncodings.JSON_URLSAFE_B64: + return pybase64.urlsafe_b64encode + if encoding == SupportedEncodings.JSON_B32: + return base64.b32encode if encoding == SupportedEncodings.JSON_HEX: return hexlify if encoding == SupportedEncodings.JSON_Z85B: return Z85B.encode + if encoding == SupportedEncodings.JSON_Z85P: + return Z85P.encode + if encoding == SupportedEncodings.JSON_B91: + return B91.encode raise InvalidEncoding(f"Invalid encoding: {encoding}") @@ -61,10 +73,18 @@ def get_decoder(encoding: SupportedEncodings) -> Callable[[bytes], bytes]: encoding = _norm_encoding(encoding) if encoding == SupportedEncodings.JSON_B64: return pybase64.b64decode + if encoding == SupportedEncodings.JSON_URLSAFE_B64: + return pybase64.urlsafe_b64decode + if encoding == SupportedEncodings.JSON_B32: + return base64.b32decode if encoding == SupportedEncodings.JSON_HEX: return unhexlify if encoding == SupportedEncodings.JSON_Z85B: return Z85B.decode + if encoding == SupportedEncodings.JSON_Z85P: + return Z85P.decode + if encoding == SupportedEncodings.JSON_B91: + return B91.decode raise InvalidEncoding(f"Invalid encoding: {encoding}") diff --git a/hivemind_bus_client/protocol.py b/hivemind_bus_client/protocol.py index 7efda07..ed82296 100644 --- 
a/hivemind_bus_client/protocol.py +++ b/hivemind_bus_client/protocol.py @@ -144,9 +144,7 @@ def start_handshake(self): LOG.info("hivemind does not support binarization protocol") payload = {"binarize": self.binarize, - "encodings": [SupportedEncodings.JSON_B64, - SupportedEncodings.JSON_Z85B, - SupportedEncodings.JSON_HEX], + "encodings": list(SupportedEncodings), "ciphers": optimal_ciphers()} if self.pswd_handshake is not None: payload["envelope"] = self.pswd_handshake.generate_handshake() diff --git a/hivemind_bus_client/z85b.py b/hivemind_bus_client/z85b.py index 1697d7b..a68c49a 100644 --- a/hivemind_bus_client/z85b.py +++ b/hivemind_bus_client/z85b.py @@ -1,102 +1,11 @@ -""" -Python implementation of Z85b 85-bit encoding. +import warnings -Z85b is a variation of ZMQ RFC 32 Z85 85-bit encoding with the following differences: -1. Little-endian encoding (to facilitate alignment with lower byte indices). -2. No requirement for a multiple of 4/5 length. -3. `decode_z85b()` eliminates whitespace from the input. -4. `decode_z85b()` raises a clear exception if invalid characters are encountered. +from hivemind_bus_client.encodings.z85b import Z85B -This file is a derivative work of z85.py from pyzmq. - -Copyright (c) 2013 Brian Granger, Min Ragan-Kelley -Distributed under the terms of the New BSD License. 
-""" -import re -import struct -from typing import Union - -from hivemind_bus_client.exceptions import Z85DecodeError - - -class Z85B: - # Z85CHARS is the base 85 symbol table - Z85CHARS = bytearray(b"0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#") - - # Z85MAP maps integers in [0, 84] to the appropriate character in Z85CHARS - Z85MAP = {char: idx for idx, char in enumerate(Z85CHARS)} - - # Powers of 85 for encoding/decoding - _85s = [85 ** i for i in range(5)] - - # Padding lengths for encoding and decoding - _E_PADDING = [0, 3, 2, 1] - _D_PADDING = [0, 4, 3, 2, 1] - - @classmethod - def encode(cls, rawbytes: Union[str, bytes]) -> bytes: - """ - Encode raw bytes into Z85b format. - - Args: - rawbytes (Union[str, bytes]): Input data to encode. - - Returns: - bytes: Z85b-encoded bytes. - """ - rawbytes = bytearray(rawbytes) if isinstance(rawbytes, (bytes, str)) else rawbytes - padding = cls._E_PADDING[len(rawbytes) % 4] - rawbytes += b'\x00' * padding - nvalues = len(rawbytes) // 4 - - # Pack the raw bytes into little-endian 32-bit integers - values = struct.unpack(f'<{nvalues}I', rawbytes) - encoded = bytearray() - - for value in values: - for offset in cls._85s: - encoded.append(cls.Z85CHARS[(value // offset) % 85]) - - # Remove padding characters from the encoded output - if padding: - encoded = encoded[:-padding] - return bytes(encoded) - - @classmethod - def decode(cls, z85bytes: Union[str, bytes]) -> bytes: - """ - Decode Z85b-encoded bytes into raw bytes. - - Args: - z85bytes (Union[str, bytes]): Z85b-encoded data. - - Returns: - bytes: Decoded raw bytes. - - Raises: - Z85DecodeError: If invalid characters are encountered during decoding. 
- """ - # Normalize input by removing whitespace - z85bytes = bytearray(re.sub(rb'\s+', b'', z85bytes if isinstance(z85bytes, bytes) else z85bytes.encode())) - padding = cls._D_PADDING[len(z85bytes) % 5] - nvalues = (len(z85bytes) + padding) // 5 - - values = [] - for i in range(0, len(z85bytes), 5): - value = 0 - for j, offset in enumerate(cls._85s): - try: - value += cls.Z85MAP[z85bytes[i + j]] * offset - except IndexError: - break # End of input reached - except KeyError as e: - raise Z85DecodeError(f"Invalid byte code: {e.args[0]!r}") - values.append(value) - - # Unpack the values back into raw bytes - decoded = struct.pack(f'<{nvalues}I', *values) - - # Remove padding from the decoded output - if padding: - decoded = decoded[:-padding] - return decoded +# Deprecation warning +warnings.warn( + "Importing Z85B from hivemind_bus_client.z85b is deprecated and will be removed in a future release. " + "Please update your code to use the new import path 'from hivemind_bus_client.encodings.z85b'.", + DeprecationWarning, + stacklevel=2, +) \ No newline at end of file diff --git a/setup.py b/setup.py index 0e7fe73..2d8bd90 100644 --- a/setup.py +++ b/setup.py @@ -44,7 +44,7 @@ def required(requirements_file): setup( name='hivemind_bus_client', version=get_version(), - packages=['hivemind_bus_client'], + packages=['hivemind_bus_client', 'hivemind_bus_client.encodings'], package_data={ '*': ['*.txt', '*.md'] }, diff --git a/test/__init__.py b/test/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/test/requirements.txt b/test/requirements.txt new file mode 100644 index 0000000..9e40bde --- /dev/null +++ b/test/requirements.txt @@ -0,0 +1,2 @@ +pytest~=7.1 +pytest-cov~=4.1 \ No newline at end of file diff --git a/test/test_b91.py b/test/test_b91.py new file mode 100644 index 0000000..8cee1f9 --- /dev/null +++ b/test/test_b91.py @@ -0,0 +1,62 @@ +import unittest +from hivemind_bus_client.encodings import B91 + + +class TestB91(unittest.TestCase): + def 
class TestB91(unittest.TestCase):
    """Round-trip, known-vector and error-handling tests for the B91 codec."""

    def assert_round_trip(self, data):
        """Assert that decoding the encoded form of `data` returns `data`."""
        self.assertEqual(B91.decode(B91.encode(data)), data)

    def test_encode_empty(self):
        """Test encoding an empty byte sequence."""
        self.assertEqual(B91.encode(b''), b'')
        self.assertEqual(B91.encode(''), b'')

    def test_decode_empty(self):
        """Test decoding an empty string."""
        self.assertEqual(B91.decode(''), b'')
        self.assertEqual(B91.decode(b''), b'')

    def test_encode_single_byte(self):
        """Test round-tripping one- and two-byte inputs given as bytes or str."""
        self.assertEqual(b'A', B91.decode(B91.encode(b'A')))
        self.assertEqual(b'B', B91.decode(B91.encode('B')))
        self.assertEqual(b'_~', B91.decode(B91.encode(b'_~')))
        self.assertEqual(b'_~', B91.decode(B91.encode('_~')))

    def test_encode_short_string(self):
        """Test a short string against a known Base91 vector in both directions."""
        self.assert_round_trip(b'hello')
        self.assertEqual(B91.decode('>OwJh>Io0Tv!lE'), b'Hello World')
        self.assertEqual(B91.encode(b'Hello World'), b'>OwJh>Io0Tv!lE')

    def test_encode_decode_round_trip(self):
        """Test encoding and decoding round-trip."""
        self.assert_round_trip(b'The quick brown fox jumps over the lazy dog.')

    def test_encode_unicode_string(self):
        """Test encoding a Unicode string."""
        data = 'こんにちは'  # Japanese for "hello"
        encoded = B91.encode(data)
        decoded = B91.decode(encoded)
        self.assertEqual(decoded.decode('utf-8'), data)

    def test_decode_invalid_character(self):
        """Test decoding with invalid Base91 characters."""
        with self.assertRaises(ValueError):
            B91.decode('Invalid🎉Chars')

    def test_3bytes_threshold(self):
        """Test inputs whose first bit group sits around the 13/14-bit threshold (88)."""
        self.assert_round_trip(b'\x00\x00\x00')  # Minimal data
        for value in (0, 87, 88, 89, 90, 8191, 8192, 8280, 8281, 16383):
            data = value.to_bytes(2, 'little') + b'\x00'
            with self.subTest(value=value):
                self.assert_round_trip(data)

    def test_encode_large_data(self):
        """Test encoding a large byte sequence."""
        self.assert_round_trip(b'\xff' * 1000)
class TestZ85B(unittest.TestCase):
    """Round-trip and error-handling tests for the Z85B codec."""

    def assert_round_trip(self, data):
        """Assert that decoding the encoded form of `data` returns `data`."""
        self.assertEqual(Z85B.decode(Z85B.encode(data)), data)

    def test_encode_empty(self):
        """Test encoding an empty byte sequence."""
        self.assertEqual(Z85B.encode(b''), b'')
        self.assertEqual(Z85B.encode(''), b'')

    def test_decode_empty(self):
        """Test decoding an empty string."""
        self.assertEqual(Z85B.decode(''), b'')
        self.assertEqual(Z85B.decode(b''), b'')

    def test_encode_single_byte(self):
        """Test round-tripping one- and two-byte inputs given as bytes or str."""
        self.assertEqual(b'A', Z85B.decode(Z85B.encode(b'A')))
        self.assertEqual(b'B', Z85B.decode(Z85B.encode('B')))
        self.assertEqual(b'_~', Z85B.decode(Z85B.encode(b'_~')))
        self.assertEqual(b'_~', Z85B.decode(Z85B.encode('_~')))

    def test_encode_short_string(self):
        """Test encoding a short string."""
        self.assert_round_trip(b'hello')
        self.assert_round_trip(b'Hello World')

    def test_encode_decode_round_trip(self):
        """Test encoding and decoding round-trip."""
        self.assert_round_trip(b'The quick brown fox jumps over the lazy dog.')

    def test_encode_unicode_string(self):
        """Test encoding a Unicode string."""
        data = 'こんにちは'  # Japanese for "hello"
        encoded = Z85B.encode(data)
        decoded = Z85B.decode(encoded)
        self.assertEqual(decoded.decode('utf-8'), data)

    def test_decode_invalid_character(self):
        """Test decoding input that contains bytes outside the Z85 alphabet."""
        with self.assertRaises(Z85DecodeError):
            Z85B.decode('Invalid🎉Chars')

    def test_decode_ignores_whitespace(self):
        """Test that whitespace inside the encoded text is stripped before decoding."""
        self.assertEqual(Z85B.encode(b'\x01'), b'10')
        self.assertEqual(Z85B.decode(b' 1 0\n'), b'\x01')

    def test_partial_block_of_zero_bytes(self):
        """Test a 3-byte input (not a multiple of 4) made only of zero bytes."""
        self.assert_round_trip(b'\x00\x00\x00')

    def test_encode_large_data(self):
        """Test encoding a large byte sequence."""
        self.assert_round_trip(b'\xff' * 1000)
class TestZ85P(unittest.TestCase):
    """Round-trip, padding and error-handling tests for the Z85P codec."""

    def test_encode_empty(self):
        """Test encoding an empty byte sequence (only the pad-count byte remains)."""
        self.assertEqual(Z85P.encode(b''), b'\x00')
        self.assertEqual(Z85P.encode(''), b'\x00')

    def test_decode_empty(self):
        """Test decoding an empty string."""
        self.assertEqual(Z85P.decode(b'\x00'), b'')
        self.assertEqual(Z85P.decode(''), b'')
        self.assertEqual(Z85P.decode(b''), b'')

    def test_encode_single_byte(self):
        """Test round-tripping one- and two-byte inputs given as bytes or str."""
        self.assertEqual(b'A', Z85P.decode(Z85P.encode(b'A')))
        self.assertEqual(b'B', Z85P.decode(Z85P.encode('B')))
        self.assertEqual(b'_~', Z85P.decode(Z85P.encode(b'_~')))
        self.assertEqual(b'_~', Z85P.decode(Z85P.encode('_~')))

    def test_encode_short_string(self):
        """Test encoding a short string."""
        self.assertEqual(b'hello', Z85P.decode(Z85P.encode(b'hello')))

    def test_known_vector(self):
        """Test the ZMQ RFC 32 sample vector, prefixed with a zero pad count."""
        data = bytes.fromhex('864FD26FB559F75B')
        self.assertEqual(Z85P.encode(data), b'\x00HelloWorld')
        self.assertEqual(Z85P.decode(b'\x00HelloWorld'), data)

    def test_encode_decode_round_trip(self):
        """Test encoding and decoding round-trip."""
        data = b'The quick brown fox jumps over the lazy dog.'
        encoded = Z85P.encode(data)
        decoded = Z85P.decode(encoded)
        self.assertEqual(decoded, data)

    def test_encode_unicode_string(self):
        """Test encoding a Unicode string."""
        data = 'こんにちは'  # Japanese for "hello"
        encoded = Z85P.encode(data)
        decoded = Z85P.decode(encoded)
        self.assertEqual(decoded.decode('utf-8'), data)

    def test_decode_invalid_character(self):
        """Test decoding text that was not produced by the encoder."""
        # The UTF-8 form is 16 bytes (16 % 5 == 1), so the length check passes;
        # the leading 'I' (0x49) is then rejected as a pad count.
        with self.assertRaises(ValueError):
            Z85P.decode('Invalid🎉Chars')

    def test_encode_large_data(self):
        """Test encoding a large byte sequence."""
        data = b'\xff' * 1000
        encoded = Z85P.encode(data)
        decoded = Z85P.decode(encoded)
        self.assertEqual(decoded, data)

    def test_padding_single_byte(self):
        """Test encoding and decoding with one byte that requires padding."""
        data = b'\x01'  # Single byte, should get padded with 3 \x00 bytes
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 3)  # Check padding byte
        self.assertEqual(Z85P.decode(encoded), data)

    def test_padding_two_bytes(self):
        """Test encoding and decoding with two bytes that require padding."""
        data = b'\x01\x01'  # Two bytes, should get padded with 2 \x00 bytes
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 2)  # Check padding byte
        self.assertEqual(Z85P.decode(encoded), data)

    def test_padding_three_bytes(self):
        """Test encoding and decoding with three bytes that require padding."""
        data = b'\x01\x01\x01'  # Three bytes, should get padded with 1 \x00 byte
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 1)  # Check padding byte
        self.assertEqual(Z85P.decode(encoded), data)

    def test_padding_five_bytes(self):
        """Test five bytes: one full group plus one byte padded with 3 \\x00 bytes."""
        data = b'\x01\x01\x01\x01\x01'  # 5 bytes
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 3)  # Check padding byte
        self.assertEqual(Z85P.decode(encoded), data)

    def test_no_padding_needed(self):
        """Test encoding and decoding with data that doesn't need padding."""
        data = b'\x01\x01\x01\x01'  # Exactly 4 bytes, no padding
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 0)  # No padding
        self.assertEqual(Z85P.decode(encoded), data)

    def test_round_trip_padding(self):
        """Test round-trip encoding and decoding with padding."""
        data = b'\x01\x01\x01'  # Less than 4 bytes, needs padding
        encoded = Z85P.encode(data)
        decoded = Z85P.decode(encoded)
        self.assertEqual(decoded, data)  # Ensure padding is correctly removed

    def test_padding_removal_after_decoding(self):
        """Test ensuring padding is correctly removed after decoding."""
        data = b'\x01\x01\x01'  # Less than 4 bytes, needs padding
        encoded = Z85P.encode(data)
        self.assertEqual(encoded[0], 1)  # Padding size is 1
        decoded = Z85P.decode(encoded)
        self.assertEqual(decoded, data)  # Padding should be removed