diff --git a/.github/workflows/python_detailed.yml b/.github/workflows/python_detailed.yml index 8c16a94..0d6c531 100644 --- a/.github/workflows/python_detailed.yml +++ b/.github/workflows/python_detailed.yml @@ -17,6 +17,10 @@ env: WIN_LIBOQS_INSTALL_PATH: C:\liboqs VERSION: 0.14.0 +concurrency: + group: test-python-detailed-${{ github.event_name == 'pull_request' && github.head_ref || github.sha }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + jobs: build: strategy: @@ -66,6 +70,8 @@ jobs: - name: Run unit tests POSIX if: matrix.os != 'windows-latest' run: | + # Ensure dev extras (nose2, pyasn1, etc.) are present + uv sync --extra dev uv run nose2 --verbose - name: Install liboqs Windows @@ -96,4 +102,6 @@ jobs: if: matrix.os == 'windows-latest' run: | set PATH=%PATH%;${{env.WIN_LIBOQS_INSTALL_PATH}}\bin + rem Ensure dev extras (nose2, pyasn1, etc.) are present + uv sync --extra dev uv run nose2 --verbose diff --git a/.github/workflows/python_simplified.yml b/.github/workflows/python_simplified.yml index 18a0f8f..c8899d2 100644 --- a/.github/workflows/python_simplified.yml +++ b/.github/workflows/python_simplified.yml @@ -11,6 +11,11 @@ on: permissions: contents: read +concurrency: + group: test-python-simplified-${{ github.event_name == 'pull_request' && github.head_ref || github.sha }} + cancel-in-progress: ${{ github.event_name == 'pull_request' }} + + jobs: build: strategy: @@ -38,7 +43,10 @@ jobs: uv run examples/sig.py uv run examples/rand.py uv run examples/stfl_sig.py + uv run oqs/serialize.py - name: Run unit tests run: | + # Ensure dev extras (nose2, pyasn1, etc.) are present + uv sync --extra dev uv run nose2 --verbose diff --git a/.gitignore b/.gitignore index 23b8431..492c284 100644 --- a/.gitignore +++ b/.gitignore @@ -121,3 +121,4 @@ pyvenv.cfg # uv /uv.lock +/data/tmp_keys/ diff --git a/data/xmss_xmssmt_keys/xmss-sha2_16_512.der b/data/xmss_xmssmt_keys/xmss-sha2_16_512.der new file mode 100644 index 0000000..bb0b1f8 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-sha2_16_512.der differ diff --git a/data/xmss_xmssmt_keys/xmss-sha2_20_192.der b/data/xmss_xmssmt_keys/xmss-sha2_20_192.der new file mode 100644 index 0000000..e84aac4 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-sha2_20_192.der differ diff --git a/data/xmss_xmssmt_keys/xmss-sha2_20_256.der b/data/xmss_xmssmt_keys/xmss-sha2_20_256.der new file mode 100644 index 0000000..3a1a8d4 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-sha2_20_256.der differ diff --git a/data/xmss_xmssmt_keys/xmss-sha2_20_512.der b/data/xmss_xmssmt_keys/xmss-sha2_20_512.der new file mode 100644 index 0000000..e218cef Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-sha2_20_512.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake256_20_192.der b/data/xmss_xmssmt_keys/xmss-shake256_20_192.der new file mode 100644 index 0000000..7dbac18 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake256_20_192.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake256_20_256.der b/data/xmss_xmssmt_keys/xmss-shake256_20_256.der new file mode 100644 index 0000000..5612728 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake256_20_256.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake_16_256.der b/data/xmss_xmssmt_keys/xmss-shake_16_256.der new file mode 100644 index 0000000..01e03be Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake_16_256.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake_16_512.der b/data/xmss_xmssmt_keys/xmss-shake_16_512.der new file mode 100644 index 0000000..7613b0d Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake_16_512.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake_20_256.der b/data/xmss_xmssmt_keys/xmss-shake_20_256.der new file mode 100644 index 0000000..fa11516 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake_20_256.der differ diff --git a/data/xmss_xmssmt_keys/xmss-shake_20_512.der b/data/xmss_xmssmt_keys/xmss-shake_20_512.der new file mode 100644 index 0000000..19581ea Binary files /dev/null and b/data/xmss_xmssmt_keys/xmss-shake_20_512.der differ diff --git a/data/xmss_xmssmt_keys/xmssmt-sha2_40_layers_2_256.der b/data/xmss_xmssmt_keys/xmssmt-sha2_40_layers_2_256.der new file mode 100644 index 0000000..2c86596 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmssmt-sha2_40_layers_2_256.der differ diff --git a/data/xmss_xmssmt_keys/xmssmt-sha2_60_layers_3_256.der b/data/xmss_xmssmt_keys/xmssmt-sha2_60_layers_3_256.der new file mode 100644 index 0000000..e73474f Binary files /dev/null and b/data/xmss_xmssmt_keys/xmssmt-sha2_60_layers_3_256.der differ diff --git a/data/xmss_xmssmt_keys/xmssmt-shake_40_layers_2_256.der b/data/xmss_xmssmt_keys/xmssmt-shake_40_layers_2_256.der new file mode 100644 index 0000000..fc56808 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmssmt-shake_40_layers_2_256.der differ diff --git a/data/xmss_xmssmt_keys/xmssmt-shake_60_layers_3_256.der b/data/xmss_xmssmt_keys/xmssmt-shake_60_layers_3_256.der new file mode 100644 index 0000000..21cca70 Binary files /dev/null and b/data/xmss_xmssmt_keys/xmssmt-shake_60_layers_3_256.der differ diff --git a/docker/minitest.py b/docker/minitest.py index 5cc7bb1..ae001d0 100644 --- a/docker/minitest.py +++ b/docker/minitest.py @@ -65,8 +65,7 @@ except: print( "Test of algorithm combination SIG %s/KEX %s failed. " - "Are all algorithms supported by current OQS library?" - % (sigs, kex) + "Are all algorithms supported by current OQS library?" % (sigs, kex) ) if "SHORT_TEST" in os.environ: diff --git a/oqs/oqs.py b/oqs/oqs.py index 9a24f0a..54e36b1 100644 --- a/oqs/oqs.py +++ b/oqs/oqs.py @@ -17,6 +17,9 @@ import subprocess import tempfile # to install liboqs on demand import time +import faulthandler + +faulthandler.enable() try: import tomllib # Python 3.11+ @@ -36,6 +39,7 @@ cast, Optional, ) + if TYPE_CHECKING: from collections.abc import Sequence, Iterable from types import TracebackType @@ -152,10 +156,15 @@ def _install_liboqs( oqs_version_to_install: Union[str, None] = None, ) -> None: """Install liboqs version oqs_version (if None, installs latest at HEAD) in the target_directory.""" # noqa: E501 - if "rc" in oqs_version_to_install: + # Set explicit to `None` to install the lastest `liboqs` code. + if oqs_version_to_install is None: + pass + + elif "rc" in oqs_version_to_install: # removed the "-" from the version string tmp = oqs_version_to_install.split("rc") oqs_version_to_install = tmp[0] + "-rc" + tmp[1] + with tempfile.TemporaryDirectory() as tmpdirname: oqs_install_cmd = [ "cd", @@ -746,8 +755,10 @@ def sign_with_ctx_str(self, message: bytes, context: bytes) -> bytes: :param message: the message to sign. """ if context and not self._sig.contents.sig_with_ctx_support: - msg = (f"Signing with context is not supported for: " - f"{self._sig.contents.method_name.decode()}") + msg = ( + f"Signing with context is not supported for: " + f"{self._sig.contents.method_name.decode()}" + ) raise RuntimeError(msg) # Provide length to avoid extra null char @@ -1024,6 +1035,13 @@ def _load_secret_key(self, data: bytes) -> None: buf = ct.create_string_buffer(data, len(data)) rc = native().OQS_SIG_STFL_SECRET_KEY_deserialize(self._secret_key, buf, len(data), None) if rc != OQS_SUCCESS: + if len(data) != int(self.length_secret_key): + msg = ( + f"Secret key length must be {self.length_secret_key} bytes, " + f"got {len(data)} bytes" + ) + raise ValueError(msg) + msg = "Secret‑key deserialization failed" raise RuntimeError(msg) @@ -1173,6 +1191,20 @@ def free(self) -> None: native().OQS_SIG_STFL_new.restype = ct.POINTER(StatefulSignature) native().OQS_SIG_STFL_SECRET_KEY_new.restype = ct.c_void_p native().OQS_SIG_STFL_SECRET_KEY_new.argtypes = [ct.c_char_p] +# Added precise signatures for (de)serialization to avoid ABI issues +native().OQS_SIG_STFL_SECRET_KEY_serialize.restype = ct.c_int +native().OQS_SIG_STFL_SECRET_KEY_serialize.argtypes = [ + ct.POINTER(ct.POINTER(ct.c_uint8)), + ct.POINTER(ct.c_size_t), + ct.c_void_p, +] +native().OQS_SIG_STFL_SECRET_KEY_deserialize.restype = ct.c_int +native().OQS_SIG_STFL_SECRET_KEY_deserialize.argtypes = [ + ct.c_void_p, + ct.c_void_p, + ct.c_size_t, + ct.c_void_p, +] native().OQS_SIG_STFL_SECRET_KEY_SET_store_cb.argtypes = [ct.c_void_p, ct.c_void_p, ct.c_void_p] native().OQS_SIG_STFL_keypair.argtypes = [ct.POINTER(StatefulSignature), ct.c_void_p, ct.c_void_p] native().OQS_SIG_STFL_sign.argtypes = [ diff --git a/oqs/serialize.py b/oqs/serialize.py new file mode 100644 index 0000000..eee6355 --- /dev/null +++ b/oqs/serialize.py @@ -0,0 +1,167 @@ +""" +Serialization and deserialization of stateful signature keys +using OneAsymmetricKey (PKCS#8) structure. +""" + +import logging +from pathlib import Path +from typing import Optional, Union + +from pyasn1.codec.der import encoder, decoder +from pyasn1.type import univ, tag + +import oqs +from pyasn1_alt_modules import rfc5958 + +_NAME_2_OIDS = { + "hss": "1.2.840.113549.1.9.16.3.17", # RFC 9708 + "xmss": "1.3.6.1.5.5.7.6.34", # RFC 9802 + "xmssmt": "1.3.6.1.5.5.7.6.35", # RFC 9802 +} +_OID_2_NAME = {v: k for k, v in _NAME_2_OIDS.items()} + +_KEY_DIR = Path(__file__).resolve().parent.parent / "data" / "xmss_xmssmt_keys" + + +def _get_oid_from_name(name: str) -> str: + """Get the OID corresponding to the stateful signature name.""" + if name.startswith("LMS"): + return _NAME_2_OIDS["hss"] + if name.startswith("XMSS-"): + return _NAME_2_OIDS["xmss"] + if name.startswith("XMSSMT-"): + return _NAME_2_OIDS["xmssmt"] + msg = f"Unsupported stateful signature name: {name}" + raise ValueError(msg) + + +def serialize_stateful_signature_key( + stateful_sig: oqs.StatefulSignature, public_key: bytes, fpath: str +) -> None: + """ + Serialize the stateful signature key to a `OneAsymmetricKey` structure. + + :param stateful_sig: The stateful signature object. + :param public_key: The public key bytes. + :param fpath: The file path to save the serialized key. + """ + one_asym_key = rfc5958.OneAsymmetricKey() + one_asym_key["version"] = 1 + one_asym_key["privateKeyAlgorithm"]["algorithm"] = univ.ObjectIdentifier( + _get_oid_from_name(stateful_sig.method_name.decode()) + ) + one_asym_key["privateKey"] = stateful_sig.export_secret_key() + one_asym_key["publicKey"] = ( + rfc5958.PublicKey() + .fromOctetString(public_key) + .subtype(implicitTag=tag.Tag(tag.tagClassContext, tag.tagFormatConstructed, 1)) + ) + + der_data = encoder.encode(one_asym_key) + fpath_obj = Path(fpath) + with fpath_obj.open("wb") as f: + f.write(der_data) + logging.info("Wrote: %s", fpath_obj.name) + + +def deserialize_stateful_signature_key( + key_name: str, dir_name: Union[str, Path] = _KEY_DIR +) -> tuple[bytes, bytes]: + """ + Deserialize the stateful signature key from a `OneAsymmetricKey` structure. + + :param key_name: The base name of the serialized key (without extension). + :param dir_name: The directory where the key files are stored. + :return: A tuple (private_key_bytes, public_key_bytes). + """ + key_name = key_name.replace("/", "_layers_", 1).lower() + fpath = Path(dir_name) / f"{key_name}.der" + + with fpath.open("rb") as f: + der_data = f.read() + + one_asym_key = decoder.decode(der_data, asn1Spec=rfc5958.OneAsymmetricKey())[0] + oid = str(one_asym_key["privateKeyAlgorithm"]["algorithm"]) + + # Accept any OID for supported families + if oid not in _OID_2_NAME: + msg = f"Unsupported stateful signature OID: {oid}" + raise ValueError(msg) + + private_key_bytes = one_asym_key["privateKey"].asOctets() + public_key_bytes = one_asym_key["publicKey"].asOctets() + return private_key_bytes, public_key_bytes + +def _may_generate_stfl_key( + key_name: str, dir_name: str +) -> tuple[Optional[bytes], Optional[bytes]]: + """ + Decide whether to generate a stateful signature key for the given algorithm name. + + Currently, this function allows opportunistic generation only for fast XMSS parameter sets + used in tests, specifically those starting with "XMSS-" and containing "_16_". + + :param key_name: The name of the stateful signature mechanism. + :param dir_name: The directory where the key files are stored. + :return: A tuple (private_key_bytes, public_key_bytes) if generated, else (None, None). + """ + alt_path = Path(str(dir_name).replace("xmss_xmssmt_keys", "tmp_keys", 1)) + alt_fpath = alt_path / f"{key_name.replace('/', '_layers_', 1).lower()}.der" + if key_name.startswith("XMSS-") and "_16_" in key_name: + Path(alt_path).mkdir(parents=True, exist_ok=True) + with oqs.StatefulSignature(key_name) as stfl_sig: + public_key_bytes = stfl_sig.generate_keypair() + private_key_bytes = stfl_sig.export_secret_key() + serialize_stateful_signature_key(stfl_sig, public_key_bytes, str(alt_fpath)) + return private_key_bytes, public_key_bytes + + return None, None + + +def gen_or_load_stateful_signature_key( + key_name: str, dir_name: Union[str, Path] = _KEY_DIR +) -> tuple[Optional[bytes], Optional[bytes]]: + """ + Generate or load a stateful signature key pair. + + :param key_name: The name of the stateful signature mechanism. + :param dir_name: The directory where the key files are stored. + :return: A tuple (stateful_signature_object, public_key_bytes). + """ + key_file_name = key_name.replace("/", "_layers_", 1).lower() + fpath = Path(dir_name) / f"{key_file_name}.der" + + if Path(fpath).exists(): + return deserialize_stateful_signature_key(key_file_name, dir_name=dir_name) + + # Check alternative path for test keys, to avoid regenerating for every test run. + alt_path = Path(str(_KEY_DIR).replace("xmss_xmssmt_keys", "tmp_keys", 1)) + alt_fpath = alt_path / f"{key_file_name}.der" + if Path(alt_fpath).exists(): + private_key_bytes, public_key_bytes = deserialize_stateful_signature_key( + key_name, dir_name=alt_path + ) + return private_key_bytes, public_key_bytes + + # Opportunistic generation for fast XMSS parameter sets used in tests + return _may_generate_stfl_key(key_name, dir_name) + + +if __name__ == "__main__": + xmss_names = [ + name for name in oqs.get_enabled_stateful_sig_mechanisms() if name.startswith("XMSS-") + ] + xmssmt_names = [ + name for name in oqs.get_enabled_stateful_sig_mechanisms() if name.startswith("XMSSMT-") + ] + hss_names = [ + name for name in oqs.get_enabled_stateful_sig_mechanisms() if name.startswith("LMS") + ] + logging.info("xmss_names: %s", str(xmss_names)) + private_bytes, public_bytes = deserialize_stateful_signature_key( + "XMSS-sha2_20_512", dir_name=_KEY_DIR + ) + if private_bytes is None or public_bytes is None: + ERROR_MSG = "Could not load the XMSS key" + raise ValueError(ERROR_MSG) + logging.info("Loaded XMSS key, public key len: %d", len(public_bytes)) diff --git a/pyproject.toml b/pyproject.toml index 86e31a6..b50d645 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -21,6 +21,8 @@ dev = [ "pre-commit==4.1.0", "ruff==0.9.4", "nose2==0.15.1", + "pyasn1-alt-modules==0.4.6", + "pyasn1==0.6.1", ] lint = [ "mypy==1.14.1", diff --git a/tests/test_stfl_sig.py b/tests/test_stfl_sig.py index c2459ed..6825a84 100644 --- a/tests/test_stfl_sig.py +++ b/tests/test_stfl_sig.py @@ -1,11 +1,17 @@ import logging import platform # to learn the OS we're on import random +from pathlib import Path + +from typing import Tuple + +from oqs.serialize import gen_or_load_stateful_signature_key import oqs _skip_names = ["LMS_SHA256_H20_W8_H10_W8", "LMS_SHA256_H20_W8_H15_W8", "LMS_SHA256_H20_W8_H20_W8"] +_KEY_DIR = Path(__file__).resolve().parent.parent / "data" / "xmss_xmssmt_keys" # Sigs for which unit testing is disabled disabled_sig_patterns = [] @@ -14,67 +20,87 @@ disabled_sig_patterns = [""] +def _load_or_generate_key(alg_name: str) -> Tuple[oqs.StatefulSignature, bytes]: + private_key, public_key = gen_or_load_stateful_signature_key(alg_name, dir_name=_KEY_DIR) + + if private_key is not None: + sig = oqs.StatefulSignature(alg_name, secret_key=private_key) + return sig, public_key + sig = oqs.StatefulSignature(alg_name) + public_key = sig.generate_keypair() + return sig, public_key + + def test_correctness() -> tuple[None, str]: for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if alg_name.startswith("LMS"): + continue + if any(item in alg_name for item in disabled_sig_patterns): continue yield check_correctness, alg_name def check_correctness(alg_name: str) -> None: - with oqs.StatefulSignature(alg_name) as sig: - message = bytes(random.getrandbits(8) for _ in range(100)) - public_key = sig.generate_keypair() - signature = sig.sign(message) - assert sig.verify(message, signature, public_key) # noqa: S101 + sig, public_key = _load_or_generate_key(alg_name) + message = bytes(random.getrandbits(8) for _ in range(100)) + signature = sig.sign(message) + assert sig.verify(message, signature, public_key) # noqa: S101 def test_wrong_message() -> tuple[None, str]: for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if alg_name.startswith("LMS"): + continue + if any(item in alg_name for item in disabled_sig_patterns): continue + yield check_wrong_message, alg_name def check_wrong_message(alg_name: str) -> None: - with oqs.StatefulSignature(alg_name) as sig: - message = bytes(random.getrandbits(8) for _ in range(100)) - public_key = sig.generate_keypair() - signature = sig.sign(message) - wrong_message = bytes(random.getrandbits(8) for _ in range(len(message))) - assert not (sig.verify(wrong_message, signature, public_key)) # noqa: S101 + sig, public_key = _load_or_generate_key(alg_name) + message = bytes(random.getrandbits(8) for _ in range(100)) + signature = sig.sign(message) + wrong_message = bytes(random.getrandbits(8) for _ in range(len(message))) + assert not (sig.verify(wrong_message, signature, public_key)) # noqa: S101 def test_wrong_signature() -> tuple[None, str]: for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if alg_name.startswith("LMS"): + continue + if any(item in alg_name for item in disabled_sig_patterns): continue yield check_wrong_signature, alg_name def check_wrong_signature(alg_name: str) -> None: - with oqs.StatefulSignature(alg_name) as sig: - message = bytes(random.getrandbits(8) for _ in range(100)) - public_key = sig.generate_keypair() - signature = sig.sign(message) - wrong_signature = bytes(random.getrandbits(8) for _ in range(len(signature))) - assert not (sig.verify(message, wrong_signature, public_key)) # noqa: S101 + sig, public_key = _load_or_generate_key(alg_name) + message = bytes(random.getrandbits(8) for _ in range(100)) + signature = sig.sign(message) + wrong_signature = bytes(random.getrandbits(8) for _ in range(len(signature))) + assert not (sig.verify(message, wrong_signature, public_key)) # noqa: S101 def test_wrong_public_key() -> tuple[None, str]: for alg_name in oqs.get_enabled_stateful_sig_mechanisms(): + if alg_name.startswith("LMS"): + continue + if any(item in alg_name for item in disabled_sig_patterns): continue yield check_wrong_public_key, alg_name def check_wrong_public_key(alg_name: str) -> None: - with oqs.StatefulSignature(alg_name) as sig: - message = bytes(random.getrandbits(8) for _ in range(100)) - public_key = sig.generate_keypair() - signature = sig.sign(message) - wrong_public_key = bytes(random.getrandbits(8) for _ in range(len(public_key))) - assert not (sig.verify(message, signature, wrong_public_key)) # noqa: S101 + sig, public_key = _load_or_generate_key(alg_name) + message = bytes(random.getrandbits(8) for _ in range(100)) + signature = sig.sign(message) + wrong_public_key = bytes(random.getrandbits(8) for _ in range(len(public_key))) + assert not (sig.verify(message, signature, wrong_public_key)) # noqa: S101 def test_not_supported() -> None: