7
7
from saml2 .time_util import in_a_while , str_to_time
8
8
from saml2 .ident import code
9
9
10
- SESSION_INFO_PATTERN = {"ava" :{}, "came from" :"" , "not_on_or_after" :0 ,
11
- "issuer" :"" , "session_id" :- 1 }
10
+ SESSION_INFO_PATTERN = {"ava" : {}, "came from" : "" , "not_on_or_after" : 0 ,
11
+ "issuer" : "" , "session_id" : - 1 }
12
12
13
13
14
- def _eq (l1 ,l2 ):
14
+ def _eq (l1 , l2 ):
15
15
return set (l1 ) == set (l2 )
16
16
17
+
17
18
def nid_eq (l1 , l2 ):
18
19
return _eq ([code (c ) for c in l1 ], [code (c ) for c in l2 ])
19
20
21
+
20
22
nid = [
21
23
NameID (name_qualifier = "foo" , format = NAMEID_FORMAT_TRANSIENT , text = "1234" ),
22
24
NameID (name_qualifier = "foo" , format = NAMEID_FORMAT_TRANSIENT , text = "9876" ),
23
25
NameID (name_qualifier = "foo" , format = NAMEID_FORMAT_TRANSIENT , text = "1000" )]
24
26
27
+
25
28
class TestClass :
26
29
def setup_class (self ):
27
30
self .cache = Cache ()
28
-
29
-
31
+
30
32
def test_set (self ):
31
33
not_on_or_after = str_to_time (in_a_while (days = 1 ))
32
34
session_info = SESSION_INFO_PATTERN .copy ()
33
- session_info ["ava" ] = {"givenName" :["Derek" ]}
35
+ session_info ["ava" ] = {"givenName" : ["Derek" ]}
34
36
self .cache .set (nid [0 ], "abcd" , session_info , not_on_or_after )
35
-
37
+
36
38
(ava , inactive ) = self .cache .get_identity (nid [0 ])
37
39
assert inactive == []
38
40
assert list (ava .keys ()) == ["givenName" ]
39
41
assert ava ["givenName" ] == ["Derek" ]
40
-
41
- def test_add_ava_info (self ):
42
+
43
+ def test_add_ava_info (self ):
42
44
not_on_or_after = str_to_time (in_a_while (days = 1 ))
43
45
session_info = SESSION_INFO_PATTERN .copy ()
44
- session_info ["ava" ] = {"surName" :["Jeter" ]}
46
+ session_info ["ava" ] = {"surName" : ["Jeter" ]}
45
47
self .cache .set (nid [0 ], "bcde" , session_info , not_on_or_after )
46
-
48
+
47
49
(ava , inactive ) = self .cache .get_identity (nid [0 ])
48
50
assert inactive == []
49
- assert _eq (ava .keys (), ["givenName" ,"surName" ])
51
+ assert _eq (ava .keys (), ["givenName" , "surName" ])
50
52
assert ava ["givenName" ] == ["Derek" ]
51
53
assert ava ["surName" ] == ["Jeter" ]
52
54
53
- def test_from_one_target_source (self ):
55
+ def test_from_one_target_source (self ):
54
56
session_info = self .cache .get (nid [0 ], "bcde" )
55
57
ava = session_info ["ava" ]
56
58
assert _eq (ava .keys (), ["surName" ])
@@ -59,66 +61,65 @@ def test_from_one_target_source(self):
59
61
ava = session_info ["ava" ]
60
62
assert _eq (ava .keys (), ["givenName" ])
61
63
assert ava ["givenName" ] == ["Derek" ]
62
-
64
+
63
65
def test_entities (self ):
64
66
assert _eq (self .cache .entities (nid [0 ]), ["abcd" , "bcde" ])
65
67
py .test .raises (Exception , "self.cache.entities('6666')" )
66
-
68
+
67
69
def test_remove_info (self ):
68
70
self .cache .reset (nid [0 ], "bcde" )
69
71
assert self .cache .active (nid [0 ], "bcde" ) == False
70
72
assert self .cache .active (nid [0 ], "abcd" )
71
-
73
+
72
74
(ava , inactive ) = self .cache .get_identity (nid [0 ])
73
75
assert inactive == ['bcde' ]
74
76
assert _eq (ava .keys (), ["givenName" ])
75
77
assert ava ["givenName" ] == ["Derek" ]
76
-
78
+
77
79
def test_active (self ):
78
80
assert self .cache .active (nid [0 ], "bcde" ) == False
79
81
assert self .cache .active (nid [0 ], "abcd" )
80
-
82
+
81
83
def test_subjects (self ):
82
84
assert nid_eq (self .cache .subjects (), [nid [0 ]])
83
-
85
+
84
86
def test_second_subject (self ):
85
87
not_on_or_after = str_to_time (in_a_while (days = 1 ))
86
88
session_info = SESSION_INFO_PATTERN .copy ()
87
- session_info ["ava" ] = {"givenName" :["Ichiro" ],
88
- "surName" :["Suzuki" ]}
89
+ session_info ["ava" ] = {"givenName" : ["Ichiro" ],
90
+ "surName" : ["Suzuki" ]}
89
91
self .cache .set (nid [1 ], "abcd" , session_info ,
90
- not_on_or_after )
92
+ not_on_or_after )
91
93
92
94
(ava , inactive ) = self .cache .get_identity (nid [1 ])
93
95
assert inactive == []
94
- assert _eq (ava .keys (), ["givenName" ,"surName" ])
96
+ assert _eq (ava .keys (), ["givenName" , "surName" ])
95
97
assert ava ["givenName" ] == ["Ichiro" ]
96
98
assert ava ["surName" ] == ["Suzuki" ]
97
99
assert nid_eq (self .cache .subjects (), [nid [0 ], nid [1 ]])
98
-
100
+
99
101
def test_receivers (self ):
100
102
assert _eq (self .cache .receivers (nid [1 ]), ["abcd" ])
101
-
103
+
102
104
not_on_or_after = str_to_time (in_a_while (days = 1 ))
103
105
session_info = SESSION_INFO_PATTERN .copy ()
104
- session_info ["ava" ] = {"givenName" :["Ichiro" ],
105
- "surName" :["Suzuki" ]}
106
+ session_info ["ava" ] = {"givenName" : ["Ichiro" ],
107
+ "surName" : ["Suzuki" ]}
106
108
self .cache .set (nid [1 ], "bcde" , session_info ,
107
- not_on_or_after )
108
-
109
+ not_on_or_after )
110
+
109
111
assert _eq (self .cache .receivers (nid [1 ]), ["abcd" , "bcde" ])
110
112
assert nid_eq (self .cache .subjects (), nid [0 :2 ])
111
-
113
+
112
114
def test_timeout (self ):
113
115
not_on_or_after = str_to_time (in_a_while (seconds = 1 ))
114
116
session_info = SESSION_INFO_PATTERN .copy ()
115
- session_info ["ava" ] = {"givenName" :["Alex" ],
116
- "surName" :["Rodriguez" ]}
117
+ session_info ["ava" ] = {"givenName" : ["Alex" ],
118
+ "surName" : ["Rodriguez" ]}
117
119
self .cache .set (nid [2 ], "bcde" , session_info ,
118
- not_on_or_after )
119
-
120
+ not_on_or_after )
121
+
120
122
time .sleep (2 )
121
123
(ava , inactive ) = self .cache .get_identity (nid [2 ])
122
124
assert inactive == ["bcde" ]
123
125
assert ava == {}
124
-
0 commit comments