Skip to content

Commit bc9c6d0

Browse files
author
Roland Hedberg
committed
Added a new exception (UnknownBinding) and used it.
1 parent 343b4ee commit bc9c6d0

File tree

1 file changed

+93
-46
lines changed

1 file changed

+93
-46
lines changed

src/saml2/entity.py

Lines changed: 93 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import base64
2-
#from binascii import hexlify
2+
# from binascii import hexlify
33
import copy
44
import logging
55
from hashlib import sha1
@@ -25,7 +25,8 @@
2525
from saml2 import element_to_extension_element
2626
from saml2 import extension_elements_to_elements
2727

28-
from saml2.saml import NameID, EncryptedAssertion
28+
from saml2.saml import NameID
29+
from saml2.saml import EncryptedAssertion
2930
from saml2.saml import Issuer
3031
from saml2.saml import NAMEID_FORMAT_ENTITY
3132
from saml2.response import LogoutResponse
@@ -88,6 +89,10 @@
8889
}
8990

9091

92+
class UnknownBinding(SAMLError):
93+
pass
94+
95+
9196
def create_artifact(entity_id, message_handle, endpoint_index=0):
9297
"""
9398
SAML_artifact := B64(TypeCode EndpointIndex RemainingArtifact)
@@ -285,8 +290,8 @@ def pick_binding(self, service, bindings=None, descr_type="", request=None,
285290

286291
logger.error("Failed to find consumer URL: %s, %s, %s",
287292
entity_id, bindings, descr_type)
288-
#logger.error("Bindings: %s", bindings)
289-
#logger.error("Entities: %s", self.metadata)
293+
# logger.error("Bindings: %s", bindings)
294+
# logger.error("Entities: %s", self.metadata)
290295

291296
raise SAMLError("Unknown entity or unsupported bindings")
292297

@@ -362,11 +367,11 @@ def unravel(txt, binding, msgtype="response"):
362367
:param msgtype:
363368
:return:
364369
"""
365-
#logger.debug("unravel '%s'", txt)
370+
# logger.debug("unravel '%s'", txt)
366371
if binding not in [BINDING_HTTP_REDIRECT, BINDING_HTTP_POST,
367372
BINDING_SOAP, BINDING_URI, BINDING_HTTP_ARTIFACT,
368373
None]:
369-
raise ValueError("Don't know how to handle '%s'" % binding)
374+
raise UnknownBinding("Don't know how to handle '%s'" % binding)
370375
else:
371376
try:
372377
if binding == BINDING_HTTP_REDIRECT:
@@ -406,7 +411,7 @@ def unpack_soap_message(text):
406411
"""
407412
return open_soap_envelope(text)
408413

409-
# --------------------------------------------------------------------------
414+
# --------------------------------------------------------------------------
410415

411416
def sign(self, msg, mid=None, to_sign=None, sign_prepare=False):
412417
if msg.signature is None:
@@ -520,7 +525,8 @@ def has_encrypt_cert_in_metadata(self, sp_entity_id):
520525
return True
521526
return False
522527

523-
def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=None):
528+
def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response,
529+
node_xpath=None):
524530
""" Encryption of assertions.
525531
526532
:param encrypt_cert: Certificate to be used for encryption.
@@ -547,7 +553,8 @@ def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=No
547553
_cert = "%s%s" % (_cert, end_cert)
548554
_, cert_file = make_temp(_cert.encode('ascii'), decode=False)
549555
response = cbxs.encrypt_assertion(response, cert_file,
550-
pre_encryption_part(), node_xpath=node_xpath)
556+
pre_encryption_part(),
557+
node_xpath=node_xpath)
551558
return response
552559
except Exception as ex:
553560
exception = ex
@@ -558,14 +565,21 @@ def _encrypt_assertion(self, encrypt_cert, sp_entity_id, response, node_xpath=No
558565

559566
def _response(self, in_response_to, consumer_url=None, status=None,
560567
issuer=None, sign=False, to_sign=None, sp_entity_id=None,
561-
encrypt_assertion=False, encrypt_assertion_self_contained=False, encrypted_advice_attributes=False,
562-
encrypt_cert_advice=None, encrypt_cert_assertion=None,sign_assertion=None, pefim=False, **kwargs):
568+
encrypt_assertion=False,
569+
encrypt_assertion_self_contained=False,
570+
encrypted_advice_attributes=False,
571+
encrypt_cert_advice=None, encrypt_cert_assertion=None,
572+
sign_assertion=None, pefim=False, **kwargs):
563573
""" Create a Response.
564574
Encryption:
565-
encrypt_assertion must be true for encryption to be performed. If encrypted_advice_attributes also is
566-
true, then will the function try to encrypt the assertion in the the advice element of the main
567-
assertion. Only one assertion element is allowed in the advice element, if multiple assertions exists
568-
in the advice element the main assertion will be encrypted instead, since it's no point to encrypt
575+
encrypt_assertion must be true for encryption to be
576+
performed. If encrypted_advice_attributes also is
577+
true, then will the function try to encrypt the assertion in
578+
the the advice element of the main
579+
assertion. Only one assertion element is allowed in the
580+
advice element, if multiple assertions exists
581+
in the advice element the main assertion will be encrypted
582+
instead, since it's no point to encrypt
569583
If encrypted_advice_attributes is
570584
false the main assertion will be encrypted. Since the same key
571585
@@ -577,13 +591,17 @@ def _response(self, in_response_to, consumer_url=None, status=None,
577591
:param to_sign: If there are other parts to sign
578592
:param sp_entity_id: Entity ID for the calling service provider.
579593
:param encrypt_assertion: True if assertions should be encrypted.
580-
:param encrypt_assertion_self_contained: True if all encrypted assertions should have alla namespaces
581-
selfcontained.
582-
:param encrypted_advice_attributes: True if assertions in the advice element should be encrypted.
583-
:param encrypt_cert_advice: Certificate to be used for encryption of assertions in the advice element.
584-
:param encrypt_cert_assertion: Certificate to be used for encryption of assertions.
594+
:param encrypt_assertion_self_contained: True if all encrypted
595+
assertions should have alla namespaces selfcontained.
596+
:param encrypted_advice_attributes: True if assertions in the advice
597+
element should be encrypted.
598+
:param encrypt_cert_advice: Certificate to be used for encryption of
599+
assertions in the advice element.
600+
:param encrypt_cert_assertion: Certificate to be used for encryption
601+
of assertions.
585602
:param sign_assertion: True if assertions should be signed.
586-
:param pefim: True if a response according to the PEFIM profile should be created.
603+
:param pefim: True if a response according to the PEFIM profile
604+
should be created.
587605
:param kwargs: Extra key word arguments
588606
:return: A Response instance
589607
"""
@@ -611,46 +629,63 @@ def _response(self, in_response_to, consumer_url=None, status=None,
611629
if not has_encrypt_cert and encrypt_cert_assertion is None:
612630
encrypt_assertion = False
613631

614-
if encrypt_assertion or (encrypted_advice_attributes and response.assertion.advice is not None and
615-
len(response.assertion.advice.assertion) == 1):
632+
if encrypt_assertion or (
633+
encrypted_advice_attributes and response.assertion.advice is
634+
not None and
635+
len(response.assertion.advice.assertion) == 1):
616636
if sign:
617637
response.signature = pre_signature_part(response.id,
618638
self.sec.my_cert, 1)
619639
sign_class = [(class_name(response), response.id)]
620640
cbxs = CryptoBackendXmlSec1(self.config.xmlsec_binary)
621641
encrypt_advice = False
622-
if encrypted_advice_attributes and response.assertion.advice is not None \
642+
if encrypted_advice_attributes and response.assertion.advice is \
643+
not None \
623644
and len(response.assertion.advice.assertion) > 0:
624645
_assertions = response.assertion
625646
if not isinstance(_assertions, list):
626647
_assertions = [_assertions]
627648
for _assertion in _assertions:
628649
_assertion.advice.encrypted_assertion = []
629-
_assertion.advice.encrypted_assertion.append(EncryptedAssertion())
630-
_advice_assertions = copy.deepcopy(_assertion.advice.assertion)
650+
_assertion.advice.encrypted_assertion.append(
651+
EncryptedAssertion())
652+
_advice_assertions = copy.deepcopy(
653+
_assertion.advice.assertion)
631654
_assertion.advice.assertion = []
632655
if not isinstance(_advice_assertions, list):
633656
_advice_assertions = [_advice_assertions]
634657
for tmp_assertion in _advice_assertions:
635658
to_sign_advice = []
636659
if sign_assertion and not pefim:
637-
tmp_assertion.signature = pre_signature_part(tmp_assertion.id, self.sec.my_cert, 1)
638-
to_sign_advice.append((class_name(tmp_assertion), tmp_assertion.id))
639-
#tmp_assertion = response.assertion.advice.assertion[0]
640-
_assertion.advice.encrypted_assertion[0].add_extension_element(tmp_assertion)
660+
tmp_assertion.signature = pre_signature_part(
661+
tmp_assertion.id, self.sec.my_cert, 1)
662+
to_sign_advice.append(
663+
(class_name(tmp_assertion), tmp_assertion.id))
664+
665+
# tmp_assertion = response.assertion.advice.assertion[0]
666+
_assertion.advice.encrypted_assertion[
667+
0].add_extension_element(tmp_assertion)
641668

642669
if encrypt_assertion_self_contained:
643-
advice_tag = response.assertion.advice._to_element_tree().tag
670+
advice_tag = \
671+
response.assertion.advice._to_element_tree().tag
644672
assertion_tag = tmp_assertion._to_element_tree().tag
645673
response = \
646674
response.get_xml_string_with_self_contained_assertion_within_advice_encrypted_assertion(
647675
assertion_tag, advice_tag)
648-
node_xpath = ''.join(["/*[local-name()=\"%s\"]" % v for v in
649-
["Response", "Assertion", "Advice", "EncryptedAssertion", "Assertion"]])
676+
node_xpath = ''.join(
677+
["/*[local-name()=\"%s\"]" % v for v in
678+
["Response", "Assertion", "Advice",
679+
"EncryptedAssertion", "Assertion"]])
650680

651681
if to_sign_advice:
652-
response = signed_instance_factory(response, self.sec, to_sign_advice)
653-
response = self._encrypt_assertion(encrypt_cert_advice, sp_entity_id, response, node_xpath=node_xpath)
682+
response = signed_instance_factory(response,
683+
self.sec,
684+
to_sign_advice)
685+
response = self._encrypt_assertion(encrypt_cert_advice,
686+
sp_entity_id,
687+
response,
688+
node_xpath=node_xpath)
654689
response = response_from_string(response)
655690

656691
if encrypt_assertion:
@@ -660,24 +695,34 @@ def _response(self, in_response_to, consumer_url=None, status=None,
660695
if not isinstance(_assertions, list):
661696
_assertions = [_assertions]
662697
for _assertion in _assertions:
663-
_assertion.signature = pre_signature_part(_assertion.id, self.sec.my_cert, 1)
664-
to_sign_assertion.append((class_name(_assertion), _assertion.id))
698+
_assertion.signature = pre_signature_part(_assertion.id,
699+
self.sec.my_cert,
700+
1)
701+
to_sign_assertion.append(
702+
(class_name(_assertion), _assertion.id))
665703
if encrypt_assertion_self_contained:
666704
try:
667-
assertion_tag = response.assertion._to_element_tree().tag
705+
assertion_tag = response.assertion._to_element_tree(
706+
707+
).tag
668708
except:
669-
assertion_tag = response.assertion[0]._to_element_tree().tag
709+
assertion_tag = response.assertion[
710+
0]._to_element_tree().tag
670711
response = pre_encrypt_assertion(response)
671-
response = response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
712+
response = \
713+
response.get_xml_string_with_self_contained_assertion_within_encrypted_assertion(
672714
assertion_tag)
673715
else:
674716
response = pre_encrypt_assertion(response)
675717
if to_sign_assertion:
676-
response = signed_instance_factory(response, self.sec, to_sign_assertion)
677-
response = self._encrypt_assertion(encrypt_cert_assertion, sp_entity_id, response)
718+
response = signed_instance_factory(response, self.sec,
719+
to_sign_assertion)
720+
response = self._encrypt_assertion(encrypt_cert_assertion,
721+
sp_entity_id, response)
678722
else:
679723
if to_sign:
680-
response = signed_instance_factory(response, self.sec, to_sign)
724+
response = signed_instance_factory(response, self.sec,
725+
to_sign)
681726
if sign:
682727
return signed_instance_factory(response, self.sec, sign_class)
683728
else:
@@ -965,7 +1010,8 @@ def create_manage_name_id_request(self, destination, message_id=0,
9651010
kwargs["terminate"] = terminate
9661011
else:
9671012
raise AttributeError(
968-
"One of NewID, NewEncryptedNameID or Terminate has to be provided")
1013+
"One of NewID, NewEncryptedNameID or Terminate has to be "
1014+
"provided")
9691015

9701016
return self._message(ManageNameIDRequest, destination, consent=consent,
9711017
extensions=extensions, sign=sign, **kwargs)
@@ -1081,14 +1127,15 @@ def _parse_response(self, xmlstr, response_cls, service, binding,
10811127
keys.append(_cert["key"])
10821128
only_identity_in_encrypted_assertion = False
10831129
if "only_identity_in_encrypted_assertion" in kwargs:
1084-
only_identity_in_encrypted_assertion = kwargs["only_identity_in_encrypted_assertion"]
1130+
only_identity_in_encrypted_assertion = kwargs[
1131+
"only_identity_in_encrypted_assertion"]
10851132

10861133
response = response.verify(keys)
10871134

10881135
if not response:
10891136
return None
10901137

1091-
#logger.debug(response)
1138+
# logger.debug(response)
10921139

10931140
return response
10941141

0 commit comments

Comments
 (0)