-
Notifications
You must be signed in to change notification settings - Fork 2
Expand file tree
/
Copy pathEngine.py
More file actions
76 lines (67 loc) · 2.88 KB
/
Engine.py
File metadata and controls
76 lines (67 loc) · 2.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
'''
Acts as a facade between the Stratergy implementations and the user.
Loads the required stratergy dynamically and executes the requested operation.
Motivation:
- all encryption methods have a series of I/O operations that must be performed, e.g. reading a file
- I/O ops may be platform dependent, e.g. different for a CLI and a server; the Engine class lets you provide those platform-specific ops and does the rest
'''
# TODO: protect against timing attacks
import inspect
from warnings import warn
from Stratergies.Impl import ArnoldCat,OneTimePad,AES,Stegano
class Engine():
    '''
    Facade that hands caller-supplied I/O callables to a stratergy.

    Intended use is a fluent chain:
    setImgRead(...).setImgWrite(...).setLogger(...).loadStratergy(name),
    followed by exe(...) or a direct encrypt(...) / decrypt(...) call.
    '''

    # Class-level defaults: reading these before a setter / loadStratergy has
    # run yields None, so the explicit RuntimeError checks below are reached
    # instead of an AttributeError.
    imgRead = None
    imgWrite = None
    logger = None
    stratergy = None

    @staticmethod
    def stratergies():
        '''Return the mapping of stratergy name -> stratergy class.'''
        return {
            'ArnoldCat': ArnoldCat.ArnoldCatStratergy,
            'OneTimePad': OneTimePad.OneTimePadStratergy,
            'AES': AES.AESStratergy,
            'Stegano': Stegano.SteganoStratergy,
        }

    def setImgRead(self, imgRead: 'function') -> 'Engine':
        '''Store the image-reading callable and return self for chaining.'''
        self.imgRead = imgRead
        return self

    def setImgWrite(self, imgWrite: 'function') -> 'Engine':
        '''Store the image-writing callable and return self for chaining.'''
        self.imgWrite = imgWrite
        return self

    def setLogger(self, logger: 'function') -> 'Engine':
        '''Store the logging callable and return self for chaining.'''
        self.logger = logger
        return self

    def loadStratergy(self, stratergy: str) -> 'Engine':
        '''
        Instantiate the stratergy registered under the given name.

        The stratergy class is called with the stored imgRead, imgWrite and
        logger as keyword arguments and the instance is kept on
        self.stratergy.

        Raises:
            RuntimeError: if imgRead, imgWrite or logger is unset (or falsy),
                or if the name is not a key of stratergies().
        '''
        if not all((self.imgRead, self.imgWrite, self.logger)):
            raise RuntimeError('Please add imgRead imgWrite, logger first')
        available = self.stratergies()
        if stratergy not in available:
            raise RuntimeError('Stratergy not found')
        self.stratergy = available[stratergy](
            imgRead=self.imgRead,
            imgWrite=self.imgWrite,
            logger=self.logger,
        )
        return self

    def exe(self, operation: str, **kwargs):
        '''
        Dispatch to encrypt or decrypt by operation name.

        Returns whatever the selected method returns.

        Raises:
            RuntimeError: if operation is neither 'encrypt' nor 'decrypt'.
        '''
        supportedOps = ('encrypt', 'decrypt')
        if operation not in supportedOps:
            raise RuntimeError(f'{operation} not support operation must be any one of : {supportedOps} ')
        if operation == 'encrypt':
            return self.encrypt(**kwargs)
        return self.decrypt(**kwargs)

    def __getFunReq(self, fn: 'function', **kwargs) -> dict:
        '''
        Select from kwargs the named arguments that fn declares.

        Positional-or-keyword and keyword-only parameters of fn are
        considered; a parameter called 'self' is skipped. Parameters that
        have a default may be omitted, in which case fn's own default applies.
        A warning is issued when kwargs holds keys that fn does not declare.

        Raises:
            KeyError: if a parameter without a default is missing from kwargs.
        '''
        spec = inspect.getfullargspec(fn)
        # getfullargspec keeps the bound first parameter of a bound method, so
        # 'self' is filtered out here; filtering (rather than list.remove)
        # also tolerates callables that have no 'self' at all.
        names = [name for name in spec.args if name != 'self']
        names.extend(spec.kwonlyargs)

        # spec.defaults lines up with the tail of spec.args.
        defaults = spec.defaults or ()
        optional = set(spec.args[len(spec.args) - len(defaults):])
        optional.update(spec.kwonlydefaults or {})

        reqArgs = {}
        for name in names:
            if name in kwargs:
                reqArgs[name] = kwargs[name]
            elif name not in optional:
                raise KeyError(f'key {name} is required')

        if len(reqArgs) < len(kwargs):
            usedArgs = list(reqArgs.keys())
            warn(f'Extra parameters were supplied they are ignored arguments consider are {usedArgs}')
        return reqArgs

    def __loadedStratergy(self):
        '''Return the loaded stratergy or raise RuntimeError if none is set.'''
        if self.stratergy is None:
            raise RuntimeError('No stratergy loaded, call loadStratergy first')
        return self.stratergy

    def decrypt(self, **kwargs):
        '''
        Call the loaded stratergy's decrypt with the kwargs it declares.

        Returns whatever the stratergy's decrypt returns.

        Raises:
            RuntimeError: if no stratergy has been loaded.
            KeyError: if a required decrypt argument is missing.
        '''
        target = self.__loadedStratergy().decrypt
        return target(**self.__getFunReq(target, **kwargs))

    def encrypt(self, **kwargs):
        '''
        Call the loaded stratergy's encrypt with the kwargs it declares.

        Returns whatever the stratergy's encrypt returns.

        Raises:
            RuntimeError: if no stratergy has been loaded.
            KeyError: if a required encrypt argument is missing.
        '''
        target = self.__loadedStratergy().encrypt
        return target(**self.__getFunReq(target, **kwargs))