Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@
XCHACHA20POLY1305_NONCE_LEN = 24

OTP_PAD_SIZE = 11264
OTP_PADDING_LENGTH = 2
OTP_PADDING_LIMIT = 1024
OTP_MAX_BUCKET = 64
OTP_MAX_RANDOM_PAD = 16
OTP_SIZE_LENGTH = 2

SMP_NONCE_LENGTH = 64
SMP_PROOF_LENGTH = 64
Expand Down Expand Up @@ -71,5 +72,5 @@
ARGON2_MEMORY = 256 * 1024 # MB
ARGON2_ITERS = 3
ARGON2_OUTPUT_LEN = 32 # bytes
ARGON2_SALT_LEN = 16 # bytes (Must be always 16 for interoperability with libsodium.)
ARGON2_SALT_LEN = 16 # bytes (Must be always 16 for interoperability with implementations using libsodium.)
ARGON2_LANES = 4
111 changes: 76 additions & 35 deletions core/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,12 @@

import oqs
import secrets
from typing import Tuple
from core.constants import (
OTP_PAD_SIZE,
OTP_PADDING_LENGTH,
OTP_MAX_RANDOM_PAD,
OTP_SIZE_LENGTH,
OTP_MAX_BUCKET,
ML_KEM_1024_NAME,
ML_KEM_1024_SK_LEN,
ML_KEM_1024_PK_LEN,
Expand Down Expand Up @@ -59,7 +62,7 @@ def verify_signature(algorithm: str, message: bytes, signature: bytes, public_ke
with oqs.Signature(algorithm) as verifier:
return verifier.verify(message, signature[:ALGOS_BUFFER_LIMITS[algorithm]["SIGN_LEN"]], public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])

def generate_sign_keys(algorithm: str = ML_DSA_87_NAME):
def generate_sign_keys(algorithm: str = ML_DSA_87_NAME) -> Tuple[bytes, bytes]:
"""
Generates a new post-quantum signature keypair.

Expand All @@ -74,34 +77,43 @@ def generate_sign_keys(algorithm: str = ML_DSA_87_NAME):
private_key = signer.export_secret_key()
return private_key, public_key

def otp_encrypt_with_padding(plaintext: bytes, key: bytes, padding_limit: int) -> bytes:
def otp_encrypt_with_padding(plaintext: bytes, key: bytes) -> Tuple[bytes, bytes]:
"""
Encrypts plaintext using a one-time pad with random padding.
Encrypts plaintext using a one-time pad with random or bucket padding.

Process:
- Prefixes length of padding.
- Adds random padding (0..padding_limit bytes).
- Prefixes length of message.
- Adds random padding (0..padding_limit bytes) if message > 64 bytes
- If 64 bytes > message, pad message up to 64 bytes,
- XORs with one-time pad key.

Args:
plaintext: Data to encrypt.
key: OTP key (>= plaintext length + padding).
padding_limit: Max padding length.

Returns:
Ciphertext bytes.
"""
if padding_limit > ((2 ** (8 * OTP_PADDING_LENGTH)) - 1):
raise ValueError("Padding too large")

plaintext_padding = secrets.token_bytes(padding_limit)
padding_length_bytes = len(plaintext_padding).to_bytes(OTP_PADDING_LENGTH, "big")
padded_plaintext = padding_length_bytes + plaintext + plaintext_padding
if len(plaintext) <= OTP_MAX_BUCKET - OTP_SIZE_LENGTH:
pad_len = OTP_MAX_BUCKET - OTP_SIZE_LENGTH - len(plaintext)
else:
pad_len = secrets.randbelow(OTP_MAX_RANDOM_PAD + 1)

padding = secrets.token_bytes(pad_len)

plaintext_length_bytes = len(plaintext).to_bytes(OTP_SIZE_LENGTH, "big")

padded_plaintext = plaintext_length_bytes + plaintext + padding

if len(padded_plaintext) > len(key):
raise ValueError("Padded plaintext is larger than key!")

return one_time_pad(padded_plaintext, key)

def otp_decrypt_with_padding(ciphertext: bytes, key: bytes) -> bytes:
"""
Decrypts one-time pad ciphertext that contains prefixed padding length.
Decrypts one-time pad ciphertext that contains prefixed plaintext length.

Args:
ciphertext: Ciphertext bytes.
Expand All @@ -110,11 +122,15 @@ def otp_decrypt_with_padding(ciphertext: bytes, key: bytes) -> bytes:
Returns:
Original plaintext bytes without padding.
"""
plaintext_with_padding = one_time_pad(ciphertext, key)
padding_length = int.from_bytes(plaintext_with_padding[:OTP_PADDING_LENGTH], "big")
if padding_length != 0:
return plaintext_with_padding[OTP_PADDING_LENGTH : -padding_length]
return plaintext_with_padding[OTP_PADDING_LENGTH:]
plaintext_with_padding, _ = one_time_pad(ciphertext, key)

plaintext_length = int.from_bytes(plaintext_with_padding[:OTP_SIZE_LENGTH], "big")

if plaintext_length <= 0:
raise ValueError(f"{plaintext_length} plaintext length, ciphertext corrupted or invalid key!")

return plaintext_with_padding[OTP_SIZE_LENGTH : OTP_SIZE_LENGTH + plaintext_length]


def one_time_pad(plaintext: bytes, key: bytes) -> bytes:
"""
Expand All @@ -131,14 +147,16 @@ def one_time_pad(plaintext: bytes, key: bytes) -> bytes:
for index, plain_byte in enumerate(plaintext):
key_byte = key[index]
otpd_plaintext += bytes([plain_byte ^ key_byte])
return otpd_plaintext

def generate_kem_keys(algorithm: str):
key = key[len(otpd_plaintext):]
return otpd_plaintext, key

def generate_kem_keys(algorithm: str) -> Tuple[bytes, bytes]:
"""
Generates a KEM keypair.

Args:
algorithm: PQ KEM algorithm (default Kyber1024).
algorithm: PQ KEM algorithm.

Returns:
(private_key, public_key) as bytes.
Expand All @@ -148,23 +166,46 @@ def generate_kem_keys(algorithm: str):
private_key = kem.export_secret_key()
return private_key, public_key

def encap_shared_secret(public_key: bytes, algorithm: str):
def encap_shared_secret(public_key: bytes, algorithm: str) -> Tuple[bytes, bytes]:
"""
Derive a KEM shared secret from a public key.

Args:
public_key: KEM public key.
algorithm: KEM algorithm NIST name.

Returns:
(KEM ciphertext, shared secret) as bytes.
"""

with oqs.KeyEncapsulation(algorithm) as kem:
return kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])

def decap_shared_secret(ciphertext: bytes, private_key: bytes, algorithm: str):
def decap_shared_secret(ciphertext: bytes, private_key: bytes, algorithm: str) -> bytes:
"""
Decrypts a single KEM ciphertext to derive a shared secret.

Args:
ciphertext: KEM ciphertext.
private_key: KEM private key.
algorithm: KEM algorithm NIST name.
size: Desired shared_secret size in bytes.

Returns:
Shared secret of size as bytes.
"""
with oqs.KeyEncapsulation(algorithm, secret_key = private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
return kem.decap_secret(ciphertext[:ALGOS_BUFFER_LIMITS[algorithm]["CT_LEN"]])

def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, size: int = OTP_PAD_SIZE):
"""
Decrypts concatenated KEM ciphertexts to derive shared one-time pad.
Decrypts concatenated KEM ciphertexts to derive shared secret.

Args:
ciphertext_blob: Concatenated Kyber ciphertexts.
ciphertext_blob: Concatenated KEM ciphertexts.
private_key: KEM private key.
algorithm: KEM algorithm NIST name.
otp_pad_size: Desired OTP pad size in bytes.
size: Desired OTP pad size in bytes.

Returns:
Shared secret OTP pad bytes.
Expand All @@ -174,7 +215,7 @@ def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm
cursor = 0

with oqs.KeyEncapsulation(algorithm, secret_key=private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
while len(shared_secrets) < otp_pad_size:
while len(shared_secrets) < size:
ciphertext = ciphertext_blob[cursor:cursor + cipher_size]
if len(ciphertext) != cipher_size:
raise ValueError(f"Ciphertext of {algorithm} blob is malformed or incomplete ({len(ciphertext)})")
Expand All @@ -185,28 +226,28 @@ def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm

return shared_secrets #[:otp_pad_size]

def generate_shared_secrets(public_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
def generate_shared_secrets(public_key: bytes, algorithm: str = None, size: int = OTP_PAD_SIZE) -> Tuple[bytes, bytes]:
"""
Generates a one-time pad via `algorithm` encapsulation.
Generates many shared secrets via `algorithm` encapsulation in chunks.

Args:
public_key: Recipient's public key.
public_key: Recipient's KEM public key.
algorithm: KEM algorithm NIST name.
otp_pad_size: Desired OTP pad size in bytes.
size: Desired shared secrets size in bytes.

Returns:
(ciphertexts_blob, shared_secrets) for transport & encryption.
(ciphertexts_blob, shared_secrets) as bytes.
"""
shared_secrets = b''
ciphertexts_blob = b''

with oqs.KeyEncapsulation(algorithm) as kem:
while len(shared_secrets) < otp_pad_size:
while len(shared_secrets) < size:
ciphertext, shared_secret = kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])
ciphertexts_blob += ciphertext
shared_secrets += shared_secret

return ciphertexts_blob, shared_secrets[:otp_pad_size]
return ciphertexts_blob, shared_secrets # [:otp_pad_size]

def random_number_range(a: int, b: int) -> int:
"""
Expand Down
6 changes: 4 additions & 2 deletions core/trad_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ def derive_key_argon2id(password: bytes, salt: bytes = None, output_length: int
), salt


def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, counter: int = None, counter_safety: int = 2 ** 32) -> tuple[bytes, bytes]:
def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, nonce: bytes = None, counter: int = None, counter_safety: int = 2 ** 32) -> tuple[bytes, bytes]:
"""
Encrypt plaintext using ChaCha20Poly1305.

Expand All @@ -83,7 +83,9 @@ def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, counter: int = None,
- nonce: The randomly generated AES-GCM nonce.
- ciphertext: The encrypted data including the authentication tag.
"""
nonce = secrets.token_bytes(XCHACHA20POLY1305_NONCE_LEN)
if nonce is None:
nonce = sha3_512(secrets.token_bytes(XCHACHA20POLY1305_NONCE_LEN))[:XCHACHA20POLY1305_NONCE_LEN]

if counter is not None:
if counter > counter_safety:
raise ValueError("ChaCha counter has overflowen")
Expand Down
4 changes: 4 additions & 0 deletions logic/contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ def save_contact(user_data: dict, user_data_lock, contact_id: str) -> None:

}
},
"our_strand_key": None,
"our_next_strand_nonce": None,
"contact_next_strand_key": None,
"contact_strand_nonce": None,
"our_pads": {
"hash_chain": None,
"pads": None
Expand Down
Loading