1616
1717async def run_csrf_test ():
1818 """
19- Tests that CSRF protection works correctly for JSON endpoints .
19+ Tests that CSRF protection works correctly for various request types .
2020 """
21- print ("--- Starting CSRF JSON Test ---" )
21+ print ("--- Starting CSRF Logic Test ---" )
2222 async with httpx .AsyncClient (base_url = BASE_URL ) as client :
2323 try :
24- # 1. Make a GET request to a page to get a CSRF token
24+ # 1. Make a GET request to a page to get a CSRF token from the cookie
2525 print ("Step 1: Getting CSRF token from homepage..." )
2626 get_response = await client .get ("/" )
2727 get_response .raise_for_status ()
2828 assert "csrf_token" in client .cookies , "CSRF token not found in cookie"
2929 csrf_token = client .cookies ["csrf_token" ]
30- print (" [PASS] CSRF token received." )
30+ print (f " [PASS] CSRF token received: { csrf_token [: 10 ] } .. ." )
3131
32- # 2. Send a POST request to the JSON endpoint WITHOUT a CSRF token
32+ # 2. Test POST without any CSRF token (should fail)
3333 print ("\n Step 2: Testing POST to /api/test without CSRF token (expecting 403)..." )
34- payload = {"message" : "hello" }
35- fail_response = await client .post ("/api/test" , json = payload )
34+ fail_response = await client .post ("/api/test" , json = {"message" : "hello" })
3635 assert fail_response .status_code == 403 , f"Expected status 403, but got { fail_response .status_code } "
3736 assert "CSRF token missing or invalid" in fail_response .text
3837 print (" [PASS] Request was correctly forbidden." )
3938
40- # 3. Send a POST request to the JSON endpoint WITH the correct CSRF token
41- print ("\n Step 3: Testing POST to /api/test with CSRF token (expecting 200)..." )
39+ # 3. Test POST with CSRF token in JSON body (should pass)
40+ print ("\n Step 3: Testing POST to /api/test with CSRF token in JSON body (expecting 200)..." )
4241 payload_with_token = {"message" : "hello" , "csrf_token" : csrf_token }
43- success_response = await client .post ("/api/test" , json = payload_with_token )
44- assert success_response .status_code == 200 , f"Expected status 200, but got { success_response .status_code } "
45- response_json = success_response .json ()
46- assert response_json ["message" ] == "hello"
47- print (" [PASS] Request was successful." )
42+ success_response_body = await client .post ("/api/test" , json = payload_with_token )
43+ assert success_response_body .status_code == 200 , f"Expected status 200, but got { success_response_body .status_code } "
44+ assert success_response_body .json ()["message" ] == "hello"
45+ print (" [PASS] Request with token in body was successful." )
46+
47+ # 4. Test POST with CSRF token in header (should pass)
48+ print ("\n Step 4: Testing POST to /api/test with CSRF token in header (expecting 200)..." )
49+ headers = {"X-CSRF-Token" : csrf_token }
50+ success_response_header = await client .post ("/api/test" , json = {"message" : "world" }, headers = headers )
51+ assert success_response_header .status_code == 200 , f"Expected status 200, but got { success_response_header .status_code } "
52+ assert success_response_header .json ()["message" ] == "world"
53+ print (" [PASS] Request with token in header was successful." )
54+
55+ # 5. Test empty-body POST with CSRF token in header (should pass validation, then redirect)
56+ print ("\n Step 5: Testing empty-body POST to /logout with CSRF token in header (expecting 302)..." )
57+ # Note: The /logout endpoint redirects after success, so we expect a 302
58+ # We disable auto-redirects to verify the 302 status directly
59+ empty_body_response = await client .post ("/logout" , headers = headers , follow_redirects = False )
60+
61+ # If we got a 403, the CSRF check failed. If we got a 302, it passed!
62+ assert empty_body_response .status_code == 302 , f"Expected status 302 (Redirect), but got { empty_body_response .status_code } . (403 means CSRF failed)"
63+ print (" [PASS] Empty-body request passed CSRF check and redirected." )
4864
4965 except Exception as e :
5066 print (f"\n --- TEST FAILED ---" )
@@ -53,7 +69,7 @@ async def run_csrf_test():
5369 traceback .print_exc ()
5470 return False
5571
56- print ("\n --- TEST PASSED ---" )
72+ print ("\n --- ALL CSRF TESTS PASSED ---" )
5773 return True
5874
5975
@@ -89,13 +105,15 @@ def main():
89105 print ("\n Stopping test server..." )
90106 server_process .terminate ()
91107 # Get remaining output
92- stdout , stderr = server_process .communicate (timeout = 5 )
93-
94- print ("\n --- Server Output ---" )
95- print ("STDOUT:" )
96- print (stdout )
97- print ("\n STDERR:" )
98- print (stderr )
108+ try :
109+ stdout , stderr = server_process .communicate (timeout = 5 )
110+ print ("\n --- Server Output ---" )
111+ print ("STDOUT:" )
112+ print (stdout )
113+ print ("\n STDERR:" )
114+ print (stderr )
115+ except subprocess .TimeoutExpired :
116+ print ("Server did not terminate gracefully." )
99117
100118 if not test_passed :
101119 print ("\n Exiting with status 1 due to test failure." )
0 commit comments