-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path4-santini.py
More file actions
500 lines (455 loc) · 14.5 KB
/
4-santini.py
File metadata and controls
500 lines (455 loc) · 14.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
#!/usr/bin/python3
from Crypto.Signature import eddsa
from Crypto.PublicKey import ECC
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from base64 import b64encode, b64decode
from base64 import b64encode
from getpass import getpass
from os.path import isfile
import json
class DSSErrorr(Exception):
    """Base error of the script.

    Raised when the user aborts a file read or write; the main menu loop
    catches it and prints its message.
    """
class ReadProcessingError(DSSErrorr):
    """Raised when data read from a file fails validation or import.

    read_file() catches it to report the problem and let the user retry.
    """
class HybEncError(Exception):
    """Error raised by the hybrid encryption routines (see decrypt())."""
#
#* INPUT/OUTPUT functions
#
def read_file(subject, error, default='', process=lambda data: data):
    """
    Interactively ask for a path, read that file and post-process its content.

    Parameters:
    - subject: description of what the file should contain (used in prompts)
    - error: message of the DSSErrorr raised when the user gives up
    - default: path used when the user just presses enter
    - process: callable applied to the raw bytes; the read only counts as
               successful if it returns. It should raise ReadProcessingError
               to reject the data.

    Returns a tuple (processed data, path of the file that was read).
    Raises DSSErrorr(error) if the user answers 'q' after a failure.
    """
    # the prompt advertises the default choice when there is one
    hint = ' (' + default + ')' if default != '' else ''
    prompt = 'Insert path to ' + subject + ' file' + hint + ':\n'

    # keep asking until a file is read and accepted, or the user aborts
    while True:
        # an empty answer selects the default path
        path = input(prompt) or default
        try:
            with open(path, 'rb') as handle:
                raw = handle.read()
            return process(raw), path
        except (IOError, ReadProcessingError) as exc:
            print('Error while reading ' + subject + ':\n' + str(exc))
            # give the user the chance to stop retrying
            answer = input('q to quit, anything else to try again: ')
            if answer.lower() == 'q':
                raise DSSErrorr(error)
def write_file(data, subject, error, default=''):
    """
    Interactively ask for a path and write data to that file.

    Parameters:
    - data: bytes to write
    - subject: description of what the file will contain (used in prompts)
    - error: message of the DSSErrorr raised when the user gives up
    - default: path used when the user just presses enter

    Returns the path of the file that was written.
    Raises DSSErrorr(error) if the user answers 'q' after a failure.
    """
    # both prompts are constant across retries, so build them once
    hint = ' (' + default + ')' if default != '' else ''
    ask_path = 'Insert path to file where to save ' + subject + hint + ':\n'
    ask_overwrite = ('File exists, overwrite? '
                     '(n to cancel, anything else to continue)\n')

    # keep asking until the file is written or the user aborts
    while True:
        # an empty answer selects the default path
        target = input(ask_path) or default
        try:
            # never overwrite silently: 'n' goes back to the path prompt
            if isfile(target) and input(ask_overwrite).lower() == 'n':
                continue
            with open(target, 'wb') as sink:
                sink.write(data)
            return target
        except IOError as exc:
            print('Error while saving ' + subject + ': ' + str(exc))
            # give the user the chance to stop retrying
            answer = input('q to quit, anything else to try again: ')
            if answer.lower() == 'q':
                raise DSSErrorr(error)
#
#* VALIDATION FUNCTIONS
#
def check_len(data, min_len):
    """
    Validate that a byte string is long enough.

    Parameters:
    - data: byte string to check
    - min_len: minimum acceptable length in bytes

    Returns data unchanged when it is at least min_len bytes long.
    Raises ReadProcessingError otherwise.
    """
    if len(data) < min_len:
        message = f'Error: the file must be at least {min_len} bytes long.'
        raise ReadProcessingError(message)
    return data
def import_key(data, is_private):
    """
    Import and validate an RSA key.

    Parameters:
    - data: encoded key to import (as read from a key file or taken
            from a certificate)
    - is_private: True when the key must be a private key; in that case
                  the user is asked for the passphrase protecting it

    Returns the imported RSA key object.
    Raises ReadProcessingError if the key cannot be parsed, is shorter than
    256 bytes, or is not private although is_private is True.
    """
    # minimum accepted modulus size in bytes (matches the 2048-bit keys
    # produced by gen_keys)
    min_size = 256

    passphrase = None
    if is_private:
        # acquire the passphrase protecting the private key
        passphrase = getpass("Insert password to unlock the private key:")

    # import key
    try:
        key = RSA.import_key(data, passphrase=passphrase)
    except (ValueError, IndexError, TypeError) as e:
        # RSA.import_key is documented to raise any of these three when
        # the key cannot be parsed (possibly due to a wrong passphrase)
        message = 'Error while importing the key: ' + str(e)
        if is_private:
            message += '\nPlease check that the password is correct.'
        raise ReadProcessingError(message) from e

    # check size
    if key.size_in_bytes() < min_size:
        message = 'Error: RSA size insufficient, '
        message += 'should be at least 256 bytes.'
        raise ReadProcessingError(message)

    # check type
    if is_private and not key.has_private():
        raise ReadProcessingError('Error: this is not a private key!')
    return key
#
#* SUPPORT FUNCTIONS
#
def get_passphrase():
    """
    Ask (without echo) for the passphrase that will protect the private key.

    Keeps asking until a non-empty passphrase is entered and returns it.
    """
    secret = getpass("Insert password for the private key:")
    while secret == '':
        # empty passphrases are refused: ask again with a reminder
        secret = getpass("please enter a non-empty password:")
    return secret
def read_key(is_private):
    """
    Ask the user for a key file and import the RSA key it contains.

    Parameters:
    - is_private: True to import a private key, False for a public key

    Returns the imported key.
    Raises DSSErrorr if the user aborts the import.
    """
    # prompt text and default file name depend on the kind of key
    if is_private:
        subject, default = 'private key', 'RSA_sk.pem'
    else:
        subject, default = 'public key', 'RSA_pk.pem'

    key, _ = read_file(
        subject=subject,
        error='Key import aborted.',
        default=default,
        process=lambda data: import_key(data, is_private),
    )
    return key
#
#* GENERATE KEYS
#
def gen_keys():
    """
    Generate a 2048-bit RSA key pair and save both halves to files.

    The private key is exported as passphrase-protected PKCS#8 PEM, the
    public key as plain PEM. The user chooses both output paths.
    Raises DSSErrorr if the user aborts one of the writes.
    """
    # generate key pair
    key = RSA.generate(2048)
    print('Keys generated!')

    # private key: protect it with a user-chosen passphrase, then save it
    passphrase = get_passphrase()
    private_pem = key.export_key(
        format='PEM',
        pkcs=8,
        passphrase=passphrase,
        protection='scryptAndAES128-CBC',
    )
    out_file = write_file(
        data=private_pem,
        subject='private key',
        error='Output aborted.',
        default='RSA_sk.pem',
    )
    print('Private key correctly written in "' + out_file + '"')

    # public key: export and save it
    public_pem = key.public_key().export_key(format='PEM')
    subject = 'public key'
    label = subject.capitalize()
    out_file = write_file(
        data=public_pem,
        subject=subject,
        error=label + ' not saved: aborted.',
        default='RSA_pk.pem',
    )
    print(label + ' correctly written in "' + out_file + '"')
#
#* SIGNATURE
#
def get_sig(msg, pr_key, encode=False):
    """
    Compute the EdDSA (rfc8032 mode) signature of a message.

    Parameters:
    - msg: byte string to sign
    - pr_key: imported private key
    - encode: output type selector:
        - True: base64 text (utf-8 decoded string)
        - False: raw bytes (default)

    Returns the signature.
    """
    raw_sig = eddsa.new(pr_key, 'rfc8032').sign(msg)
    return b64encode(raw_sig).decode('utf-8') if encode else raw_sig
def gen_cert():
    """
    Generate an unsigned certificate for a public key read from file.

    The certificate is a JSON object with the fields 'id' (entered by the
    user), 'pubk' (the public key in PEM format) and 'sig', which is left
    empty as requested. It is saved to a file chosen by the user,
    '<id>.cert' by default.
    Raises DSSErrorr if the user aborts reading the key or writing the file.
    """
    # export_key is the same method gen_keys uses (exportKey is its
    # backward-compatibility alias)
    pk = read_key(False).export_key(format='PEM').decode('utf8')

    # an empty id would give a meaningless certificate and a default
    # file name of just '.cert', so insist on a value
    name = input('Certificate id: ')
    while name == '':
        name = input('Please enter a non-empty certificate id: ')

    cert = {'id': name, 'pubk': pk, 'sig': ''}
    out_file = write_file(
        data=json.dumps(cert).encode('utf8'),
        subject='certificate',
        error='Output aborted',
        default=name + '.cert',
    )
    print('Certificate correctly generated: "' + out_file + '"')
#
#* VERIFY
#
def ver_sig(msg, sig, pub_key):
    """
    Check an EdDSA (rfc8032 mode) signature.

    Parameters:
    - msg: byte string that was signed
    - sig: byte string containing the signature to check
    - pub_key: imported public key

    Returns nothing when the signature verifies.
    Raises ValueError('Invalid signature!') when it does not.
    """
    checker = eddsa.new(pub_key, 'rfc8032')
    try:
        checker.verify(msg, sig)
    except ValueError:
        # replace the library's message with a uniform one
        raise ValueError('Invalid signature!')
#
#* ENCRYPTION
#
def encrypt():
    """
    Encrypt a file for the owner of a verified certificate.

    The recipient's RSA public key is taken from a certificate checked by
    verify_cert(). The file is encrypted with AES in OCB mode under a fresh
    random 16-byte key, and that key is encrypted with RSA-OAEP.

    Output layout: RSA-encrypted AES key | nonce | tag | ciphertext.

    Raises DSSErrorr if the user aborts, if the certificate is not signed,
    or if the key in the certificate cannot be imported.
    """
    # read and verify the certificate, keeping its public key (PEM text)
    pubk_pem = verify_cert(True)
    if pubk_pem is None:
        # verify_cert returns None when the certificate has no 'sig' field
        raise DSSErrorr('Encryption aborted: the certificate is not signed.')

    # import the recipient key before touching the plaintext, so that a
    # bad key is reported early; import_key raises ReadProcessingError
    # (a DSSErrorr) if the certified key is not a suitable RSA key
    rsa_cipher = PKCS1_OAEP.new(import_key(pubk_pem, False))

    # read file to encrypt, no validation
    settings = {
        'subject': 'data to encrypt',
        'error': 'Plaintext reading aborted.'
    }
    p_data, in_file = read_file(**settings)

    # file encryption
    aes_key = get_random_bytes(16)
    # NOTE: decrypt() slices a 15-byte nonce and a 16-byte tag, so this
    # relies on the library defaults for OCB matching those sizes
    aes_cipher = AES.new(aes_key, AES.MODE_OCB)
    ciphertext, tag = aes_cipher.encrypt_and_digest(p_data)

    # key encryption
    enc_key = rsa_cipher.encrypt(aes_key)

    # output
    settings = {
        'data': enc_key + aes_cipher.nonce + tag + ciphertext,
        'subject': 'ciphertext',
        'error': 'Output aborted.',
        'default': in_file + '.enc'
    }
    out_file = write_file(**settings)
    print('Ciphertext correctly written in "' + out_file + '"')
def decrypt():
    """
    Decrypt a file produced by encrypt().

    Expected input layout:
    RSA-encrypted AES key | 15-byte nonce | 16-byte tag | ciphertext.

    The AES key is recovered with RSA-OAEP using the user's private key,
    then the ciphertext is decrypted and authenticated with AES in OCB mode.

    Raises DSSErrorr if the user aborts reading or writing a file, and
    HybEncError if the key or the ciphertext cannot be decrypted.
    """
    nonce_len = 15
    tag_len = 16

    # read private key to use; read_key offers the same default file
    # name that gen_keys uses when saving the private key
    rsa_sk = read_key(True)

    # read file to decrypt, validating length
    rsa_size = rsa_sk.size_in_bytes()
    header_len = rsa_size + nonce_len + tag_len
    settings = {
        'subject': 'data to decrypt',
        'error': 'Ciphertext reading aborted.',
        'process': lambda data: check_len(data, header_len)
    }
    c_data, in_file = read_file(**settings)

    # decomposition
    enc_key = c_data[:rsa_size]
    nonce = c_data[rsa_size: rsa_size + nonce_len]
    tag = c_data[rsa_size + nonce_len: header_len]
    ciphertext = c_data[header_len:]

    # key decryption
    rsa_cipher = PKCS1_OAEP.new(rsa_sk)
    try:
        aes_key = rsa_cipher.decrypt(enc_key)
    except ValueError:
        raise HybEncError('Decryption error: please check private key')

    # ciphertext decryption
    try:
        aes_cipher = AES.new(aes_key, AES.MODE_OCB, nonce=nonce)
    except ValueError:
        # the recovered session key has a length AES does not accept
        raise HybEncError('Decryption error: invalid session key')
    try:
        p_data = aes_cipher.decrypt_and_verify(ciphertext, tag)
    except ValueError:
        raise HybEncError('Decryption error: authentication failure')

    # output: try to deduce the original file name
    default = in_file[:-4] if in_file.endswith('.enc') else ''
    settings = {
        'data': p_data,
        'subject': 'decrypted data',
        'error': 'Output aborted.',
        'default': default
    }
    out_file = write_file(**settings)
    print('Decrypted data correctly written in "' + out_file + '"')
#
#* CERTIFICATE
#
def import_cert(data):
    """
    Import and validate a certificate.

    Parameters:
    - data: JSON-encoded certificate (byte string) with the string fields
            'id' and 'pubk' and an optional base64-encoded 'sig' field

    Returns the list [id, pubk], with the decoded signature bytes appended
    as a third element when the certificate has a 'sig' field.
    Raises ReadProcessingError if the data is not valid JSON, a required
    field is missing, or a field has the wrong type.
    """
    error_msg = 'Certificate format not valid: '
    try:
        # decode and import as json
        cert = json.loads(data)
        # get values to sign
        info = [cert['id'], cert['pubk']]
        if 'sig' in cert:
            info.append(b64decode(cert['sig']))
    except ValueError as e:
        # malformed JSON, undecodable bytes or invalid base64
        raise ReadProcessingError(error_msg + 'encoding error.') from e
    except TypeError as e:
        # top-level JSON value is not an object, or 'sig' is not text
        raise ReadProcessingError(error_msg + 'invalid data.') from e
    except KeyError as e:
        # certificate does not have 'id' or 'pubk' fields
        raise ReadProcessingError(
            error_msg + str(e) + ' field not found.') from e

    # 'id' and 'pubk' are later encoded as text (cert_sig_enc), so reject
    # other JSON types here instead of failing later with AttributeError
    if not all(isinstance(field, str) for field in info[:2]):
        raise ReadProcessingError(error_msg + 'invalid data.')
    return info
def cert_sig_enc(info):
    """
    Build the byte string over which a certificate signature is computed.

    Parameters:
    - info: sequence whose first two items are the certificate id and
            public key, both as text

    Returns the utf-8 encoding of the id immediately followed by the
    utf-8 encoding of the public key.
    """
    cert_id, pubk = info[0], info[1]
    return b''.join((cert_id.encode('utf-8'), pubk.encode('utf-8')))
def verify_cert(get_pubk=False):
    """
    Ask for a certificate file and verify its signature.

    The signature is checked against the public key embedded in this
    function. An invalid certificate makes the function ask for another
    file; an unsigned one (no 'sig' field) ends it immediately.

    Parameters:
    - get_pubk: when True, return the 'pubk' field of the verified
                certificate

    Returns the certificate's public key text if get_pubk is True and the
    certificate verified, None in every other case.
    Raises DSSErrorr if the user aborts reading the certificate.
    """
    # public key to use
    pk = ECC.import_key(
        "-----BEGIN PUBLIC KEY-----\n"
        "MCowBQYDK2VwAyEA6nUBRM22wgeqVm/GkPlimdbsjaofC4Sk4eQ4ebhEjTs=\n"
        "-----END PUBLIC KEY-----"
    )

    # keep reading certificates until one verifies
    while True:
        info, _ = read_file(
            subject='certificate to verify',
            error='Verification aborted.',
            process=import_cert,
        )
        if len(info) < 3:
            # 'sig' field is missing
            print('The Certificate is not signed!')
            return None
        # verify signature of certificate against public key
        try:
            ver_sig(cert_sig_enc(info), info[2], pk)
            print('OK: the certificate is valid.')
            break
        except ValueError:
            print('The certificate is not valid!')

    return info[1] if get_pubk else None
#* ----
#* MAIN
#* ----
menu_text = '''
-----------------------------
Enter:
1 -> generate and save keys
2 -> generate a certificate
3 -> encrypt a file
4 -> decrypt a file
0 -> quit
-----------------------------
> '''


def main():
    """
    Run the interactive menu until the user chooses to quit.

    Each choice calls the matching function. Script errors (DSSErrorr and
    HybEncError) are printed and the menu is shown again; invalid choices
    are ignored.
    """
    print("\nAuthenticated encryption with certificates")
    actions = {
        '1': gen_keys,
        '2': gen_cert,
        '3': encrypt,
        '4': decrypt,
    }
    while True:
        choice = input(menu_text)
        if choice == '0':
            # leaving the loop ends the script; no need for the
            # site-provided exit() helper
            break
        action = actions.get(choice)
        if action is None:
            # invalid choices are ignored
            continue
        try:
            action()
        except (DSSErrorr, HybEncError) as e:
            # HybEncError is not a DSSErrorr, so it must be listed too or
            # a failed decryption would terminate the program
            print(e)


if __name__ == '__main__':
    main()