Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# crypto parameters (bytes)
CHALLENGE_LEN = 11264

AES_GCM_NONCE_LEN = 12
XCHACHA20POLY1305_NONCE_LEN = 24

OTP_PAD_SIZE = 11264
OTP_PADDING_LENGTH = 2
Expand Down Expand Up @@ -71,5 +71,5 @@
ARGON2_MEMORY = 256 * 1024 # MB
ARGON2_ITERS = 3
ARGON2_OUTPUT_LEN = 32 # bytes
ARGON2_SALT_LEN = 32 # bytes
ARGON2_SALT_LEN = 16 # bytes (Must be always 16 for interoperability with libsodium.)
ARGON2_LANES = 4
8 changes: 8 additions & 0 deletions core/crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ def generate_kem_keys(algorithm: str):
private_key = kem.export_secret_key()
return private_key, public_key

def encap_shared_secret(public_key: bytes, algorithm: str):
with oqs.KeyEncapsulation(algorithm) as kem:
return kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])

def decap_shared_secret(ciphertext: bytes, private_key: bytes, algorithm: str):
with oqs.KeyEncapsulation(algorithm, secret_key = private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
return kem.decap_secret(ciphertext[:ALGOS_BUFFER_LIMITS[algorithm]["CT_LEN"]])

def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
"""
Decrypts concatenated KEM ciphertexts to derive shared one-time pad.
Expand Down
54 changes: 29 additions & 25 deletions core/trad_crypto.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,10 @@
These functions rely on the cryptography library and are intended for use within Coldwire's higher-level protocol logic.
"""

from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.argon2 import Argon2id
from nacl import pwhash, bindings
from core.constants import (
OTP_PAD_SIZE,
AES_GCM_NONCE_LEN,
XCHACHA20POLY1305_NONCE_LEN,
ARGON2_ITERS,
ARGON2_MEMORY,
ARGON2_LANES,
Expand All @@ -39,7 +38,7 @@ def sha3_512(data: bytes) -> bytes:
return h.digest()


def derive_key_argon2id(password: bytes, salt: bytes = None, salt_length: int = ARGON2_SALT_LEN, output_length: int = ARGON2_OUTPUT_LEN) -> tuple[bytes, bytes]:
def derive_key_argon2id(password: bytes, salt: bytes = None, output_length: int = ARGON2_OUTPUT_LEN) -> tuple[bytes, bytes]:
"""
Derive a symmetric key from a password using Argon2id.

Expand All @@ -57,55 +56,60 @@ def derive_key_argon2id(password: bytes, salt: bytes = None, salt_length: int =
- salt: The salt used for derivation.
"""
if salt is None:
salt = secrets.token_bytes(salt_length)
salt = secrets.token_bytes(ARGON2_SALT_LEN)

kdf = Argon2id(
salt=salt,
iterations=ARGON2_ITERS,
memory_cost=ARGON2_MEMORY,
length=output_length,
lanes=ARGON2_LANES
)
derived_key = kdf.derive(password)
return derived_key, salt
return pwhash.argon2id.kdf(
output_length,
password,
salt,
opslimit = ARGON2_ITERS,
memlimit = ARGON2_MEMORY
), salt


def encrypt_aes_gcm(key: bytes, plaintext: bytes) -> tuple[bytes, bytes]:
def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, counter: int = None, counter_safety: int = 2 ** 32) -> tuple[bytes, bytes]:
"""
Encrypt plaintext using AES-256 in GCM mode.
Encrypt plaintext using ChaCha20Poly1305.

A random nonce is generated for each encryption.

Args:
key: A 32-byte AES key.
key: A 32-byte ChaCha20Poly1305 key.
plaintext: Data to encrypt.
counter: an (optional) number to add to nonce

Returns:
A tuple (nonce, ciphertext) where:
- nonce: The randomly generated AES-GCM nonce.
- ciphertext: The encrypted data including the authentication tag.
"""
nonce = secrets.token_bytes(AES_GCM_NONCE_LEN)
aes_gcm = AESGCM(key)
ciphertext = aes_gcm.encrypt(nonce, plaintext, None)
nonce = secrets.token_bytes(XCHACHA20POLY1305_NONCE_LEN)
if counter is not None:
if counter > counter_safety:
raise ValueError("ChaCha counter has overflowen")

nonce = nonce[:XCHACHA20POLY1305_NONCE_LEN - 4] + counter.to_bytes(4, "big")

ciphertext = bindings.crypto_aead_xchacha20poly1305_ietf_encrypt(plaintext, None, nonce, key)

return nonce, ciphertext


def decrypt_aes_gcm(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
def decrypt_xchacha20poly1305(key: bytes, nonce: bytes, ciphertext: bytes) -> bytes:
"""
Decrypt ciphertext using AES-256 in GCM mode.
Decrypt ciphertext using ChaCha20Poly1305.

Raises an exception if authentication fails.

Args:
key: The 32-byte AES key used for encryption.
key: The 32-byte ChaCha20Poly1305 key used for encryption.
nonce: The nonce used during encryption.
ciphertext: The encrypted data including the authentication tag.

Returns:
The decrypted plaintext bytes.
"""
aes_gcm = AESGCM(key)
return aes_gcm.decrypt(nonce, ciphertext, None)

return bindings.crypto_aead_xchacha20poly1305_ietf_decrypt(ciphertext, None, nonce, key)


5 changes: 1 addition & 4 deletions logic/background_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,7 @@ def background_worker(user_data, user_data_lock, ui_queue, stop_flag):
# logger.debug("Data received: %s", json.dumps(response, indent = 2)[:2000])

for message in response["messages"]:
try:
logger.debug("Received data message: %s", json.dumps(message, indent = 2)[:5000])
except:
print("################# ", message)
logger.debug("Received data message: %s", json.dumps(message, indent = 2)[:5000])

# Sanity check universal message fields
if (not "sender" in message) or (not message["sender"].isdigit()) or (len(message["sender"]) != 16):
Expand Down
1 change: 1 addition & 0 deletions logic/contacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def save_contact(user_data: dict, user_data_lock, contact_id: str) -> None:
"contact_nonce": None,
"smp_step": None,
"tmp_proof": None,
"tmp_key": None,
"contact_kem_public_key": None,
"our_kem_keys": {
"private_key": None,
Expand Down
Loading