#!/usr/bin/env python3
"""
test_connections.py - Network Connection Test Script

This script generates various types of network connections to test the
eBPF monitoring system. It helps verify that different risk levels
(INFO, HIGH, CRITICAL) are correctly detected and displayed.

PURPOSE:
========
When developing or demonstrating the eBPF-LLM NetSentinel system,
you need a way to generate network events for testing. This script
provides controlled, predictable network activity that triggers
different alert levels.

TEST SCENARIOS:
===============
1. NORMAL CONNECTIONS (INFO level)
   - Standard HTTP/HTTPS to well-known sites
   - These should appear as INFO in the dashboard
   - Example: curl to example.com, google.com

2. SUSPICIOUS PORTS (HIGH level)
   - Connections to commonly-exploited ports
   - Telnet (23), SMB (445), RDP (3389), MongoDB (27017)
   - These should appear as HIGH risk

3. RAPID CONNECTIONS (CRITICAL level)
   - Many connections in quick succession
   - Simulates port scanning behavior
   - Should trigger CRITICAL alerts

USAGE:
======
    # Run all tests
    python3 test_connections.py --all

    # Run specific test types
    python3 test_connections.py normal   # Only normal connections
    python3 test_connections.py high     # Only suspicious port connections
    python3 test_connections.py scan     # Only rapid scanning simulation
    python3 test_connections.py curl     # Only curl-based tests

    # No arguments = run all tests (default)
    python3 test_connections.py

PREREQUISITES:
==============
1. unified_ebpf.py must be running in another terminal
2. Network connectivity is required
3. curl must be installed for curl tests

EXPECTED OUTPUT:
================
In this terminal:
    🔵 Testing normal connections...
     Connecting to example.com:80...

In the unified_ebpf.py terminal:
    [INFO] PID=12345 PROC=python DEST=93.184.216.34:80 REASON=Normal connection
    [HIGH] PID=12345 PROC=python DEST=8.8.8.8:23 REASON=Suspicious port
"""
import socket
import subprocess
import sys
import time
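

# Illustrative sketch: parse one monitor output line of the form shown under
# EXPECTED OUTPUT in the module docstring, for example to compare observed
# alerts with expected ones. The helper name parse_alert_line is hypothetical,
# and the line format is assumed from those docstring examples only; the real
# output of unified_ebpf.py may differ.
import re

ALERT_LINE = re.compile(
    r"\[(?P<level>\w+)\] PID=(?P<pid>\d+) PROC=(?P<proc>\S+) "
    r"DEST=(?P<ip>[\d.]+):(?P<port>\d+) REASON=(?P<reason>.*)"
)


def parse_alert_line(line):
    """Return the alert fields as a dict, or None if the line does not match."""
    match = ALERT_LINE.match(line.strip())
    if match is None:
        return None
    fields = match.groupdict()
    # PID and port are numeric in the assumed format, so convert them
    fields["pid"] = int(fields["pid"])
    fields["port"] = int(fields["port"])
    return fields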


def test_normal_connections():
    """
    Test normal/benign network connections.

    These connections should trigger INFO-level alerts because:
    - They connect to well-known, reputable domains
    - They use standard ports (80 for HTTP, 443 for HTTPS)
    - They come from a normal process (python)

    What the eBPF monitor sees:
    [INFO] PID=xxxx PROC=python DEST=93.184.216.34:80 REASON=Normal connection
    """
    print("🔵 Testing normal connections...")

    # Well-known public sites that are normally reachable
    targets = [
        ("example.com", 80),   # Simple HTTP test page
        ("google.com", 443),   # HTTPS port (TCP connect only, no TLS handshake)
        ("github.com", 443),   # HTTPS port (TCP connect only, no TLS handshake)
    ]

    for host, port in targets:
        print(f" Connecting to {host}:{port}...")
        # Create an IPv4 TCP socket; the context manager closes it even on failure
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            # Set timeout to avoid hanging if target is unreachable
            sock.settimeout(3)
            try:
                # This is where the eBPF kprobe triggers!
                # The kernel calls tcp_v4_connect(), and our probe catches it
                sock.connect((host, port))
            except OSError as e:
                # Covers DNS failures, timeouts, and refused connections
                print(f" Failed: {e}")
        # Small delay between connections for cleaner output
        time.sleep(0.5)


def test_suspicious_ports():
    """
    Test connections to commonly-exploited ports.

    These connections should trigger HIGH-level alerts because:
    - Ports 23, 445, 3389, 27017 are commonly attacked
    - Outbound connections to these ports on internet hosts are unusual
      for most software
    - They can indicate lateral movement or data exfiltration

    PORT MEANINGS:
    23    - Telnet (unencrypted remote access, often exploited)
    445   - SMB (Windows file sharing, ransomware favorite)
    3389  - RDP (Windows remote desktop, brute-force target)
    27017 - MongoDB (database, often left unprotected)

    What the eBPF monitor sees:
    [HIGH] PID=xxxx PROC=python DEST=8.8.8.8:23 REASON=Suspicious port
    """
    print("\n🟠 Testing suspicious port connections...")

    # These ports should trigger HIGH-level alerts.
    # Each entry is (service name, port); the comment says why it is suspicious.
    suspicious_ports = [
        ("telnet", 23),      # Unencrypted remote shell - very suspicious
        ("SMB", 445),        # Windows file sharing - ransomware vector
        ("RDP", 3389),       # Remote desktop - brute force target
        ("MongoDB", 27017),  # Database - often misconfigured/exposed
    ]

    for name, port in suspicious_ports:
        print(f" Trying to connect to {name} port {port}...")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)  # Short timeout since these will likely fail
            try:
                # Connect to the suspicious port on an external IP.
                # 8.8.8.8 (Google DNS) does not listen on these ports, so the
                # connection fails, BUT the attempt IS captured by eBPF first.
                sock.connect(("8.8.8.8", port))
            except OSError:
                # Expected: the kprobe fires when connect() is called, before
                # the kernel learns whether the host is listening
                pass
        time.sleep(0.3)
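

# Illustrative sketch (not part of the monitor): one way the port-based rule
# described in test_suspicious_ports() could be expressed. The names
# SUSPICIOUS_PORTS and classify_port are hypothetical; the real rule set
# lives in unified_ebpf.py and may differ.
SUSPICIOUS_PORTS = {23: "Telnet", 445: "SMB", 3389: "RDP", 27017: "MongoDB"}


def classify_port(port):
    """Return "HIGH" for the ports this script treats as suspicious, else "INFO"."""
    return "HIGH" if port in SUSPICIOUS_PORTS else "INFO"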


def test_rapid_connections():
    """
    Test rapid sequential connections (simulates port scanning).

    These connections should trigger CRITICAL-level alerts because:
    - Connecting to many ports rapidly is scanning behavior
    - Typical applications connect to one or two ports on a host, not 12+
    - This pattern matches nmap, masscan, or other scanners

    DETECTION LOGIC:
    The eBPF monitor tracks connections per source IP.
    If >5 connections happen within 1 second to different ports,
    it escalates to CRITICAL as likely port scanning.

    What the eBPF monitor sees:
    [CRITICAL] PID=xxxx PROC=python DEST=8.8.8.8:21 REASON=Port scan detected
    [CRITICAL] PID=xxxx PROC=python DEST=8.8.8.8:22 REASON=Port scan detected
    ... (many more)
    """
    print("\n🔴 Testing rapid connections (simulating scan)...")

    target = "8.8.8.8"  # Target IP

    # Well-known service ports that scanners commonly probe
    ports = [21, 22, 23, 25, 80, 110, 143, 443, 993, 995, 3306, 3389]

    print(f" Rapidly connecting to {len(ports)} ports on {target}...")

    for port in ports:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(0.5)  # Very short timeout for speed
            try:
                sock.connect((target, port))
            except OSError:
                pass  # Most attempts fail or time out; eBPF still captures them
        # The 0.1 s pause allows at most about 10 attempts per second.
        # Ports that silently drop packets make connect() wait out the 0.5 s
        # timeout, so the actual rate can be lower.
        time.sleep(0.1)
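

# Illustrative sketch of the sliding-window rule described in the
# test_rapid_connections() docstring (more than 5 connections to different
# ports within 1 second). The helper name and its parameters are hypothetical;
# the monitor's actual detection code is not part of this script and may
# work differently.
def looks_like_port_scan(events, window=1.0, threshold=5):
    """Return True if more than `threshold` distinct ports fall in any window.

    `events` is a list of (timestamp_seconds, port) tuples sorted by time.
    """
    start = 0
    for end in range(len(events)):
        # Advance the left edge until the span is at most `window` seconds
        while events[end][0] - events[start][0] > window:
            start += 1
        distinct_ports = {port for _, port in events[start:end + 1]}
        if len(distinct_ports) > threshold:
            return True
    return False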


def test_curl_commands():
    """
    Test using the curl command-line tool.

    This tests that the eBPF monitor correctly identifies the process name.
    When curl runs, the monitor should show PROC=curl, not PROC=python.

    WHY USE CURL:
    - curl is a widely used tool for HTTP requests
    - It creates a separate process (tests process detection)
    - Users often use curl for testing, so it's realistic

    What the eBPF monitor sees:
    [INFO] PID=xxxx PROC=curl DEST=93.184.216.34:80 REASON=Normal connection
    """
    print("\n🌐 Testing with curl...")

    # URLs to test
    # Using HTTPS and HTTP to test both port 443 and 80
    urls = [
        "http://example.com",       # HTTP (port 80)
        "https://httpbin.org/get",  # HTTPS (port 443) - returns JSON
        "https://api.github.com",   # HTTPS (port 443) - GitHub API
    ]

    for url in urls:
        print(f" curl {url}...")
        # Run curl as a subprocess
        # -s = silent (no progress bar)
        # -o /dev/null = discard output (we just want the connection)
        # --max-time 3 = timeout after 3 seconds
        try:
            subprocess.run(
                ["curl", "-s", "-o", "/dev/null", "--max-time", "3", url],
                capture_output=True,
                check=False,  # A failed request is fine; only the connection matters
            )
        except FileNotFoundError:
            # curl is a listed prerequisite; skip these tests if it is missing
            print(" curl is not installed; skipping curl tests")
            return
        time.sleep(0.5)


def main():
    """
    Main entry point - parses arguments and runs appropriate tests.

    ARGUMENT HANDLING:
    --all  : Run all test types sequentially
    normal : Only run normal connection tests
    high   : Only run suspicious port tests
    scan   : Only run rapid connection (scan simulation) tests
    curl   : Only run curl-based tests
    (none) : Run all tests (same as --all)
    """
    # Print banner
    print("=" * 50)
    print("🧪 eBPF Network Monitoring Test Script")
    print("=" * 50)
    print("\nPlease ensure unified_ebpf.py is running!\n")

    # Map each command-line word to its test function (in run order)
    tests = {
        "normal": test_normal_connections,
        "high": test_suspicious_ports,
        "scan": test_rapid_connections,
        "curl": test_curl_commands,
    }

    # No argument behaves the same as --all
    arg = sys.argv[1] if len(sys.argv) > 1 else "--all"

    if arg == "--all":
        # Run all tests in order
        for test in tests.values():
            test()
    elif arg in tests:
        # Run the one requested test type
        tests[arg]()
    else:
        # Unknown argument: show usage help and exit with an error status
        print("Usage:")
        print(" python3 test_connections.py --all   # Run all tests")
        print(" python3 test_connections.py normal  # Normal connections only")
        print(" python3 test_connections.py high    # Suspicious ports")
        print(" python3 test_connections.py scan    # Simulate scan")
        print(" python3 test_connections.py curl    # curl-based tests")
        sys.exit(1)

    # Done!
    print("\n✅ Testing complete! Check eBPF output and Dashboard.")


if __name__ == "__main__":
    main()