Skip to content

Commit 58a9352

Browse files
committed
initial (somewhat tested) version
1 parent e7e6f5a commit 58a9352

File tree

2 files changed

+80
-0
lines changed

2 files changed

+80
-0
lines changed
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import re
2+
3+
from .base import ResponseMicroService
4+
from ..exception import SATOSAAuthenticationError
5+
6+
def _filters(f, requester, provider):
7+
pf = f.get(provider, f.get("", f.get("default", {})))
8+
rf = pf.get(requester, pf.get("", pf.get("default", {})))
9+
return rf.items()
10+
11+
class AttributeAuthorization(ResponseMicroService):
12+
13+
def __init__(self, config, *args, **kwargs):
14+
super().__init__(*args, **kwargs)
15+
self.attribute_allow = config.get("attribute_allow", {})
16+
self.attribute_deny = config.get("attribute_deny", {})
17+
18+
def _check_authz(self, context, attributes, requester, provider):
19+
for attribute_name, attribute_filter in _filters(self.attribute_allow, requester, provider):
20+
regex = re.compile(attribute_filter)
21+
if attribute_name in attributes:
22+
print(repr(regex))
23+
print(list(filter(regex.search, attributes[attribute_name])))
24+
if not list(filter(regex.search, attributes[attribute_name])):
25+
raise SATOSAAuthenticationError(context.state, "Permission denied")
26+
27+
for attribute_name, attribute_filter in _filters(self.attribute_deny, requester, provider):
28+
regex = re.compile(attribute_filter)
29+
if attribute_name in attributes:
30+
if len(list(filter(regex.search, attributes[attribute_name]))) != len(attributes[attribute_name]):
31+
raise SATOSAAuthenticationError(context.state, "Permission denied")
32+
33+
def process(self, context, data):
34+
self._check_authz(context, data.attributes, data.requester, data.auth_info.issuer)
35+
return super().process(context, data)
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
from satosa.internal_data import InternalResponse, AuthenticationInformation
2+
from satosa.micro_services.attribute_authorization import AttributeAuthorization
3+
from satosa.exception import SATOSAAuthenticationError
4+
from satosa.context import Context
5+
6+
class TestAttributeAuthorization:
7+
def create_authz_service(self, attribute_allow, attribute_deny):
8+
authz_service = AttributeAuthorization(config=dict(attribute_allow=attribute_allow,attribute_deny=attribute_deny), name="test_authz",
9+
base_url="https://satosa.example.com")
10+
authz_service.next = lambda ctx, data: data
11+
return authz_service
12+
13+
def test_authz_allow(self):
14+
attribute_allow = {
15+
"": { "default": {"a0": '.+@.+'} }
16+
}
17+
attribute_deny = {}
18+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
19+
resp = InternalResponse(AuthenticationInformation(None, None, None))
20+
resp.attributes = {
21+
"a0": ["[email protected]"],
22+
}
23+
try:
24+
ctx = Context()
25+
ctx.state = dict()
26+
authz_service.process(ctx, resp)
27+
except SATOSAAuthenticationError as ex:
28+
assert False
29+
30+
def test_authz_not_allow(self):
31+
attribute_allow = {
32+
"": { "default": {"a0": 'foo'} }
33+
}
34+
attribute_deny = {}
35+
authz_service = self.create_authz_service(attribute_allow, attribute_deny)
36+
resp = InternalResponse(AuthenticationInformation(None, None, None))
37+
resp.attributes = {
38+
"a0": ["bar"],
39+
}
40+
try:
41+
ctx = Context()
42+
ctx.state = dict()
43+
authz_service.process(ctx, resp)
44+
except SATOSAAuthenticationError as ex:
45+
assert True

0 commit comments

Comments
 (0)