@@ -111,12 +111,14 @@ def _create_host_spec_ocsp_bypass_ssd(ocsp, priv_key, hostname):
111111 exp_val = nbf_val + tdelta
112112 header = {'ssd_iss' :'dep1' }
113113 payload = {}
114- payload .update ({'sfcEndpoint' : hostname })
114+ hname_string = " " .join (hostname )
115+ acc_name = ocsp .get_account_from_hostname (hostname [0 ])
116+ payload .update ({'sfcEndpoint' : hname_string })
115117 payload .update ({'certId' : '*' })
116118 payload .update ({'nbf' : nbf_val })
117119 payload .update ({'exp' : exp_val })
118120 host_spec_jwt_token = jwt .encode (payload , priv_key , algorithm = 'RS512' , headers = header )
119- host_spec_bypass_ssd = {hostname : host_spec_jwt_token .decode ("utf-8" )}
121+ host_spec_bypass_ssd = {acc_name : host_spec_jwt_token .decode ("utf-8" )}
120122 json .dump (host_spec_bypass_ssd , jwt_host_spec_fp )
121123
122124
@@ -137,15 +139,16 @@ def test_host_spec_ocsp_bypass_ssd():
137139 ocsp = _setup_ssd_test (temp_ocsp_file_path )
138140 priv_key = _get_test_priv_key (1 )
139141
140- hostname = 'sfcsupport.us-east-1.snowflakecomputing.com'
142+ hostname = [ 'sfcsupport.us-east-1.snowflakecomputing.com' ]
141143 try :
142144 _create_host_spec_ocsp_bypass_ssd (ocsp , priv_key , hostname )
143145 except Exception as ex :
144146 print ("Exception occurred %s" % ex .message )
145147
146148 ocsp .read_directives ()
147149
148- cache_status , cur_host_spec_token = ocsp .SSD .find_in_ssd_cache (hostname )
150+ acc_name = ocsp .get_account_from_hostname (hostname [0 ])
151+ cache_status , cur_host_spec_token = ocsp .SSD .find_in_ssd_cache (acc_name )
149152 assert ((cur_host_spec_token is not None ), "Failed to read host specific directive" )
150153
151154 try :
@@ -155,6 +158,84 @@ def test_host_spec_ocsp_bypass_ssd():
155158 print ("Exception while processing SSD :" + ex )
156159
157160
161+ def test_host_spec_ocsp_bypass_updated_ssd ():
162+
163+ """
164+ Clean any skeletons of past tests
165+ """
166+ _teardown_ssd_test_setup ()
167+
168+ """
169+ Setup OCSP instance to use test keys
170+ for authenticating SSD
171+ """
172+ tmp_dir = str (tempfile .gettempdir ())
173+ temp_ocsp_file_path = path .join (tmp_dir , "ocsp_cache_backup.json" )
174+ copy (OCSP_RESPONSE_CACHE_URI , temp_ocsp_file_path )
175+ ocsp = _setup_ssd_test (temp_ocsp_file_path )
176+ priv_key = _get_test_priv_key (1 )
177+
178+ hostname = ['sfcsupport-test12345.global.us-east-1.snowflakecomputing.com' ,
179+ 'sfcsupport-test67890.global.us-east-1.snowflakecomputing.com' ,
180+ 'sfcsupport.us-east-1.snowflakecomputing.com' ,
181+ 'sfcsupport.us-east-2.snowflakecomputing.com' ]
182+ try :
183+ _create_host_spec_ocsp_bypass_ssd (ocsp , priv_key , hostname )
184+ except Exception as ex :
185+ print ("Exception occurred %s" % ex .message )
186+
187+ ocsp .read_directives ()
188+
189+ acc_name = ocsp .get_account_from_hostname (hostname [0 ])
190+ cache_status , cur_host_spec_token = ocsp .SSD .find_in_ssd_cache (acc_name )
191+ assert ((cur_host_spec_token is not None ), "Failed to read host specific directive" )
192+
193+ try :
194+ assert ocsp .process_ocsp_bypass_directive (cur_host_spec_token , '*' , hostname [1 ]),\
195+ "Failed to process host specific bypass ssd"
196+ except Exception as ex :
197+ print ("Exception while processing SSD :" + ex )
198+
199+
200+ def test_invalid_host_spec_ocsp_bypass_updated_ssd ():
201+
202+ """
203+ Clean any skeletons of past tests
204+ """
205+ _teardown_ssd_test_setup ()
206+
207+ """
208+ Setup OCSP instance to use test keys
209+ for authenticating SSD
210+ """
211+ tmp_dir = str (tempfile .gettempdir ())
212+ temp_ocsp_file_path = path .join (tmp_dir , "ocsp_cache_backup.json" )
213+ copy (OCSP_RESPONSE_CACHE_URI , temp_ocsp_file_path )
214+ ocsp = _setup_ssd_test (temp_ocsp_file_path )
215+ priv_key = _get_test_priv_key (1 )
216+
217+ hostname = ['sfcsupport-test12345.global.us-east-1.snowflakecomputing.com' ,
218+ 'sfcsupport-test67890.global.us-east-1.snowflakecomputing.com' ,
219+ 'sfcsupport.us-east-1.snowflakecomputing.com' ,
220+ 'sfcsupport.us-east-2.snowflakecomputing.com' ]
221+ try :
222+ _create_host_spec_ocsp_bypass_ssd (ocsp , priv_key , hostname )
223+ except Exception as ex :
224+ print ("Exception occurred %s" % ex .message )
225+
226+ ocsp .read_directives ()
227+
228+ acc_name = ocsp .get_account_from_hostname (hostname [0 ])
229+ cache_status , cur_host_spec_token = ocsp .SSD .find_in_ssd_cache (acc_name )
230+ assert ((cur_host_spec_token is not None ), "Failed to read host specific directive" )
231+
232+ try :
233+ assert ocsp .process_ocsp_bypass_directive (cur_host_spec_token , '*' , "sonytv.snowflakecomputing.com" ) is False ,\
234+ "SSD should not match hostname specified"
235+ except Exception as ex :
236+ print ("Exception while processing SSD :" + ex )
237+
238+
158239def _create_cert_spec_ocsp_bypass_token (priv_key , cid , validity_days = 1 ):
159240
160241 tdelta = timedelta (days = validity_days )
0 commit comments