1+ #!/usr/bin/env python3
2+ """
3+ Test script to verify authentication is working in the HTTP Event Collector.
4+ This tests the fix for the security vulnerability described in test-data/lorem-issue.md
5+ """
6+
7+ import requests
8+ import json
9+ import sys
10+ import time
11+ import subprocess
12+ import signal
13+ import threading
14+ from datetime import datetime
15+
16+ def test_authentication (base_url = "http://localhost:8001" ):
17+ """Test authentication with valid and invalid tokens"""
18+
19+ # Test cases
20+ test_cases = [
21+ {
22+ "name" : "Valid token" ,
23+ "token" : "valid-test-token" ,
24+ "expected_status" : 200 ,
25+ "description" : "Should accept requests with valid token"
26+ },
27+ {
28+ "name" : "Invalid token" ,
29+ "token" : "invalid-token" ,
30+ "expected_status" : 401 ,
31+ "description" : "Should reject requests with invalid token"
32+ },
33+ {
34+ "name" : "Dogs token (🐾)" ,
35+ "token" : "woof-woof-🐾" ,
36+ "expected_status" : 401 ,
37+ "description" : "Should reject dogs (unauthorized access)"
38+ },
39+ {
40+ "name" : "Random token" ,
41+ "token" : "abc123xyz" ,
42+ "expected_status" : 401 ,
43+ "description" : "Should reject random tokens"
44+ }
45+ ]
46+
47+ results = []
48+
49+ for test_case in test_cases :
50+ print (f"\n Testing: { test_case ['name' ]} " )
51+ print (f"Description: { test_case ['description' ]} " )
52+
53+ # Prepare test event
54+ event_data = {
55+ "test_event" : True ,
56+ "timestamp" : datetime .now ().isoformat (),
57+ "action" : "auth_test" ,
58+ "token_test" : test_case ['name' ]
59+ }
60+
61+ headers = {
62+ 'Authorization' : f'Splunk { test_case ["token" ]} ' ,
63+ 'Content-Type' : 'application/json'
64+ }
65+
66+ try :
67+ response = requests .post (
68+ f"{ base_url } /services/collector" ,
69+ headers = headers ,
70+ json = event_data ,
71+ timeout = 5
72+ )
73+
74+ status_code = response .status_code
75+ print (f"Status Code: { status_code } " )
76+ print (f"Response: { response .text } " )
77+
78+ # Check if result matches expectation
79+ success = status_code == test_case ['expected_status' ]
80+ result = {
81+ "test" : test_case ['name' ],
82+ "expected" : test_case ['expected_status' ],
83+ "actual" : status_code ,
84+ "success" : success ,
85+ "response" : response .text
86+ }
87+ results .append (result )
88+
89+ if success :
90+ print ("✅ PASS" )
91+ else :
92+ print ("❌ FAIL" )
93+
94+ except requests .exceptions .RequestException as e :
95+ print (f"❌ REQUEST ERROR: { e } " )
96+ results .append ({
97+ "test" : test_case ['name' ],
98+ "expected" : test_case ['expected_status' ],
99+ "actual" : "ERROR" ,
100+ "success" : False ,
101+ "response" : str (e )
102+ })
103+
104+ # Summary
105+ print ("\n " + "=" * 60 )
106+ print ("AUTHENTICATION TEST SUMMARY" )
107+ print ("=" * 60 )
108+
109+ passed = sum (1 for r in results if r ['success' ])
110+ total = len (results )
111+
112+ for result in results :
113+ status = "✅ PASS" if result ['success' ] else "❌ FAIL"
114+ print (f"{ status } { result ['test' ]} : Expected { result ['expected' ]} , Got { result ['actual' ]} " )
115+
116+ print (f"\n Overall: { passed } /{ total } tests passed" )
117+
118+ if passed == total :
119+ print ("🎉 All tests passed! Authentication is working correctly." )
120+ return True
121+ else :
122+ print ("⚠️ Some tests failed! Authentication may not be working correctly." )
123+ return False
124+
125+ def start_test_server ():
126+ """Start the Flask app with a test token for testing"""
127+ print ("Starting test server..." )
128+
129+ # Start the server with a known valid token
130+ cmd = [
131+ 'python3' , 'app.py' ,
132+ '--token' , 'valid-test-token' ,
133+ '--username' , 'admin' ,
134+ '--password' , 'testpass'
135+ ]
136+
137+ process = subprocess .Popen (
138+ cmd ,
139+ stdout = subprocess .PIPE ,
140+ stderr = subprocess .PIPE ,
141+ cwd = '/home/runner/work/the-power/the-power/http-event-collector'
142+ )
143+
144+ # Give the server time to start
145+ time .sleep (3 )
146+
147+ return process
148+
149+ def stop_server (process ):
150+ """Stop the test server"""
151+ if process :
152+ process .terminate ()
153+ try :
154+ process .wait (timeout = 5 )
155+ except subprocess .TimeoutExpired :
156+ process .kill ()
157+ process .wait ()
158+
159+ def main ():
160+ """Main test function"""
161+ print ("🔒 HTTP Event Collector Authentication Test" )
162+ print ("Testing fix for the 'dogs accessing database' security vulnerability" )
163+ print ("=" * 70 )
164+
165+ # Start test server
166+ server_process = None
167+ try :
168+ server_process = start_test_server ()
169+
170+ # Run authentication tests
171+ success = test_authentication ()
172+
173+ return 0 if success else 1
174+
175+ except KeyboardInterrupt :
176+ print ("\n Test interrupted by user" )
177+ return 1
178+ except Exception as e :
179+ print (f"Test error: { e } " )
180+ return 1
181+ finally :
182+ if server_process :
183+ stop_server (server_process )
184+ print ("\n Test server stopped" )
185+
186+ if __name__ == "__main__" :
187+ sys .exit (main ())
0 commit comments