@@ -26,17 +26,159 @@ def test_no_two_concurrent_receivers_can_listen_on_same_port(self):
2626 pass
2727
2828 def test_template_should_escape_input (self ):
29+ """Test that POST request with HTML in error is properly escaped"""
2930 with AuthCodeReceiver () as receiver :
3031 receiver ._scheduled_actions = [( # Injection happens here when the port is known
3132 1 , # Delay it until the receiver is activated by get_auth_response()
3233 lambda : self .assertEqual (
3334 "<html><tag>foo</tag></html>" ,
34- requests .get ("http://localhost:{}?error=<tag>foo</tag>" .format (
35- receiver .get_port ())).text ,
36- "Unsafe data in HTML should be escaped" ,
35+ requests .post (
36+ "http://localhost:{}" .format (receiver .get_port ()),
37+ data = {"error" : "<tag>foo</tag>" },
38+ headers = {'Content-Type' : 'application/x-www-form-urlencoded' }
39+ ).text ,
3740 ))]
3841 receiver .get_auth_response ( # Starts server and hang until timeout
3942 timeout = 3 ,
4043 error_template = "<html>$error</html>" ,
4144 )
4245
46+ def test_get_request_with_auth_code_is_rejected (self ):
47+ """Test that GET request with auth code is rejected for security"""
48+ with AuthCodeReceiver () as receiver :
49+ test_code = "test_auth_code_12345"
50+ test_state = "test_state_67890"
51+ receiver ._scheduled_actions = [(
52+ 1 ,
53+ lambda : self ._verify_get_rejection (
54+ receiver .get_port (),
55+ code = test_code ,
56+ state = test_state
57+ )
58+ )]
59+ result = receiver .get_auth_response (
60+ timeout = 3 ,
61+ state = test_state ,
62+ )
63+ # Should not receive auth response via GET
64+ self .assertIsNone (result )
65+
66+ def _verify_get_rejection (self , port , ** params ):
67+ """Helper to verify GET requests with auth codes are rejected"""
68+ try :
69+ from urllib .parse import urlencode
70+ except ImportError :
71+ from urllib import urlencode
72+
73+ response = requests .get (
74+ "http://localhost:{}?{}" .format (port , urlencode (params ))
75+ )
76+ # Verify error message about unsupported method
77+ self .assertIn ("not supported" , response .text .lower ())
78+ self .assertEqual (response .status_code , 400 )
79+
80+ def test_post_request_with_auth_code (self ):
81+ """Test that POST request with auth code is handled correctly (form_post response mode)"""
82+ with AuthCodeReceiver () as receiver :
83+ test_code = "test_auth_code_12345"
84+ test_state = "test_state_67890"
85+ receiver ._scheduled_actions = [(
86+ 1 ,
87+ lambda : self ._send_post_auth_response (
88+ receiver .get_port (),
89+ code = test_code ,
90+ state = test_state
91+ )
92+ )]
93+ result = receiver .get_auth_response (
94+ timeout = 3 ,
95+ state = test_state ,
96+ success_template = "Success: Got code $code" ,
97+ )
98+ self .assertIsNotNone (result )
99+ self .assertEqual (result .get ("code" ), test_code )
100+ self .assertEqual (result .get ("state" ), test_state )
101+
102+ def test_post_request_with_error (self ):
103+ """Test that POST request with error is handled correctly"""
104+ with AuthCodeReceiver () as receiver :
105+ test_error = "access_denied"
106+ test_error_description = "User denied access"
107+ receiver ._scheduled_actions = [(
108+ 1 ,
109+ lambda : self ._send_post_auth_response (
110+ receiver .get_port (),
111+ error = test_error ,
112+ error_description = test_error_description
113+ )
114+ )]
115+ result = receiver .get_auth_response (
116+ timeout = 3 ,
117+ error_template = "Error: $error - $error_description" ,
118+ )
119+ self .assertIsNotNone (result )
120+ self .assertEqual (result .get ("error" ), test_error )
121+ self .assertEqual (result .get ("error_description" ), test_error_description )
122+
123+ def test_post_request_state_mismatch (self ):
124+ """Test that POST request with mismatched state is rejected"""
125+ with AuthCodeReceiver () as receiver :
126+ test_code = "test_auth_code_12345"
127+ wrong_state = "wrong_state"
128+ expected_state = "expected_state"
129+ receiver ._scheduled_actions = [(
130+ 1 ,
131+ lambda : self ._send_post_auth_response (
132+ receiver .get_port (),
133+ code = test_code ,
134+ state = wrong_state
135+ )
136+ )]
137+ result = receiver .get_auth_response (
138+ timeout = 3 ,
139+ state = expected_state ,
140+ )
141+ # When state mismatches, the response should not be set
142+ self .assertIsNone (result )
143+
144+ def test_post_request_escapes_html (self ):
145+ """Test that POST request with HTML in error is properly escaped"""
146+ with AuthCodeReceiver () as receiver :
147+ malicious_error = "<script>alert('xss')</script>"
148+ receiver ._scheduled_actions = [(
149+ 1 ,
150+ lambda : self ._verify_post_escaping (receiver .get_port (), malicious_error )
151+ )]
152+ receiver .get_auth_response (
153+ timeout = 3 ,
154+ error_template = "<html>$error</html>" ,
155+ )
156+
157+ def _send_post_auth_response (self , port , ** params ):
158+ """Helper to send POST request with auth response"""
159+ try :
160+ from urllib .parse import urlencode
161+ except ImportError :
162+ from urllib import urlencode
163+
164+ response = requests .post (
165+ "http://localhost:{}" .format (port ),
166+ data = params ,
167+ headers = {'Content-Type' : 'application/x-www-form-urlencoded' }
168+ )
169+ return response
170+
171+ def _verify_post_escaping (self , port , malicious_error ):
172+ """Helper to verify HTML escaping in POST requests"""
173+ response = self ._send_post_auth_response (port , error = malicious_error )
174+ # Verify that the malicious script is escaped
175+ self .assertIn ("<script>" , response .text )
176+ self .assertNotIn ("<script>" , response .text )
177+ # Note: The escape function also escapes single quotes to '
178+ expected = "<html><script>alert('xss')</script></html>"
179+ self .assertEqual (
180+ expected ,
181+ response .text ,
182+ "HTML in POST data should be escaped to prevent XSS"
183+ )
184+
0 commit comments