|
11 | 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
12 | 12 | # See the License for the specific language governing permissions and |
13 | 13 | # limitations under the License. |
14 | | - |
15 | 14 | import base64 |
16 | 15 | import ctypes |
17 | 16 | import os |
|
30 | 29 | ENTERPRISE_CERT_FILE = os.path.join( |
31 | 30 | os.path.dirname(__file__), "../data/enterprise_cert_valid.json" |
32 | 31 | ) |
| 32 | +ENTERPRISE_CERT_FILE_PROVIDER = os.path.join( |
| 33 | + os.path.dirname(__file__), "../data/enterprise_cert_valid_provider.json" |
| 34 | +) |
33 | 35 | INVALID_ENTERPRISE_CERT_FILE = os.path.join( |
34 | 36 | os.path.dirname(__file__), "../data/enterprise_cert_invalid.json" |
35 | 37 | ) |
36 | 38 |
|
37 | 39 |
|
| 40 | +def test_load_provider_lib(): |
| 41 | + with mock.patch("ctypes.CDLL", return_value=mock.MagicMock()): |
| 42 | + _custom_tls_signer.load_provider_lib("/path/to/provider/lib") |
| 43 | + |
| 44 | + |
38 | 45 | def test_load_offload_lib(): |
39 | 46 | with mock.patch("ctypes.CDLL", return_value=mock.MagicMock()): |
40 | 47 | lib = _custom_tls_signer.load_offload_lib("/path/to/offload/lib") |
@@ -173,62 +180,81 @@ def test_custom_tls_signer(): |
173 | 180 | ) as load_offload_lib: |
174 | 181 | load_offload_lib.return_value = offload_lib |
175 | 182 | load_signer_lib.return_value = signer_lib |
176 | | - signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE) |
177 | | - signer_object.load_libraries() |
178 | | - assert signer_object._cert is None |
| 183 | + with mock.patch( |
| 184 | + "google.auth.transport._custom_tls_signer.get_cert" |
| 185 | + ) as get_cert: |
| 186 | + with mock.patch( |
| 187 | + "google.auth.transport._custom_tls_signer.get_sign_callback" |
| 188 | + ) as get_sign_callback: |
| 189 | + get_cert.return_value = b"mock_cert" |
| 190 | + signer_object = _custom_tls_signer.CustomTlsSigner( |
| 191 | + ENTERPRISE_CERT_FILE |
| 192 | + ) |
| 193 | + signer_object.load_libraries() |
| 194 | + signer_object.attach_to_ssl_context(create_urllib3_context()) |
| 195 | + get_cert.assert_called_once() |
| 196 | + get_sign_callback.assert_called_once() |
| 197 | + offload_lib.ConfigureSslContext.assert_called_once() |
179 | 198 | assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE |
180 | 199 | assert signer_object._offload_lib == offload_lib |
181 | 200 | assert signer_object._signer_lib == signer_lib |
182 | 201 | load_signer_lib.assert_called_with("/path/to/signer/lib") |
183 | 202 | load_offload_lib.assert_called_with("/path/to/offload/lib") |
184 | 203 |
|
185 | | - # Test set_up_custom_key and set_up_ssl_context methods |
186 | | - with mock.patch("google.auth.transport._custom_tls_signer.get_cert") as get_cert: |
187 | | - with mock.patch( |
188 | | - "google.auth.transport._custom_tls_signer.get_sign_callback" |
189 | | - ) as get_sign_callback: |
190 | | - get_cert.return_value = b"mock_cert" |
191 | | - signer_object.set_up_custom_key() |
192 | | - signer_object.attach_to_ssl_context(create_urllib3_context()) |
193 | | - get_cert.assert_called_once() |
194 | | - get_sign_callback.assert_called_once() |
195 | | - offload_lib.ConfigureSslContext.assert_called_once() |
196 | 204 |
|
| 205 | +def test_custom_tls_signer_provider(): |
| 206 | + provider_lib = mock.MagicMock() |
197 | 207 |
|
198 | | -def test_custom_tls_signer_failed_to_load_libraries(): |
199 | 208 | # Test load_libraries method |
| 209 | + with mock.patch( |
| 210 | + "google.auth.transport._custom_tls_signer.load_provider_lib" |
| 211 | + ) as load_provider_lib: |
| 212 | + load_provider_lib.return_value = provider_lib |
| 213 | + signer_object = _custom_tls_signer.CustomTlsSigner( |
| 214 | + ENTERPRISE_CERT_FILE_PROVIDER |
| 215 | + ) |
| 216 | + signer_object.load_libraries() |
| 217 | + signer_object.attach_to_ssl_context(mock.MagicMock()) |
| 218 | + |
| 219 | + assert signer_object._enterprise_cert_file_path == ENTERPRISE_CERT_FILE_PROVIDER |
| 220 | + assert signer_object._provider_lib == provider_lib |
| 221 | + load_provider_lib.assert_called_with("/path/to/provider/lib") |
| 222 | + |
| 223 | + |
| 224 | +def test_custom_tls_signer_failed_to_load_libraries(): |
200 | 225 | with pytest.raises(exceptions.MutualTLSChannelError) as excinfo: |
201 | 226 | signer_object = _custom_tls_signer.CustomTlsSigner(INVALID_ENTERPRISE_CERT_FILE) |
202 | 227 | signer_object.load_libraries() |
203 | 228 | assert excinfo.match("enterprise cert file is invalid") |
204 | 229 |
|
205 | 230 |
|
206 | | -def test_custom_tls_signer_fail_to_offload(): |
207 | | - offload_lib = mock.MagicMock() |
208 | | - signer_lib = mock.MagicMock() |
| 231 | +def test_custom_tls_signer_failed_to_attach(): |
| 232 | + with pytest.raises(exceptions.MutualTLSChannelError) as excinfo: |
| 233 | + signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE) |
| 234 | + signer_object._offload_lib = mock.MagicMock() |
| 235 | + signer_object._signer_lib = mock.MagicMock() |
| 236 | + signer_object._sign_callback = mock.MagicMock() |
| 237 | + signer_object._cert = b"mock cert" |
| 238 | + signer_object._offload_lib.ConfigureSslContext.return_value = False |
| 239 | + signer_object.attach_to_ssl_context(mock.MagicMock()) |
| 240 | + assert excinfo.match("failed to configure ECP Offload SSL context") |
209 | 241 |
|
210 | | - with mock.patch( |
211 | | - "google.auth.transport._custom_tls_signer.load_signer_lib" |
212 | | - ) as load_signer_lib: |
213 | | - with mock.patch( |
214 | | - "google.auth.transport._custom_tls_signer.load_offload_lib" |
215 | | - ) as load_offload_lib: |
216 | | - load_offload_lib.return_value = offload_lib |
217 | | - load_signer_lib.return_value = signer_lib |
218 | | - signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE) |
219 | | - signer_object.load_libraries() |
220 | 242 |
|
221 | | - # set the return value to be 0 which indicts offload fails |
222 | | - offload_lib.ConfigureSslContext.return_value = 0 |
| 243 | +def test_custom_tls_signer_failed_to_attach_provider(): |
| 244 | + with pytest.raises(exceptions.MutualTLSChannelError) as excinfo: |
| 245 | + signer_object = _custom_tls_signer.CustomTlsSigner( |
| 246 | + ENTERPRISE_CERT_FILE_PROVIDER |
| 247 | + ) |
| 248 | + signer_object._provider_lib = mock.MagicMock() |
| 249 | + signer_object._provider_lib.ECP_attach_to_ctx.return_value = False |
| 250 | + signer_object.attach_to_ssl_context(mock.MagicMock()) |
| 251 | + assert excinfo.match("failed to configure ECP Provider SSL context") |
223 | 252 |
|
| 253 | + |
| 254 | +def test_custom_tls_signer_failed_to_attach_no_libs(): |
224 | 255 | with pytest.raises(exceptions.MutualTLSChannelError) as excinfo: |
225 | | - with mock.patch( |
226 | | - "google.auth.transport._custom_tls_signer.get_cert" |
227 | | - ) as get_cert: |
228 | | - with mock.patch( |
229 | | - "google.auth.transport._custom_tls_signer.get_sign_callback" |
230 | | - ): |
231 | | - get_cert.return_value = b"mock_cert" |
232 | | - signer_object.set_up_custom_key() |
233 | | - signer_object.attach_to_ssl_context(create_urllib3_context()) |
234 | | - assert excinfo.match("failed to configure SSL context") |
| 256 | + signer_object = _custom_tls_signer.CustomTlsSigner(ENTERPRISE_CERT_FILE) |
| 257 | + signer_object._offload_lib = None |
| 258 | + signer_object._signer_lib = None |
| 259 | + signer_object.attach_to_ssl_context(mock.MagicMock()) |
| 260 | + assert excinfo.match("Invalid ECP configuration.") |
0 commit comments