Skip to content

Commit e67138e

Browse files
committed
tests for key transport
1 parent 9b8fb5f commit e67138e

File tree

2 files changed

+982
-0
lines changed

2 files changed

+982
-0
lines changed
Lines changed: 370 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,370 @@
1+
_pkcs11lib = "/usr/lib/softhsm/libsofthsm2.so"
2+
3+
4+
class TestKeyAgreementECDH:
5+
6+
def test_ECDH_cbc_decrypt(self):
7+
from cryptography.hazmat.primitives import (
8+
hashes,
9+
keywrap,
10+
serialization,
11+
)
12+
from cryptography.hazmat.primitives.asymmetric.ec import (
13+
SECP384R1,
14+
)
15+
from cryptography.hazmat.backends import default_backend
16+
17+
from pkcs11_cryptography_keys import (
18+
KeyTypes,
19+
PKCS11AdminSession,
20+
PKCS11UnwrapNDecryptSession,
21+
PKCS11KeyUsageAll,
22+
list_token_labels,
23+
get_AES_algorithm_properties,
24+
ECDH_ephemeral,
25+
)
26+
27+
data = b"This is a test if it commes over, but I think we do a good job, and next time we will do it like it is nothing. Let us try to build a simple test and then run with it."
28+
other_info_for_kdf = b"this is shared info"
29+
30+
encryption_algorithm = "aes256_cbc"
31+
key_wrap_algorithm = "aes256_wrap"
32+
kdf_hash = hashes.SHA256()
33+
34+
sym_algo_props = get_AES_algorithm_properties(encryption_algorithm)
35+
if sym_algo_props is None:
36+
assert False, "algorithm not Known"
37+
else:
38+
padded_payload = sym_algo_props.pre_encryption(data)
39+
content_encryption_key = sym_algo_props.generate_key()
40+
iv = sym_algo_props.get_nonce()
41+
encryptor = sym_algo_props.get_sw_encryptor(content_encryption_key)
42+
ciphertext = encryptor.update(padded_payload) + encryptor.finalize()
43+
encrypted_content = ciphertext
44+
45+
kek_gen = ECDH_ephemeral.create_for_wrap(
46+
key_wrap_algorithm, kdf_hash
47+
)
48+
49+
# Generate a private key for use in the exchange.
50+
for label in list_token_labels(_pkcs11lib):
51+
pub_key_obj_1 = None
52+
create_session_1 = PKCS11AdminSession(
53+
label, "1234", True, "ec_token_1", b"254", _pkcs11lib
54+
)
55+
with create_session_1 as current_admin:
56+
keydef = PKCS11KeyUsageAll()
57+
ec_private_key_1 = current_admin.create_key_pair(
58+
keydef, key_type=KeyTypes.EC, EC_curve=SECP384R1()
59+
)
60+
assert ec_private_key_1 is not None
61+
pub_key_1 = ec_private_key_1.public_key()
62+
pub_key_obj_1 = pub_key_1.public_numbers().public_key()
63+
64+
derived_key, peer_pub_key = kek_gen.derive_key_concat_kdf(
65+
pub_key_obj_1, other_info_for_kdf
66+
)
67+
wrapped_content_encryption_key = keywrap.aes_key_wrap(
68+
derived_key, content_encryption_key, default_backend()
69+
)
70+
peer_pub_key_info = peer_pub_key.public_bytes(
71+
serialization.Encoding.DER,
72+
serialization.PublicFormat.SubjectPublicKeyInfo,
73+
)
74+
75+
if (
76+
peer_pub_key_info is not None
77+
and wrapped_content_encryption_key is not None
78+
):
79+
private_1_ses = PKCS11UnwrapNDecryptSession(
80+
label, "1234", "ec_token_1", pksc11_lib=_pkcs11lib
81+
)
82+
with private_1_ses as curr_key:
83+
decrypted_message = curr_key.unwrap_and_decrypt(
84+
encrypted_content,
85+
wrapped_content_encryption_key,
86+
encryption_algorithm,
87+
iv,
88+
public_key_info=peer_pub_key_info,
89+
kdf_hash=kdf_hash,
90+
wrap_algorithm=key_wrap_algorithm,
91+
other_info_bytes=other_info_for_kdf,
92+
kdf_on_card=False,
93+
)
94+
assert decrypted_message == data
95+
else:
96+
assert (
97+
False
98+
), "Encryption did not provide enough information"
99+
100+
with create_session_1 as current_admin:
101+
r = current_admin.delete_key_pair()
102+
assert r
103+
104+
def test_ECDH_gcm_decrypt(self):
105+
from cryptography.hazmat.primitives import (
106+
hashes,
107+
keywrap,
108+
serialization,
109+
)
110+
from cryptography.hazmat.primitives.asymmetric.ec import (
111+
SECP384R1,
112+
)
113+
from cryptography.hazmat.backends import default_backend
114+
115+
from pkcs11_cryptography_keys import (
116+
KeyTypes,
117+
PKCS11AdminSession,
118+
PKCS11UnwrapNDecryptSession,
119+
PKCS11KeyUsageAll,
120+
list_token_labels,
121+
get_AES_algorithm_properties,
122+
ECDH_ephemeral,
123+
)
124+
125+
_pkcs11lib = "/usr/lib/softhsm/libsofthsm2.so"
126+
data = b"This is not OK if it commes over, but I think we do a good job, and next time we will do it like it is nothing. Let us try to build a simple test and then run with it."
127+
other_info_for_kdf = b"this is shared info"
128+
aad = b"additional data"
129+
130+
encryption_algorithm = "aes256_gcm"
131+
key_wrap_algorithm = "aes256_wrap"
132+
kdf_hash = hashes.SHA256()
133+
134+
sym_algo_props = get_AES_algorithm_properties(encryption_algorithm)
135+
if sym_algo_props is None:
136+
assert False, "algorithm not Known"
137+
else:
138+
padded_payload = sym_algo_props.pre_encryption(data)
139+
content_encryption_key = sym_algo_props.generate_key()
140+
iv = sym_algo_props.get_nonce()
141+
encryptor = sym_algo_props.get_sw_encryptor(
142+
content_encryption_key, aad=aad
143+
)
144+
ciphertext = encryptor.update(padded_payload) + encryptor.finalize()
145+
encrypted_content = ciphertext
146+
received_tag = encryptor.tag
147+
148+
kek_gen = ECDH_ephemeral.create_for_wrap(
149+
key_wrap_algorithm, kdf_hash
150+
)
151+
152+
# Generate a private key for use in the exchange.
153+
for label in list_token_labels(_pkcs11lib):
154+
pub_key_obj_1 = None
155+
create_session_1 = PKCS11AdminSession(
156+
label, "1234", True, "ec_token_1", b"254", _pkcs11lib
157+
)
158+
with create_session_1 as current_admin:
159+
keydef = PKCS11KeyUsageAll()
160+
ec_private_key_1 = current_admin.create_key_pair(
161+
keydef, key_type=KeyTypes.EC, EC_curve=SECP384R1()
162+
)
163+
assert ec_private_key_1 is not None
164+
pub_key_1 = ec_private_key_1.public_key()
165+
pub_key_obj_1 = pub_key_1.public_numbers().public_key()
166+
167+
derived_key, peer_pub_key = kek_gen.derive_key_concat_kdf(
168+
pub_key_obj_1, other_info_for_kdf
169+
)
170+
wrapped_content_encryption_key = keywrap.aes_key_wrap(
171+
derived_key, content_encryption_key, default_backend()
172+
)
173+
peer_pub_key_info = peer_pub_key.public_bytes(
174+
serialization.Encoding.DER,
175+
serialization.PublicFormat.SubjectPublicKeyInfo,
176+
)
177+
178+
if (
179+
peer_pub_key_info is not None
180+
and wrapped_content_encryption_key is not None
181+
):
182+
private_1_ses = PKCS11UnwrapNDecryptSession(
183+
label, "1234", "ec_token_1", pksc11_lib=_pkcs11lib
184+
)
185+
with private_1_ses as curr_key:
186+
decrypted_message = curr_key.unwrap_and_decrypt(
187+
encrypted_content,
188+
wrapped_content_encryption_key,
189+
encryption_algorithm,
190+
iv,
191+
public_key_info=peer_pub_key_info,
192+
kdf_hash=kdf_hash,
193+
wrap_algorithm=key_wrap_algorithm,
194+
other_info_bytes=other_info_for_kdf,
195+
received_tag=received_tag,
196+
aad_for_gcm=aad,
197+
kdf_on_card=False,
198+
)
199+
assert decrypted_message == data
200+
else:
201+
assert False, "encrypt did not provide enough information"
202+
203+
with create_session_1 as current_admin:
204+
r = current_admin.delete_key_pair()
205+
assert r
206+
207+
def test_ECDH_cbc_encrypt_decrypt(self):
208+
from cryptography.hazmat.primitives import hashes, serialization
209+
from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1
210+
211+
from pkcs11_cryptography_keys import (
212+
KeyTypes,
213+
PKCS11AdminSession,
214+
PKCS11KeyUsageAll,
215+
list_token_labels,
216+
PKCS11EncryptNWrapSession,
217+
PKCS11UnwrapNDecryptSession,
218+
)
219+
220+
data = b"This is a test if it commes over, but I think we do a good job, and next time we will do it like it is nothing. Let us try to build a simple test and then run with it."
221+
other_info_for_kdf = b"this is shared info"
222+
223+
encryption_algorithm = "aes256_cbc"
224+
key_wrap_algorithm = "aes256_wrap"
225+
kdf_hash = hashes.SHA256()
226+
227+
# Generate a private key for use in the exchange.
228+
for label in list_token_labels(_pkcs11lib):
229+
create_session_1 = PKCS11AdminSession(
230+
label, "1234", True, "ec_token_1", b"254", _pkcs11lib
231+
)
232+
with create_session_1 as current_admin:
233+
keydef = PKCS11KeyUsageAll()
234+
ec_private_key_1 = current_admin.create_key_pair(
235+
keydef, key_type=KeyTypes.EC, EC_curve=SECP384R1()
236+
)
237+
assert ec_private_key_1 is not None
238+
pub_key_1 = ec_private_key_1.public_key()
239+
public_key_info_der = pub_key_1.public_bytes(
240+
serialization.Encoding.DER,
241+
serialization.PublicFormat.SubjectPublicKeyInfo,
242+
)
243+
244+
ecdh_es_session = PKCS11EncryptNWrapSession(
245+
"A token", "1234", _pkcs11lib
246+
)
247+
with ecdh_es_session as ecdh_es:
248+
encrypted_content, received_tag, iv = ecdh_es.encrypt(
249+
data, encryption_algorithm
250+
)
251+
wrapped_content_encryption_key, peer_pub_key_info = (
252+
ecdh_es.wrap_key(
253+
public_key_info_der,
254+
kdf_hash=kdf_hash,
255+
wrap_algorithm=key_wrap_algorithm,
256+
other_info_bytes=other_info_for_kdf,
257+
kdf_on_card=False,
258+
)
259+
)
260+
261+
if wrapped_content_encryption_key is not None:
262+
private_1_ses = PKCS11UnwrapNDecryptSession(
263+
label, "1234", "ec_token_1", pksc11_lib=_pkcs11lib
264+
)
265+
with private_1_ses as ecdh_static:
266+
decrypted_message = ecdh_static.unwrap_and_decrypt(
267+
encrypted_content,
268+
wrapped_content_encryption_key,
269+
encryption_algorithm,
270+
iv,
271+
public_key_info=peer_pub_key_info,
272+
kdf_hash=kdf_hash,
273+
wrap_algorithm=key_wrap_algorithm,
274+
other_info_bytes=other_info_for_kdf,
275+
kdf_on_card=False,
276+
)
277+
assert decrypted_message == data
278+
else:
279+
assert False, "wrapped_content_encryption_key is None"
280+
281+
with create_session_1 as current_admin:
282+
r = current_admin.delete_key_pair()
283+
assert r
284+
285+
def test_ECDH_gcm_encrypt_decrypt(self):
286+
from cryptography.hazmat.primitives import (
287+
hashes,
288+
serialization,
289+
)
290+
from cryptography.hazmat.primitives.asymmetric.ec import SECP384R1
291+
292+
from pkcs11_cryptography_keys import (
293+
KeyTypes,
294+
PKCS11AdminSession,
295+
PKCS11KeyUsageAll,
296+
list_token_labels,
297+
PKCS11UnwrapNDecryptSession,
298+
PKCS11EncryptNWrapSession,
299+
)
300+
301+
data = b"This is a test if it commes over, but I think we do a good job, and next time we will do it like it is nothing. Let us try to build a simple test and then run with it."
302+
aad = b"additional data"
303+
other_info_for_kdf = b"this is shared info"
304+
305+
encryption_algorithm = "aes256_gcm"
306+
key_wrap_algorithm = "aes256_wrap"
307+
kdf_hash = hashes.SHA256()
308+
309+
# Generate a private key for use in the exchange.
310+
for label in list_token_labels(_pkcs11lib):
311+
create_session_1 = PKCS11AdminSession(
312+
label, "1234", True, "ec_token_1", b"254", _pkcs11lib
313+
)
314+
with create_session_1 as current_admin:
315+
keydef = PKCS11KeyUsageAll()
316+
ec_private_key_1 = current_admin.create_key_pair(
317+
keydef, key_type=KeyTypes.EC, EC_curve=SECP384R1()
318+
)
319+
assert ec_private_key_1 is not None
320+
pub_key_1 = ec_private_key_1.public_key()
321+
public_key_info_der = pub_key_1.public_bytes(
322+
serialization.Encoding.DER,
323+
serialization.PublicFormat.SubjectPublicKeyInfo,
324+
)
325+
326+
ecdh_es_session = PKCS11EncryptNWrapSession(
327+
"A token", "1234", _pkcs11lib
328+
)
329+
with ecdh_es_session as ecdh_es:
330+
encrypted_content, received_tag, iv = ecdh_es.encrypt(
331+
data, encryption_algorithm, aad_for_gcm=aad
332+
)
333+
wrapped_content_encryption_key, peer_pub_key_info = (
334+
ecdh_es.wrap_key(
335+
public_key_info_der,
336+
kdf_hash=kdf_hash,
337+
wrap_algorithm=key_wrap_algorithm,
338+
other_info_bytes=other_info_for_kdf,
339+
kdf_on_card=False,
340+
)
341+
)
342+
343+
if wrapped_content_encryption_key is not None:
344+
private_1_ses = PKCS11UnwrapNDecryptSession(
345+
label,
346+
"1234",
347+
"ec_token_1",
348+
pksc11_lib=_pkcs11lib,
349+
)
350+
with private_1_ses as ecdh_static:
351+
decrypted_message = ecdh_static.unwrap_and_decrypt(
352+
encrypted_content,
353+
wrapped_content_encryption_key,
354+
encryption_algorithm,
355+
iv,
356+
public_key_info=peer_pub_key_info,
357+
kdf_hash=kdf_hash,
358+
wrap_algorithm=key_wrap_algorithm,
359+
other_info_bytes=other_info_for_kdf,
360+
received_tag=received_tag,
361+
aad_for_gcm=aad,
362+
kdf_on_card=False,
363+
)
364+
assert decrypted_message == data
365+
else:
366+
assert False, "wrapped_content_encryption_key is None"
367+
368+
with create_session_1 as current_admin:
369+
r = current_admin.delete_key_pair()
370+
assert r

0 commit comments

Comments
 (0)