|
6 | 6 | from google.protobuf import json_format |
7 | 7 | import grpc |
8 | 8 | from . import proto |
| 9 | +import ssl |
| 10 | +from cryptography import x509 |
| 11 | +from cryptography.hazmat.backends import default_backend |
9 | 12 |
|
10 | 13 |
|
11 | 14 | try: |
@@ -228,6 +231,7 @@ def __init__(self, *args, **kwargs): |
228 | 231 | super().__init__(*args, **kwargs) |
229 | 232 | self.device = kwargs.get('device') |
230 | 233 | self.dev_args = self.connection_info |
| 234 | + self.skip_verify = kwargs.get('skip_verify') |
231 | 235 | if self.dev_args.get('protocol', '') != 'gnmi': |
232 | 236 | msg = 'Invalid protocol {0}'.format(self.dev_args.get('protocol', '')) |
233 | 237 | raise TypeError(msg) |
@@ -266,6 +270,7 @@ def connect(self): |
266 | 270 | dev_args = self.dev_args |
267 | 271 | username = dev_args.get('username', '') |
268 | 272 | password = dev_args.get('password', '') |
| 273 | + skip_verify = dev_args.get('skip_verify') |
269 | 274 |
|
270 | 275 | if dev_args.get('custom_log', ''): |
271 | 276 | self.log = dev_args.get('custom_log') |
@@ -331,6 +336,28 @@ def connect(self): |
331 | 336 | if private_key and os.path.isfile(private_key): |
332 | 337 | private_key = open(private_key, 'rb').read() |
333 | 338 |
|
| 339 | + if skip_verify and not root: |
| 340 | + try: |
| 341 | + ssl_cert = ssl.get_server_certificate((str(host), port)).encode("utf-8") |
| 342 | + except Exception as e: |
| 343 | + self.log.error(f'The SSH certificate cannot be retrieved from {target}') |
| 344 | + raise gNMIException(f'The SSH certificate cannot be retrieved from {target}', e) |
| 345 | + |
| 346 | + ssl_cert_deserialized = x509.load_pem_x509_certificate( |
| 347 | + ssl_cert, default_backend() |
| 348 | + ) |
| 349 | + |
| 350 | + try: |
| 351 | + ssl_cert_common_name = ssl_cert_deserialized.subject.get_attributes_for_oid( |
| 352 | + (x509.oid.NameOID.COMMON_NAME) |
| 353 | + )[0].value |
| 354 | + options.append( |
| 355 | + ('grpc.ssl_target_name_override', ssl_cert_common_name), |
| 356 | + ) |
| 357 | + root = ssl_cert |
| 358 | + except BaseException as err: |
| 359 | + self.log.warning(f'Unable to get common name: {err}') |
| 360 | + |
334 | 361 | if any((root, chain, private_key)): |
335 | 362 | override_name = dev_args.get('ssl_name_override', '') |
336 | 363 | if override_name: |
@@ -359,7 +386,6 @@ def connect(self): |
359 | 386 | target, channel_creds, options |
360 | 387 | ) |
361 | 388 | else: |
362 | | - self.channel = grpc.insecure_channel(target) |
363 | 389 | self.channel = grpc.insecure_channel(target, options) |
364 | 390 | self.metadata = [ |
365 | 391 | ("username", username), |
|
0 commit comments