1010from struct import *
1111import argparse
1212from pathlib import Path
13- from typing import Tuple
14-
15- VERSION = "1.0"
13+ from typing import Tuple , Dict
14+ from types import FunctionType
15+
16+ VERSION = "1.1.1"
17+
18+
19+ def decode_n0 (response_to_decode : bytes , head_len : int ):
20+ """
21+ It decodes the result of the command N0 an prints the meaning of the returned output
22+
23+ Parameters
24+ ___________
25+ response_to_decode: bytes
26+ The response returned by the payShield
27+ head_len: int
28+ The length of the header
29+
30+ Returns
31+ ___________
32+ nothing
33+ """
34+ print ("Message length" , int .from_bytes (response_to_decode [:2 ], byteorder = 'big' , signed = False ))
35+ str_pointer = 2
36+ print ("Header:" , response_to_decode [str_pointer :str_pointer + head_len ].decode ('ascii' , 'ignore' ))
37+ str_pointer = str_pointer + head_len
38+ print ("Command returned:" , response_to_decode [str_pointer :str_pointer + 2 ].decode ('ascii' , 'ignore' ))
39+ str_pointer = str_pointer + 2
40+ print ("Error returned:" , response_to_decode [str_pointer :str_pointer + 2 ].decode ('ascii' , 'ignore' ))
41+ if (response_to_decode [str_pointer :str_pointer + 2 ]).decode ('ascii' , 'ignore' ) == '01' :
42+ print ("Invalid Random Value Length" )
43+ elif (response_to_decode [str_pointer :str_pointer + 2 ]).decode ('ascii' , 'ignore' ) == '00' :
44+ print ("Random payload:(HEX)" , binascii .hexlify (response_to_decode [6 + head_len :]).decode ('ascii' , 'ignore' ))
45+
46+
47+ def decode_no (response_to_decode : bytes , head_len : int ):
48+ """
49+ It decodes the result of the command NO an prints the meaning of the returned output
50+
51+ Parameters
52+ ___________
53+ response_to_decode: bytes
54+ The response returned by the payShield
55+ head_len: int
56+ The length of the header
57+
58+ Returns
59+ ___________
60+ nothing
61+ """
62+ BUFFER_SIZE : Dict [str , str ] = {
63+ '0' : '2K bytes' , '1' : '8K bytes' , '2' : '16K bytes' , '3' : '32K bytes' }
64+ print ("Message length" , int .from_bytes (response_to_decode [:2 ], byteorder = 'big' , signed = False ))
65+ response_to_decode = response_to_decode .decode ('ascii' , 'replace' )
66+ str_pointer : int = 2
67+ print ("Header:" , response_to_decode [str_pointer :str_pointer + head_len ])
68+ str_pointer = str_pointer + head_len
69+ print ("Command returned:" , response_to_decode [str_pointer :str_pointer + 2 ])
70+ str_pointer = str_pointer + 2
71+ print ("Error returned:" , response_to_decode [str_pointer :str_pointer + 2 ])
72+ if response_to_decode [str_pointer :str_pointer + 2 ] == '00' :
73+ str_pointer = str_pointer + 2
74+ print ("I/O buffer size:" , BUFFER_SIZE .get (response_to_decode [str_pointer :str_pointer + 1 ], "Unknown" ))
75+ str_pointer = str_pointer + 1
76+ if response_to_decode [str_pointer :str_pointer + 1 ] == '0' :
77+ print ("Type of connection: UDP" )
78+ elif response_to_decode [str_pointer :str_pointer + 1 ] == '1' :
79+ print ("Type of connection: TCP" )
80+ else :
81+ print ("Unexpected connection type" )
82+ str_pointer = str_pointer + 1
83+ print ("Number of TCP sockets:" , response_to_decode [str_pointer :str_pointer + 2 ])
84+ str_pointer = str_pointer + 2
85+ print ("Firmware number:" , response_to_decode [str_pointer :str_pointer + 9 ])
86+ str_pointer = str_pointer + 9
87+ print ("Reserved:" , response_to_decode [str_pointer :str_pointer + 1 ])
88+ str_pointer = str_pointer + 1
89+ print ("Reserved:" , response_to_decode [str_pointer :str_pointer + 4 ])
90+
91+
92+ def decode_nc (response_to_decode : bytes , head_len : int ):
93+ """
94+ It decodes the result of the command NC an prints the meaning of the returned output
95+ The message trailer is not considered
96+
97+ Parameters
98+ ___________
99+ response_to_decode: bytes
100+ The response returned by the payShield
101+ head_len: int
102+ The length of the header
103+
104+ Returns
105+ ___________
106+ nothing
107+ """
108+ print ("Message length" , int .from_bytes (response_to_decode [:2 ], byteorder = 'big' , signed = False ))
109+ response_to_decode = response_to_decode .decode ('ascii' , 'replace' )
110+ str_pointer : int = 2
111+ print ("Header:" , response_to_decode [str_pointer :str_pointer + head_len ])
112+ str_pointer = str_pointer + head_len
113+ print ("Command returned:" , response_to_decode [str_pointer :str_pointer + 2 ])
114+ str_pointer = str_pointer + 2
115+ print ("Error returned:" , response_to_decode [str_pointer :str_pointer + 2 ])
116+ if response_to_decode [str_pointer :str_pointer + 2 ] == '00' :
117+ str_pointer = str_pointer + 2
118+ print ("LMK CRC:" , response_to_decode [str_pointer :str_pointer + 16 ])
119+ str_pointer = str_pointer + 16
120+ print ("Firmware number:" , response_to_decode [str_pointer :str_pointer + 9 ])
16121
17122
18123def payshield_error_codes (error_code : str ) -> str :
19- # This function maps the result code with the error message
20- # I derived the list of errors and messages from the following manual:
21- # payShield 10K Core Host Commands v1
22- # Revision: A
23- # Date: 04 August 2020
24- # Doc.Number: PUGD0537 - 004
25-
26- pay_shield_error_table = {
124+ """This function maps the result code with the error message.
125+ I derived the list of errors and messages from the following manual:
126+ payShield 10K Core Host Commands v1
127+ Revision: A
128+ Date: 04 August 2020
129+ Doc.Number: PUGD0537 - 004
130+
131+ Parameters
132+ ----------
133+ error_code: str
134+ The status code returned from the payShield 10k
135+
136+ Returns
137+ ----------
138+ a string containing the message of the error code
139+ """
140+
141+ PAYSHIELD_ERROR_CODE = {
27142 '00' : 'No error' ,
28143 '01' : 'Verification failure or warning of imported key parity error' ,
29144 '02' : 'Key inappropriate length for algorithm' ,
@@ -115,10 +230,29 @@ def payshield_error_codes(error_code: str) -> str:
115230 'BD' : 'Incompatible key types' ,
116231 'BE' : 'Invalid key block header ID' }
117232
118- return pay_shield_error_table .get (error_code , "Unknown error" )
233+ return PAYSHIELD_ERROR_CODE .get (error_code , "Unknown error" )
119234
120235
121236def check_returned_command_verb (result_returned : bytes , head_len : int , command_sent : str ) -> Tuple [int , str , str ]:
237+ """
238+ Checks if the command returned by the payShield is congruent to the command sent
239+ Parameters
240+ ----------
241+ result_returned: bytes
242+ The output returned from the payShield
243+ head_len: int
244+ The length of the header
245+ command_sent: str
246+ The command send to the payShield
247+
248+ Returns
249+ ----------
250+ a Tuple[int, str, str]
251+ where the first value is 0 of the command is congruent or -1 if it is not
252+ the second value is the command sent
253+ the third value is the command returned by te payShield
254+ """
255+
122256 verb_returned = result_returned [2 + head_len :][:2 ]
123257 verb_sent = command_sent [head_len :][:2 ]
124258 verb_expected = verb_sent [0 :1 ] + chr (ord (verb_sent [1 :2 ]) + 1 )
@@ -161,11 +295,31 @@ def test_printable(input_str):
161295 return all (c in string .printable for c in input_str )
162296
163297
164- def run_test (ip_addr : str , port : int , host_command : str , proto : str = "tcp" , header_len : int = 4 ) -> int :
165- # it connects to the specified host and port, using the specified protocol that can me tcp, udp or tls and
166- # sends the command.
167- # The default header length is set to 4 if not provided because this is the out of box default value
168- # in payShield 10k
298+ def run_test (ip_addr : str , port : int , host_command : str , proto : str = "tcp" , header_len : int = 4 ,
299+ decoder_funct : FunctionType = None ) -> int :
300+ """It connects to the specified host and port, using the specified protocol (tcp, udp or tls) and sends the command.
301+
302+ Parameters
303+ ----------
304+ ip_addr: str
305+ The address to connect to. It can be an IP, hostname or FQDN
306+ port: int
307+ The port to connect to
308+ host_command: str
309+ The command to send to the payShield complete of the header part
310+ proto: str
311+ The protocol to use, it can be usb, tcp or tls. If not specified the default is tcp
312+ header_len: int
313+ The length of the header. If not specified the value is 4 because it is the default factory value
314+ in payShield 10k
315+ decoder_funct: FunctionType
316+ If provided needs to be a reference to a function that is able to parse the command and print the meaning of it
317+ If not provided the default is None
318+
319+ Returns
320+ ----------
321+ an integer value representing the error code: -1 means that some parameter were wrong.
322+ """
169323
170324 # if proto != "tcp" and proto != "udp" and proto != "tls":
171325 if proto not in ['tcp' , 'udp' , 'tls' ]:
@@ -179,7 +333,7 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea
179333
180334 # join everything together in python3
181335 message = size + host_command .encode ()
182- # Connect to the host and gather the the reply in TCP or UDP
336+ # Connect to the host and gather the reply in TCP or UDP
183337 buffer_size = 4096
184338 if proto == "tcp" :
185339 # creates the TCP socket
@@ -224,14 +378,18 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea
224378
225379 # don't print ascii if msg or resp contains non printable chars
226380 if test_printable (message [2 :].decode ("ascii" , "ignore" )):
227- print ("sent data (ASCII) :" , message [2 :])
381+ print ("sent data (ASCII) :" , message [2 :]. decode ( "ascii" , "ignore" ) )
228382
229383 print ("sent data (HEX) :" , binascii .hexlify (message ))
230384
231385 if test_printable ((data [2 :]).decode ("ascii" , "ignore" )):
232- print ("received data (ASCII):" , data [2 :])
386+ print ("received data (ASCII):" , data [2 :]. decode ( "ascii" , "ignore" ) )
233387
234388 print ("received data (HEX) :" , binascii .hexlify (data ))
389+ if (decoder_funct is not None ) and callable (decoder_funct ):
390+ print ("" )
391+ print ("-----DECODING RESPONSE-----" )
392+ decoder_funct (data , header_len )
235393
236394 except ConnectionError as e :
237395 print ("Connection issue: " , e .message )
@@ -253,34 +411,51 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea
253411 print ("To get more info about the usage invoke it with the -h option" )
254412 print ("This software is open source and it is under the Affero AGPL 3.0" )
255413 print ("" )
414+ KEY_IGNORED_MSG = "If this option is specified --key is ignored"
415+
416+ # List of decoder functions used to interpreter the result.
417+ # The reference to the function is used as parameter in the run_test function.
418+ # If the parameter is not passed because a decoder for that command it is not defined the default value of the
419+ # parameter assumes the value of None
420+ DECODERS = {
421+ 'NO' : decode_no ,
422+ 'NC' : decode_nc ,
423+ 'N0' : decode_n0
424+ }
425+
256426 parser = argparse .ArgumentParser (description = "Stress a PayShield appliance with RSA key generation" )
257427 parser .add_argument ("host" , help = "Ip address or hostname of the payShield" )
258428 group = parser .add_mutually_exclusive_group ()
259429 parser .add_argument ("--port" , "-p" , help = "The host port" , default = 1500 , type = int )
260430 group .add_argument ("--key" , help = "RSA key length. Accepted values are 2048 ot 4096" ,
261431 default = 2048 , choices = [2048 , 4096 ], type = int )
262- group .add_argument ("--nc" , help = "Just perform a NC test If this option is specified --key is ignored" ,
432+ group .add_argument ("--nc" , help = "Just perform a NC test. " + KEY_IGNORED_MSG ,
263433 action = "store_true" )
264- group .add_argument ("--j2" , help = "Get HSM Loading using J2 command. If this option is specified --key is ignored" ,
434+ group .add_argument ("--no" , help = "Retrieves HSM status information using NO command. " +
435+ KEY_IGNORED_MSG ,
436+ action = "store_true" )
437+ group .add_argument ("--j2" , help = "Get HSM Loading using J2 command. " + KEY_IGNORED_MSG ,
265438 action = "store_true" )
266439 group .add_argument ("--j4" ,
267- help = "Get Host Command Volumes using J4 command. If this option is specified --key is ignored" ,
440+ help = "Get Host Command Volumes using J4 command. " + KEY_IGNORED_MSG ,
268441 action = "store_true" )
269442 group .add_argument ("--j8" ,
270- help = "Get Health Check Accumulated Counts using J8 command. "
271- "If this option is specified --key is ignored" ,
443+ help = "Get Health Check Accumulated Counts using J8 command. " + KEY_IGNORED_MSG ,
272444 action = "store_true" )
273445 group .add_argument ("--jk" ,
274- help = "Get Instantaneous Health Check Status using JK command. "
275- "If this option is specified --key is ignored" ,
446+ help = "Get Instantaneous Health Check Status using JK command. " + KEY_IGNORED_MSG ,
276447 action = "store_true" )
277448 group .add_argument ("--randgen" ,
278449 help = "Generate a random value 8 bytes long" , action = "store_true" )
279450 parser .add_argument ("--header" ,
280451 help = "the header string to prepend to the host command. If not specified the default is HEAD" ,
281452 default = "HEAD" , type = str )
282- parser .add_argument ("--forever" , help = "if this option is specified the program will run for ever" ,
453+ parser .add_argument ("--forever" , help = "if this option is specified the program runs for ever" ,
454+ action = "store_true" )
455+ parser .add_argument ("--decode" , help = "if this option is specified the reply of the payShield is interpreted "
456+ "if a decoder function for that command has been implemented" ,
283457 action = "store_true" )
458+
284459 parser .add_argument ("--times" , help = "how many time to repeat the operation" , type = int , default = 1000 )
285460 parser .add_argument ("--proto" , help = "accepted value are tcp or udp, the default is tcp" , default = "tcp" ,
286461 choices = ["tcp" , "udp" , "tls" ], type = str .lower )
@@ -296,6 +471,8 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea
296471 command = args .header + 'EI2409601#0000'
297472 if args .nc :
298473 command = args .header + 'NC'
474+ if args .no :
475+ command = args .header + 'NO00'
299476 if args .j2 :
300477 command = args .header + 'J2'
301478 if args .j4 :
@@ -324,12 +501,21 @@ def run_test(ip_addr: str, port: int, host_command: str, proto: str = "tcp", hea
324501 i = 1
325502 while True :
326503 print ("Iteration: " , i )
327- run_test (args .host , args .port , command , args .proto , len (args .header ))
504+ if args .decode :
505+ run_test (args .host , args .port , command , args .proto , len (args .header ),
506+ DECODERS .get (command [len (args .header ):len (args .header ) + 2 ], None ))
507+ else :
508+ run_test (args .host , args .port , command , args .proto , len (args .header ), None )
509+
328510 i = i + 1
329511 print ("" )
330512 else :
331513 for i in range (0 , args .times ):
332514 print ("Iteration: " , i + 1 , " of " , args .times )
333- run_test (args .host , args .port , command , args .proto , len (args .header ))
515+ if args .decode :
516+ run_test (args .host , args .port , command , args .proto , len (args .header ),
517+ DECODERS .get (command [len (args .header ):len (args .header ) + 2 ], None ))
518+ else :
519+ run_test (args .host , args .port , command , args .proto , len (args .header ), None )
334520 print ("" )
335521 print ("DONE" )
0 commit comments