1- # This Source Code Form is subject to the terms of the Mozilla Public
2- # License, v. 2.0. If a copy of the MPL was not distributed with this
3- # file, You can obtain one at https://mozilla.org/MPL/2.0/.
4- #
5- # This Source Code Form is "Incompatible With Secondary Licenses", as
6- # defined by the Mozilla Public License, v. 2.0.
7-
8- from .engine import Engine
9- from multiprocessing .pool import Pool
10- import numpy as np
11- import os
12-
13- import matplotlib .pyplot as plt
14-
15- # Class to compute average traces
16- class AverageTraces :
17- def __init__ (self , num_values , trace_length ):
18- self .avtraces = np .zeros ((num_values , trace_length ))
19- self .counters = np .zeros (num_values )
20-
21- # Method to add a trace and update the average
22- def add_trace (self , data , trace ):
23- if self .counters [data ] == 0 :
24- self .avtraces [data ] = trace
25- else :
26- self .avtraces [data ] = self .avtraces [data ] + (trace - self .avtraces [data ]) / self .counters [data ]
27- self .counters [data ] += 1
28-
29- # Method to get data with non-zero counters and corresponding average traces
30- def get_data (self ):
31- avdata_snap = np .flatnonzero (self .counters )
32- avtraces_snap = self .avtraces [avdata_snap ]
33- return avdata_snap , avtraces_snap
34-
35-
36- # Function to compute S-box output
37- def s_box_out (data , key_byte , sbox ):
38- s_box_in = data ^ key_byte
39- return sbox [s_box_in ]
40-
41-
42- # Linear regression analysis (LRA) for each byte of the key
43- def lra (data , traces , intermediate_fkt , sbox , sst ):
44- num_traces , trace_length = traces .shape
45- SSR = np .empty ((256 , trace_length ))
46- all_coefs = np .empty ((256 , 9 , trace_length ))
47-
48- for key_byte in np .arange (256 , dtype = 'uint8' ):
49- intermediate_var = intermediate_fkt (data , key_byte , sbox )
50- M = np .array (list (map (wrapper (8 ), intermediate_var )))
51-
52- P = (np .linalg .inv (M .T @ M )) @ M .T
53-
54- beta = P @ traces
55-
56- E = M @ beta
57-
58- SSR [key_byte ] = np .sum ((E - traces ) ** 2 , axis = 0 )
59-
60- all_coefs [key_byte ,:] = beta [:]
61-
62- R2 = 1 - SSR / sst [None , :]
63- return R2 , all_coefs
64-
65-
66- # Function to generate a model of bits
67- def model_single_bits (x , bit_width ):
68- model = []
69- for i in range (0 , bit_width ):
70- bit = (x >> i ) & 1
71- model .append (bit )
72- model .append (1 )
73- return model
74-
75-
76- # Wrapper function to map a model to an intermediate variable
77- def wrapper (y ):
78- def curry (x ):
79- return model_single_bits (x , y )
80- return curry
81-
82-
83- # Function to plot traces
84- def plot_traces (traces , nb ):
85- for i in range (nb ):
86- plt .plot (traces [i ])
87- plt .show ()
88-
89-
90- def fixed_sst (traces ):
91- num_traces , trace_length = traces .shape
92- u = np .zeros (trace_length )
93- v = np .zeros (trace_length )
94- for i in range (num_traces ):
95- u += traces [i ]
96- v += traces [i ]** 2
97- return np .subtract (v ,np .square (u )/ num_traces )
98-
99-
100- class LRA (Engine ):
101- def __init__ (self , key_bytes = np .arange ) -> None :
102- self .key_bytes = key_bytes
103- self .samples_len = 0
104- self .traces_len = 0
105- self .batch_size = 0
106- self .batches_num = 0
107-
108- self .average_traces = []
109- self .aes_key = []
110-
111- self .samples_range = None
112- self .samples_start = self .samples_end = 0
113-
114- # S-box definition
115- self .sbox = np .array ([
116- 0x63 , 0x7C , 0x77 , 0x7B , 0xF2 , 0x6B , 0x6F , 0xC5 , 0x30 , 0x01 , 0x67 , 0x2B , 0xFE , 0xD7 , 0xAB , 0x76 ,
117- 0xCA , 0x82 , 0xC9 , 0x7D , 0xFA , 0x59 , 0x47 , 0xF0 , 0xAD , 0xD4 , 0xA2 , 0xAF , 0x9C , 0xA4 , 0x72 , 0xC0 ,
118- 0xB7 , 0xFD , 0x93 , 0x26 , 0x36 , 0x3F , 0xF7 , 0xCC , 0x34 , 0xA5 , 0xE5 , 0xF1 , 0x71 , 0xD8 , 0x31 , 0x15 ,
119- 0x04 , 0xC7 , 0x23 , 0xC3 , 0x18 , 0x96 , 0x05 , 0x9A , 0x07 , 0x12 , 0x80 , 0xE2 , 0xEB , 0x27 , 0xB2 , 0x75 ,
120- 0x09 , 0x83 , 0x2C , 0x1A , 0x1B , 0x6E , 0x5A , 0xA0 , 0x52 , 0x3B , 0xD6 , 0xB3 , 0x29 , 0xE3 , 0x2F , 0x84 ,
121- 0x53 , 0xD1 , 0x00 , 0xED , 0x20 , 0xFC , 0xB1 , 0x5B , 0x6A , 0xCB , 0xBE , 0x39 , 0x4A , 0x4C , 0x58 , 0xCF ,
122- 0xD0 , 0xEF , 0xAA , 0xFB , 0x43 , 0x4D , 0x33 , 0x85 , 0x45 , 0xF9 , 0x02 , 0x7F , 0x50 , 0x3C , 0x9F , 0xA8 ,
123- 0x51 , 0xA3 , 0x40 , 0x8F , 0x92 , 0x9D , 0x38 , 0xF5 , 0xBC , 0xB6 , 0xDA , 0x21 , 0x10 , 0xFF , 0xF3 , 0xD2 ,
124- 0xCD , 0x0C , 0x13 , 0xEC , 0x5F , 0x97 , 0x44 , 0x17 , 0xC4 , 0xA7 , 0x7E , 0x3D , 0x64 , 0x5D , 0x19 , 0x73 ,
125- 0x60 , 0x81 , 0x4F , 0xDC , 0x22 , 0x2A , 0x90 , 0x88 , 0x46 , 0xEE , 0xB8 , 0x14 , 0xDE , 0x5E , 0x0B , 0xDB ,
126- 0xE0 , 0x32 , 0x3A , 0x0A , 0x49 , 0x06 , 0x24 , 0x5C , 0xC2 , 0xD3 , 0xAC , 0x62 , 0x91 , 0x95 , 0xE4 , 0x79 ,
127- 0xE7 , 0xC8 , 0x37 , 0x6D , 0x8D , 0xD5 , 0x4E , 0xA9 , 0x6C , 0x56 , 0xF4 , 0xEA , 0x65 , 0x7A , 0xAE , 0x08 ,
128- 0xBA , 0x78 , 0x25 , 0x2E , 0x1C , 0xA6 , 0xB4 , 0xC6 , 0xE8 , 0xDD , 0x74 , 0x1F , 0x4B , 0xBD , 0x8B , 0x8A ,
129- 0x70 , 0x3E , 0xB5 , 0x66 , 0x48 , 0x03 , 0xF6 , 0x0E , 0x61 , 0x35 , 0x57 , 0xB9 , 0x86 , 0xC1 , 0x1D , 0x9E ,
130- 0xE1 , 0xF8 , 0x98 , 0x11 , 0x69 , 0xD9 , 0x8E , 0x94 , 0x9B , 0x1E , 0x87 , 0xE9 , 0xCE , 0x55 , 0x28 , 0xDF ,
131- 0x8C , 0xA1 , 0x89 , 0x0D , 0xBF , 0xE6 , 0x42 , 0x68 , 0x41 , 0x99 , 0x2D , 0x0F , 0xB0 , 0x54 , 0xBB , 0x16
132- ])
133-
134-
135- def update (self , traces : np .ndarray , plaintext : np .ndarray , average_traces ):
136- for i in range (traces .shape [0 ]):
137- average_traces .add_trace (plaintext [i ], traces [i ])
138-
139- return average_traces
140-
141-
142- def calculate (self , byte_idx , average_traces ):
143- plain , trace = average_traces .get_data ()
144- sst = fixed_sst (trace )
145- r2 , coefs = lra (plain , trace , s_box_out , self .sbox , sst )
146- r2_peaks = np .max (r2 , axis = 1 )
147- winning_byte = np .argmax (r2_peaks )
148-
149- print (f"Key Byte { byte_idx } : { winning_byte :02x} , Max R2: { np .max (r2_peaks ):.5f} " )
150- return winning_byte
151-
152-
153- def run (self , container , samples_range = None ):
154- if samples_range == None :
155- self .samples_range = container .data .sample_length
156- self .samples_start = 0
157- self .samples_end = container .data .sample_length
158- else :
159- self .samples_range = samples_range [1 ]- samples_range [0 ]
160- (self .samples_start , self .samples_end ) = samples_range
161-
162- self .average_traces = [AverageTraces (256 , self .samples_range ) for _ in range (len (container .model_positions ))] # all key bytes
163- self .aes_key = [[] for _ in range (len (container .tiles ))]
164-
165- with Pool (processes = int (os .cpu_count ()/ 2 )) as pool :
166- workload = []
167- for tile in container .tiles :
168- (tile_x , tile_y ) = tile
169- for model_pos in container .model_positions :
170- workload .append ((self , container , self .average_traces [model_pos ], tile_x , tile_y , model_pos ))
171- starmap_results = pool .starmap (self .run_workload , workload , chunksize = 1 )
172- pool .close ()
173- pool .join ()
174-
175- for tile_x , tile_y , model_pos , tmp_key_byte in starmap_results :
176- tile_index = list (container .tiles ).index ((tile_x , tile_y ))
177- self .aes_key [tile_index ].append (tmp_key_byte )
178-
179- # Print recovered AES key(s)
180- for key in self .aes_key :
181- aes_key_bytes = bytes (key )
182- print ("Recovered AES Key:" , aes_key_bytes .hex ())
183-
184-
185- @staticmethod
186- def run_workload (self , container , average_traces , tile_x , tile_y , model_pos ):
187- container .configure (tile_x , tile_y , [model_pos ])
188-
189- for batch in container .get_batches (tile_x , tile_y ):
190- average_traces = self .update (batch [- 1 ][:,self .samples_start :self .samples_end ], batch [0 ], average_traces )
191-
192- key_byte = self .calculate (model_pos , average_traces )
193- return tile_x , tile_y , model_pos , key_byte
1+ # This Source Code Form is subject to the terms of the Mozilla Public
2+ # License, v. 2.0. If a copy of the MPL was not distributed with this
3+ # file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+ #
5+ # This Source Code Form is "Incompatible With Secondary Licenses", as
6+ # defined by the Mozilla Public License, v. 2.0.
7+
8+
9+ from .engine import Engine
10+ from ..model_values .model_value import ModelValue
11+ from ..model_values .model_bits import ModelBits
12+ from multiprocessing .pool import Pool
13+ import numpy as np
14+ import os
15+
16+
17+ # Class to compute average traces
18+ class AverageTraces :
19+ def __init__ (self , num_values , trace_length ):
20+ self .avtraces = np .zeros ((num_values , trace_length ))
21+ self .counters = np .zeros (num_values )
22+
23+ # Method to add a trace and update the average
24+ def add_trace (self , data , trace ):
25+ if self .counters [data ] == 0 :
26+ self .avtraces [data ] = trace
27+ else :
28+ self .avtraces [data ] = self .avtraces [data ] + (trace - self .avtraces [data ]) / self .counters [data ]
29+ self .counters [data ] += 1
30+
31+ # Method to get data with non-zero counters and corresponding average traces
32+ def get_data (self ):
33+ avdata_snap = np .flatnonzero (self .counters )
34+ avtraces_snap = self .avtraces [avdata_snap ]
35+ return avdata_snap , avtraces_snap
36+
37+
38+ class LRA (Engine ):
39+ def __init__ (self , model_value : ModelValue , convergence_step = None , normalize = False , bias = True ) -> None :
40+ self .normalize = normalize
41+
42+ self .convergence_step = convergence_step
43+ self .R2s = None
44+ self .betas = None
45+ self .results = []
46+
47+ super ().__init__ (ModelBits (model_value , bias ))
48+
49+ def get_R2s (self ):
50+ return self .R2s
51+
52+ def get_betas (self ):
53+ return self .betas
54+
55+ def get_results (self ):
56+ return self .results
57+
58+ def populate (self , container ):
59+ model_vals = self .model_value .model .num_vals
60+ num_steps = - (container .data .traces_length // - self .convergence_step ) if self .convergence_step else 1
61+
62+ # R2s and Beta values, Results
63+ self .R2s = np .zeros ((len (container .model_positions ),
64+ num_steps ,
65+ model_vals ,
66+ container .sample_length ), dtype = np .float64 )
67+ self .betas = np .zeros ((len (container .model_positions ),
68+ model_vals ,
69+ self .model_value .num_bits ,
70+ container .sample_length ), dtype = np .float64 )
71+ self .results = [[] for _ in range (len (container .tiles ))]
72+
73+ def update (self , traces : np .ndarray , plaintext : np .ndarray , average_traces ):
74+ for i in range (traces .shape [0 ]):
75+ average_traces .add_trace (plaintext [i ], traces [i ])
76+
77+ return average_traces
78+
79+ def calculate (self , average_traces , model ):
80+ plain , traces = average_traces .get_data ()
81+ num_traces , trace_length = traces .shape
82+ model_vals = model .shape [0 ]
83+
84+ SST = np .sum (np .square (traces ), axis = 0 ) - np .square (np .sum (traces , axis = 0 ))/ num_traces
85+ SSR = np .empty ((model_vals , trace_length ))
86+
87+ # Linear regression analysis (LRA) for each model position
88+ P = np .linalg .pinv (model )
89+ betas = P @ traces
90+ # Below loop is equivalent to:
91+ # E = model @ beta
92+ # SSR = np.sum((E - traces)**2, axis=1)
93+ # However this takes too much memory
94+ for i in range (0 , betas .shape [- 1 ], step := 1 ):
95+ E = model @ betas [..., i :i + step ]
96+ SSR [..., i :i + step ] = np .sum ((E - traces [:, i :i + step ])** 2 , axis = 1 )
97+
98+ R2s = 1 - SSR / SST [None , :]
99+ if self .normalize : # Normalization
100+ R2s = (R2s - np .mean (R2s , axis = 0 , keepdims = True )) / np .std (R2s , axis = 0 , keepdims = True )
101+
102+ return R2s , betas
103+
104+ def find_candidate (self , R2s ):
105+ r2_peaks = np .max (R2s , axis = 1 )
106+ winning_candidate = np .argmax (r2_peaks )
107+ return winning_candidate
108+
109+ def run (self , container ):
110+ self .populate (container )
111+
112+ with Pool (processes = int (os .cpu_count ()/ 2 )) as pool :
113+ workload = []
114+ for tile in container .tiles :
115+ (tile_x , tile_y ) = tile
116+ for model_pos in container .model_positions :
117+ workload .append ((self , container , tile_x , tile_y , model_pos ))
118+ starmap_results = pool .starmap (self .run_workload , workload , chunksize = 1 )
119+ pool .close ()
120+ pool .join ()
121+
122+ for tile_x , tile_y , model_pos , candidate , r2s , betas in starmap_results :
123+ self .R2s [model_pos ] = r2s
124+ self .betas [model_pos ] = betas
125+ tile_index = list (container .tiles ).index ((tile_x , tile_y ))
126+ self .results [tile_index ].append (candidate )
127+
128+ @staticmethod
129+ def run_workload (self , container , tile_x , tile_y , model_pos ):
130+ num_steps = container .configure (tile_x , tile_y , [model_pos ], self .convergence_step )
131+ if self .convergence_step is None :
132+ self .convergence_step = np .inf
133+
134+ model_vals = self .model_value .model .num_vals
135+ average_traces = AverageTraces (model_vals , container .sample_length )
136+
137+ r2s = np .empty ((num_steps , model_vals , container .sample_length ))
138+
139+ model_input = np .arange (model_vals , dtype = 'uint8' )[..., np .newaxis ]
140+ hypotheses = self .model_value .calculate_table ([model_input , None , None , None ])
141+
142+ traces_processed = 0
143+ converge_index = 0
144+ for batch in container .get_batches (tile_x , tile_y ):
145+ if traces_processed >= self .convergence_step :
146+ instant_r2s , _ = self .calculate (average_traces , hypotheses )
147+ r2s [converge_index , :, :] = instant_r2s
148+ traces_processed = 0
149+ converge_index += 1
150+ # Update
151+ plaintext = batch [0 ]
152+ traces = batch [- 1 ]
153+ average_traces = self .update (traces , plaintext , average_traces )
154+ traces_processed += traces .shape [0 ]
155+
156+ instant_r2s , betas = self .calculate (average_traces , hypotheses )
157+ r2s [converge_index , :, :] = instant_r2s
158+ candidate = self .find_candidate (r2s [- 1 , ...])
159+
160+ return tile_x , tile_y , model_pos , candidate , r2s , betas
0 commit comments