Skip to content

Commit 7804c71

Browse files
authored
Merge pull request #8 from Freedom-Club-Sec/feat/strandlock-protocol
Feat/strandlock protocol
2 parents 6ca9be6 + a48ee25 commit 7804c71

File tree

9 files changed

+449
-241
lines changed

9 files changed

+449
-241
lines changed

core/constants.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,9 @@
1515
XCHACHA20POLY1305_NONCE_LEN = 24
1616

1717
OTP_PAD_SIZE = 11264
18-
OTP_PADDING_LENGTH = 2
19-
OTP_PADDING_LIMIT = 1024
18+
OTP_MAX_BUCKET = 64
19+
OTP_MAX_RANDOM_PAD = 16
20+
OTP_SIZE_LENGTH = 2
2021

2122
SMP_NONCE_LENGTH = 64
2223
SMP_PROOF_LENGTH = 64
@@ -71,5 +72,5 @@
7172
ARGON2_MEMORY = 256 * 1024 # MB
7273
ARGON2_ITERS = 3
7374
ARGON2_OUTPUT_LEN = 32 # bytes
74-
ARGON2_SALT_LEN = 16 # bytes (Must be always 16 for interoperability with libsodium.)
75+
ARGON2_SALT_LEN = 16 # bytes (Must be always 16 for interoperability with implementations using libsodium.)
7576
ARGON2_LANES = 4

core/crypto.py

Lines changed: 76 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414

1515
import oqs
1616
import secrets
17+
from typing import Tuple
1718
from core.constants import (
1819
OTP_PAD_SIZE,
19-
OTP_PADDING_LENGTH,
20+
OTP_MAX_RANDOM_PAD,
21+
OTP_SIZE_LENGTH,
22+
OTP_MAX_BUCKET,
2023
ML_KEM_1024_NAME,
2124
ML_KEM_1024_SK_LEN,
2225
ML_KEM_1024_PK_LEN,
@@ -59,7 +62,7 @@ def verify_signature(algorithm: str, message: bytes, signature: bytes, public_ke
5962
with oqs.Signature(algorithm) as verifier:
6063
return verifier.verify(message, signature[:ALGOS_BUFFER_LIMITS[algorithm]["SIGN_LEN"]], public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])
6164

62-
def generate_sign_keys(algorithm: str = ML_DSA_87_NAME):
65+
def generate_sign_keys(algorithm: str = ML_DSA_87_NAME) -> Tuple[bytes, bytes]:
6366
"""
6467
Generates a new post-quantum signature keypair.
6568
@@ -74,34 +77,43 @@ def generate_sign_keys(algorithm: str = ML_DSA_87_NAME):
7477
private_key = signer.export_secret_key()
7578
return private_key, public_key
7679

77-
def otp_encrypt_with_padding(plaintext: bytes, key: bytes, padding_limit: int) -> bytes:
80+
def otp_encrypt_with_padding(plaintext: bytes, key: bytes) -> Tuple[bytes, bytes]:
7881
"""
79-
Encrypts plaintext using a one-time pad with random padding.
82+
Encrypts plaintext using a one-time pad with random or bucket padding.
8083
8184
Process:
82-
- Prefixes length of padding.
83-
- Adds random padding (0..padding_limit bytes).
85+
- Prefixes length of message.
86+
- Adds random padding (0..padding_limit bytes) if message > 64 bytes
87+
- If 64 bytes > message, pad message up to 64 bytes,
8488
- XORs with one-time pad key.
8589
8690
Args:
8791
plaintext: Data to encrypt.
8892
key: OTP key (>= plaintext length + padding).
89-
padding_limit: Max padding length.
9093
9194
Returns:
9295
Ciphertext bytes.
9396
"""
94-
if padding_limit > ((2 ** (8 * OTP_PADDING_LENGTH)) - 1):
95-
raise ValueError("Padding too large")
9697

97-
plaintext_padding = secrets.token_bytes(padding_limit)
98-
padding_length_bytes = len(plaintext_padding).to_bytes(OTP_PADDING_LENGTH, "big")
99-
padded_plaintext = padding_length_bytes + plaintext + plaintext_padding
98+
if len(plaintext) <= OTP_MAX_BUCKET - OTP_SIZE_LENGTH:
99+
pad_len = OTP_MAX_BUCKET - OTP_SIZE_LENGTH - len(plaintext)
100+
else:
101+
pad_len = secrets.randbelow(OTP_MAX_RANDOM_PAD + 1)
102+
103+
padding = secrets.token_bytes(pad_len)
104+
105+
plaintext_length_bytes = len(plaintext).to_bytes(OTP_SIZE_LENGTH, "big")
106+
107+
padded_plaintext = plaintext_length_bytes + plaintext + padding
108+
109+
if len(padded_plaintext) > len(key):
110+
raise ValueError("Padded plaintext is larger than key!")
111+
100112
return one_time_pad(padded_plaintext, key)
101113

102114
def otp_decrypt_with_padding(ciphertext: bytes, key: bytes) -> bytes:
103115
"""
104-
Decrypts one-time pad ciphertext that contains prefixed padding length.
116+
Decrypts one-time pad ciphertext that contains prefixed plaintext length.
105117
106118
Args:
107119
ciphertext: Ciphertext bytes.
@@ -110,11 +122,15 @@ def otp_decrypt_with_padding(ciphertext: bytes, key: bytes) -> bytes:
110122
Returns:
111123
Original plaintext bytes without padding.
112124
"""
113-
plaintext_with_padding = one_time_pad(ciphertext, key)
114-
padding_length = int.from_bytes(plaintext_with_padding[:OTP_PADDING_LENGTH], "big")
115-
if padding_length != 0:
116-
return plaintext_with_padding[OTP_PADDING_LENGTH : -padding_length]
117-
return plaintext_with_padding[OTP_PADDING_LENGTH:]
125+
plaintext_with_padding, _ = one_time_pad(ciphertext, key)
126+
127+
plaintext_length = int.from_bytes(plaintext_with_padding[:OTP_SIZE_LENGTH], "big")
128+
129+
if plaintext_length <= 0:
130+
raise ValueError(f"{plaintext_length} plaintext length, ciphertext corrupted or invalid key!")
131+
132+
return plaintext_with_padding[OTP_SIZE_LENGTH : OTP_SIZE_LENGTH + plaintext_length]
133+
118134

119135
def one_time_pad(plaintext: bytes, key: bytes) -> bytes:
120136
"""
@@ -131,14 +147,16 @@ def one_time_pad(plaintext: bytes, key: bytes) -> bytes:
131147
for index, plain_byte in enumerate(plaintext):
132148
key_byte = key[index]
133149
otpd_plaintext += bytes([plain_byte ^ key_byte])
134-
return otpd_plaintext
135150

136-
def generate_kem_keys(algorithm: str):
151+
key = key[len(otpd_plaintext):]
152+
return otpd_plaintext, key
153+
154+
def generate_kem_keys(algorithm: str) -> Tuple[bytes, bytes]:
137155
"""
138156
Generates a KEM keypair.
139157
140158
Args:
141-
algorithm: PQ KEM algorithm (default Kyber1024).
159+
algorithm: PQ KEM algorithm.
142160
143161
Returns:
144162
(private_key, public_key) as bytes.
@@ -148,23 +166,46 @@ def generate_kem_keys(algorithm: str):
148166
private_key = kem.export_secret_key()
149167
return private_key, public_key
150168

151-
def encap_shared_secret(public_key: bytes, algorithm: str):
169+
def encap_shared_secret(public_key: bytes, algorithm: str) -> Tuple[bytes, bytes]:
170+
"""
171+
Derive a KEM shared secret from a public key.
172+
173+
Args:
174+
public_key: KEM public key.
175+
algorithm: KEM algorithm NIST name.
176+
177+
Returns:
178+
(KEM ciphertext, shared secret) as bytes.
179+
"""
180+
152181
with oqs.KeyEncapsulation(algorithm) as kem:
153182
return kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])
154183

155-
def decap_shared_secret(ciphertext: bytes, private_key: bytes, algorithm: str):
184+
def decap_shared_secret(ciphertext: bytes, private_key: bytes, algorithm: str) -> bytes:
185+
"""
186+
Decrypts a single KEM ciphertext to derive a shared secret.
187+
188+
Args:
189+
ciphertext: KEM ciphertext.
190+
private_key: KEM private key.
191+
algorithm: KEM algorithm NIST name.
192+
size: Desired shared_secret size in bytes.
193+
194+
Returns:
195+
Shared secret of size as bytes.
196+
"""
156197
with oqs.KeyEncapsulation(algorithm, secret_key = private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
157198
return kem.decap_secret(ciphertext[:ALGOS_BUFFER_LIMITS[algorithm]["CT_LEN"]])
158199

159-
def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
200+
def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm: str = None, size: int = OTP_PAD_SIZE):
160201
"""
161-
Decrypts concatenated KEM ciphertexts to derive shared one-time pad.
202+
Decrypts concatenated KEM ciphertexts to derive shared secret.
162203
163204
Args:
164-
ciphertext_blob: Concatenated Kyber ciphertexts.
205+
ciphertext_blob: Concatenated KEM ciphertexts.
165206
private_key: KEM private key.
166207
algorithm: KEM algorithm NIST name.
167-
otp_pad_size: Desired OTP pad size in bytes.
208+
size: Desired OTP pad size in bytes.
168209
169210
Returns:
170211
Shared secret OTP pad bytes.
@@ -174,7 +215,7 @@ def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm
174215
cursor = 0
175216

176217
with oqs.KeyEncapsulation(algorithm, secret_key=private_key[:ALGOS_BUFFER_LIMITS[algorithm]["SK_LEN"]]) as kem:
177-
while len(shared_secrets) < otp_pad_size:
218+
while len(shared_secrets) < size:
178219
ciphertext = ciphertext_blob[cursor:cursor + cipher_size]
179220
if len(ciphertext) != cipher_size:
180221
raise ValueError(f"Ciphertext of {algorithm} blob is malformed or incomplete ({len(ciphertext)})")
@@ -185,28 +226,28 @@ def decrypt_shared_secrets(ciphertext_blob: bytes, private_key: bytes, algorithm
185226

186227
return shared_secrets #[:otp_pad_size]
187228

188-
def generate_shared_secrets(public_key: bytes, algorithm: str = None, otp_pad_size: int = OTP_PAD_SIZE):
229+
def generate_shared_secrets(public_key: bytes, algorithm: str = None, size: int = OTP_PAD_SIZE) -> Tuple[bytes, bytes]:
189230
"""
190-
Generates a one-time pad via `algorithm` encapsulation.
231+
Generates many shared secrets via `algorithm` encapsulation in chunks.
191232
192233
Args:
193-
public_key: Recipient's public key.
234+
public_key: Recipient's KEM public key.
194235
algorithm: KEM algorithm NIST name.
195-
otp_pad_size: Desired OTP pad size in bytes.
236+
size: Desired shared secrets size in bytes.
196237
197238
Returns:
198-
(ciphertexts_blob, shared_secrets) for transport & encryption.
239+
(ciphertexts_blob, shared_secrets) as bytes.
199240
"""
200241
shared_secrets = b''
201242
ciphertexts_blob = b''
202243

203244
with oqs.KeyEncapsulation(algorithm) as kem:
204-
while len(shared_secrets) < otp_pad_size:
245+
while len(shared_secrets) < size:
205246
ciphertext, shared_secret = kem.encap_secret(public_key[:ALGOS_BUFFER_LIMITS[algorithm]["PK_LEN"]])
206247
ciphertexts_blob += ciphertext
207248
shared_secrets += shared_secret
208249

209-
return ciphertexts_blob, shared_secrets[:otp_pad_size]
250+
return ciphertexts_blob, shared_secrets # [:otp_pad_size]
210251

211252
def random_number_range(a: int, b: int) -> int:
212253
"""

core/trad_crypto.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def derive_key_argon2id(password: bytes, salt: bytes = None, output_length: int
6767
), salt
6868

6969

70-
def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, counter: int = None, counter_safety: int = 2 ** 32) -> tuple[bytes, bytes]:
70+
def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, nonce: bytes = None, counter: int = None, counter_safety: int = 2 ** 32) -> tuple[bytes, bytes]:
7171
"""
7272
Encrypt plaintext using ChaCha20Poly1305.
7373
@@ -83,7 +83,9 @@ def encrypt_xchacha20poly1305(key: bytes, plaintext: bytes, counter: int = None,
8383
- nonce: The randomly generated AES-GCM nonce.
8484
- ciphertext: The encrypted data including the authentication tag.
8585
"""
86-
nonce = secrets.token_bytes(XCHACHA20POLY1305_NONCE_LEN)
86+
if nonce is None:
87+
nonce = sha3_512(secrets.token_bytes(XCHACHA20POLY1305_NONCE_LEN))[:XCHACHA20POLY1305_NONCE_LEN]
88+
8789
if counter is not None:
8890
if counter > counter_safety:
8991
raise ValueError("ChaCha counter has overflowen")

logic/contacts.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,10 @@ def save_contact(user_data: dict, user_data_lock, contact_id: str) -> None:
8686

8787
}
8888
},
89+
"our_strand_key": None,
90+
"our_next_strand_nonce": None,
91+
"contact_next_strand_key": None,
92+
"contact_strand_nonce": None,
8993
"our_pads": {
9094
"hash_chain": None,
9195
"pads": None

0 commit comments

Comments
 (0)