|
| 1 | +import asyncio |
| 2 | +import httpx |
| 3 | +import subprocess |
| 4 | +import sys |
| 5 | +import time |
| 6 | +import os |
| 7 | + |
| 8 | +# Construct absolute path to the test application directory |
| 9 | +TESTS_DIR = os.path.dirname(os.path.abspath(__file__)) |
| 10 | +TEST_APP_DIR = os.path.join(TESTS_DIR, "test") |
| 11 | + |
| 12 | +# Ensure the test application is in the python path |
| 13 | +sys.path.insert(0, TEST_APP_DIR) |
| 14 | + |
| 15 | +BASE_URL = "http://127.0.0.1:8000" |
| 16 | + |
| 17 | +async def run_csrf_test(): |
| 18 | + """ |
| 19 | + Tests that CSRF protection works correctly for various request types. |
| 20 | + """ |
| 21 | + print("--- Starting CSRF Logic Test ---") |
| 22 | + async with httpx.AsyncClient(base_url=BASE_URL) as client: |
| 23 | + try: |
| 24 | + # 1. Make a GET request to a page to get a CSRF token from the cookie |
| 25 | + print("Step 1: Getting CSRF token from homepage...") |
| 26 | + get_response = await client.get("/") |
| 27 | + get_response.raise_for_status() |
| 28 | + assert "csrf_token" in client.cookies, "CSRF token not found in cookie" |
| 29 | + csrf_token = client.cookies["csrf_token"] |
| 30 | + print(f" [PASS] CSRF token received: {csrf_token[:10]}...") |
| 31 | + |
| 32 | + # 2. Test POST without any CSRF token (should fail) |
| 33 | + print("\nStep 2: Testing POST to /api/test without CSRF token (expecting 403)...") |
| 34 | + fail_response = await client.post("/api/test", json={"message": "hello"}) |
| 35 | + assert fail_response.status_code == 403, f"Expected status 403, but got {fail_response.status_code}" |
| 36 | + assert "CSRF token missing or invalid" in fail_response.text |
| 37 | + print(" [PASS] Request was correctly forbidden.") |
| 38 | + |
| 39 | + # 3. Test POST with CSRF token in JSON body (should pass) |
| 40 | + print("\nStep 3: Testing POST to /api/test with CSRF token in JSON body (expecting 200)...") |
| 41 | + payload_with_token = {"message": "hello", "csrf_token": csrf_token} |
| 42 | + success_response_body = await client.post("/api/test", json=payload_with_token) |
| 43 | + assert success_response_body.status_code == 200, f"Expected status 200, but got {success_response_body.status_code}" |
| 44 | + assert success_response_body.json()["message"] == "hello" |
| 45 | + print(" [PASS] Request with token in body was successful.") |
| 46 | + |
| 47 | + # 4. Test POST with CSRF token in header (should pass) |
| 48 | + print("\nStep 4: Testing POST to /api/test with CSRF token in header (expecting 200)...") |
| 49 | + headers = {"X-CSRF-Token": csrf_token} |
| 50 | + success_response_header = await client.post("/api/test", json={"message": "world"}, headers=headers) |
| 51 | + assert success_response_header.status_code == 200, f"Expected status 200, but got {success_response_header.status_code}" |
| 52 | + assert success_response_header.json()["message"] == "world" |
| 53 | + print(" [PASS] Request with token in header was successful.") |
| 54 | + |
| 55 | + # 5. Test empty-body POST with CSRF token in header (should pass validation, then redirect) |
| 56 | + print("\nStep 5: Testing empty-body POST to /logout with CSRF token in header (expecting 302)...") |
| 57 | + # Note: The /logout endpoint redirects after success, so we expect a 302 |
| 58 | + # We disable auto-redirects to verify the 302 status directly |
| 59 | + empty_body_response = await client.post("/logout", headers=headers, follow_redirects=False) |
| 60 | + |
| 61 | + # If we got a 403, the CSRF check failed. If we got a 302, it passed! |
| 62 | + assert empty_body_response.status_code == 302, f"Expected status 302 (Redirect), but got {empty_body_response.status_code}. (403 means CSRF failed)" |
| 63 | + print(" [PASS] Empty-body request passed CSRF check and redirected.") |
| 64 | + |
| 65 | + except Exception as e: |
| 66 | + print(f"\n--- TEST FAILED ---") |
| 67 | + print(f"An error occurred: {e}") |
| 68 | + import traceback |
| 69 | + traceback.print_exc() |
| 70 | + return False |
| 71 | + |
| 72 | + print("\n--- ALL CSRF TESTS PASSED ---") |
| 73 | + return True |
| 74 | + |
| 75 | + |
| 76 | +def main(): |
| 77 | + print("Starting test server...") |
| 78 | + server_process = subprocess.Popen( |
| 79 | + [sys.executable, "-m", "uvicorn", "app:app"], |
| 80 | + cwd=TEST_APP_DIR, |
| 81 | + stdout=subprocess.PIPE, |
| 82 | + stderr=subprocess.PIPE, |
| 83 | + text=True, # Decode stdout/stderr as text |
| 84 | + ) |
| 85 | + |
| 86 | + # Give the server more time to start up |
| 87 | + print("Waiting 5 seconds for server to start...") |
| 88 | + time.sleep(5) |
| 89 | + |
| 90 | + # Check if the server process has terminated unexpectedly |
| 91 | + if server_process.poll() is not None: |
| 92 | + print("\n--- SERVER FAILED TO START ---") |
| 93 | + stdout, stderr = server_process.communicate() |
| 94 | + print("STDOUT:") |
| 95 | + print(stdout) |
| 96 | + print("\nSTDERR:") |
| 97 | + print(stderr) |
| 98 | + sys.exit(1) |
| 99 | + |
| 100 | + print("Server seems to be running. Starting tests.") |
| 101 | + test_passed = False |
| 102 | + try: |
| 103 | + test_passed = asyncio.run(run_csrf_test()) |
| 104 | + finally: |
| 105 | + print("\nStopping test server...") |
| 106 | + server_process.terminate() |
| 107 | + # Get remaining output |
| 108 | + try: |
| 109 | + stdout, stderr = server_process.communicate(timeout=5) |
| 110 | + print("\n--- Server Output ---") |
| 111 | + print("STDOUT:") |
| 112 | + print(stdout) |
| 113 | + print("\nSTDERR:") |
| 114 | + print(stderr) |
| 115 | + except subprocess.TimeoutExpired: |
| 116 | + print("Server did not terminate gracefully.") |
| 117 | + |
| 118 | + if not test_passed: |
| 119 | + print("\nExiting with status 1 due to test failure.") |
| 120 | + sys.exit(1) |
| 121 | + |
| 122 | + |
| 123 | +if __name__ == "__main__": |
| 124 | + main() |
0 commit comments