|
5 | 5 |
|
6 | 6 | from __future__ import annotations
|
7 | 7 |
|
| 8 | +import typing |
8 | 9 | from base64 import b64decode, b64encode
|
9 | 10 | from collections import OrderedDict
|
10 | 11 | from datetime import datetime, timezone
|
|
28 | 29 | from cryptography.hazmat.backends import default_backend
|
29 | 30 | from cryptography.hazmat.primitives import hashes, serialization
|
30 | 31 | from cryptography.hazmat.primitives.asymmetric import padding, utils
|
| 32 | +from cryptography.hazmat.primitives.asymmetric.dsa import DSAPublicKey |
| 33 | +from cryptography.hazmat.primitives.asymmetric.ec import ECDSA, EllipticCurvePublicKey |
| 34 | +from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey |
31 | 35 | from OpenSSL.SSL import Connection
|
32 | 36 |
|
33 | 37 | from snowflake.connector.errorcode import (
|
@@ -368,9 +372,21 @@ def verify_signature(self, signature_algorithm, signature, cert, data):
|
368 | 372 | hasher = hashes.Hash(chosen_hash, backend)
|
369 | 373 | hasher.update(data.dump())
|
370 | 374 | digest = hasher.finalize()
|
| 375 | + additional_kwargs: dict[str, typing.Any] = dict() |
| 376 | + if isinstance(public_key, RSAPublicKey): |
| 377 | + additional_kwargs["padding"] = padding.PKCS1v15() |
| 378 | + additional_kwargs["algorithm"] = utils.Prehashed(chosen_hash) |
| 379 | + elif isinstance(public_key, DSAPublicKey): |
| 380 | + additional_kwargs["algorithm"] = utils.Prehashed(chosen_hash) |
| 381 | + elif isinstance(public_key, EllipticCurvePublicKey): |
| 382 | + additional_kwargs["signature_algorithm"] = ECDSA( |
| 383 | + utils.Prehashed(chosen_hash) |
| 384 | + ) |
371 | 385 | try:
|
372 | 386 | public_key.verify(
|
373 |
| - signature, digest, padding.PKCS1v15(), utils.Prehashed(chosen_hash) |
| 387 | + signature, |
| 388 | + digest, |
| 389 | + **additional_kwargs, |
374 | 390 | )
|
375 | 391 | except InvalidSignature:
|
376 | 392 | raise RevocationCheckError(msg="Failed to verify the signature")
|
|
0 commit comments