Skip to content

Commit 5890982

Browse files
authored
check port with bind over connect_ex (#5564)
* check port with bind over connect_ex * add tests and overflow handling * dang it darglint * that was basically the same test
1 parent 3c687f4 commit 5890982

File tree

2 files changed

+177
-4
lines changed

2 files changed

+177
-4
lines changed

reflex/utils/processes.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ def get_num_workers() -> int:
5353
return (os.cpu_count() or 1) * 2 + 1
5454

5555

56-
def _is_address_responsive(
56+
def _can_bind_at_port(
5757
address_family: socket.AddressFamily | int, address: str, port: int
5858
) -> bool:
5959
"""Check if a given address and port are responsive.
@@ -68,9 +68,14 @@ def _is_address_responsive(
6868
"""
6969
try:
7070
with closing(socket.socket(address_family, socket.SOCK_STREAM)) as sock:
71-
return sock.connect_ex((address, port)) == 0
71+
sock.bind((address, port))
72+
except OverflowError:
73+
return False
74+
except PermissionError:
75+
return False
7276
except OSError:
7377
return False
78+
return True
7479

7580

7681
def is_process_on_port(port: int) -> bool:
@@ -82,9 +87,9 @@ def is_process_on_port(port: int) -> bool:
8287
Returns:
8388
Whether a process is running on the given port.
8489
"""
85-
return _is_address_responsive( # Test IPv4 localhost (127.0.0.1)
90+
return not _can_bind_at_port( # Test IPv4 localhost (127.0.0.1)
8691
socket.AF_INET, "127.0.0.1", port
87-
) or _is_address_responsive(
92+
) or not _can_bind_at_port(
8893
socket.AF_INET6, "::1", port
8994
) # Test IPv6 localhost (::1)
9095

@@ -99,8 +104,15 @@ def change_port(port: int, _type: str) -> int:
99104
Returns:
100105
The new port.
101106
107+
Raises:
108+
Exit: If the port is invalid or if the new port is occupied.
102109
"""
103110
new_port = port + 1
111+
if new_port < 0 or new_port > 65535:
112+
console.error(
113+
f"The {_type} port: {port} is invalid. It must be between 0 and 65535."
114+
)
115+
raise click.exceptions.Exit(1)
104116
if is_process_on_port(new_port):
105117
return change_port(new_port, _type)
106118
console.info(
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
"""Test process utilities."""
2+
3+
import socket
4+
import threading
5+
import time
6+
from contextlib import closing
7+
from unittest import mock
8+
9+
import pytest
10+
11+
from reflex.utils.processes import is_process_on_port
12+
13+
14+
def test_is_process_on_port_free_port():
15+
"""Test is_process_on_port returns False when port is free."""
16+
# Find a free port
17+
with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
18+
sock.bind(("127.0.0.1", 0))
19+
free_port = sock.getsockname()[1]
20+
21+
# Port should be free after socket is closed
22+
assert not is_process_on_port(free_port)
23+
24+
25+
def test_is_process_on_port_occupied_port():
26+
"""Test is_process_on_port returns True when port is occupied."""
27+
# Create a server socket to occupy a port
28+
server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
29+
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
30+
server_socket.bind(("127.0.0.1", 0))
31+
server_socket.listen(1)
32+
33+
occupied_port = server_socket.getsockname()[1]
34+
35+
try:
36+
# Port should be occupied
37+
assert is_process_on_port(occupied_port)
38+
finally:
39+
server_socket.close()
40+
41+
42+
def test_is_process_on_port_ipv6():
43+
"""Test is_process_on_port works with IPv6."""
44+
# Test with IPv6 socket
45+
try:
46+
server_socket = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
47+
server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
48+
server_socket.bind(("::1", 0))
49+
server_socket.listen(1)
50+
51+
occupied_port = server_socket.getsockname()[1]
52+
53+
try:
54+
# Port should be occupied on IPv6
55+
assert is_process_on_port(occupied_port)
56+
finally:
57+
server_socket.close()
58+
except OSError:
59+
# IPv6 might not be available on some systems
60+
pytest.skip("IPv6 not available on this system")
61+
62+
63+
def test_is_process_on_port_both_protocols():
64+
"""Test is_process_on_port detects occupation on either IPv4 or IPv6."""
65+
# Create IPv4 server
66+
ipv4_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
67+
ipv4_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
68+
ipv4_socket.bind(("127.0.0.1", 0))
69+
ipv4_socket.listen(1)
70+
71+
port = ipv4_socket.getsockname()[1]
72+
73+
try:
74+
# Should detect IPv4 occupation
75+
assert is_process_on_port(port)
76+
finally:
77+
ipv4_socket.close()
78+
79+
80+
@pytest.mark.parametrize("port", [0, 1, 80, 443, 8000, 3000, 65535])
81+
def test_is_process_on_port_various_ports(port):
82+
"""Test is_process_on_port with various port numbers.
83+
84+
Args:
85+
port: The port number to test.
86+
"""
87+
# This test just ensures the function doesn't crash with different port numbers
88+
# The actual result depends on what's running on the system
89+
result = is_process_on_port(port)
90+
assert isinstance(result, bool)
91+
92+
93+
def test_is_process_on_port_mock_socket_error():
94+
"""Test is_process_on_port handles socket errors gracefully."""
95+
with mock.patch("socket.socket") as mock_socket:
96+
mock_socket_instance = mock.MagicMock()
97+
mock_socket.return_value = mock_socket_instance
98+
mock_socket_instance.__enter__.return_value = mock_socket_instance
99+
mock_socket_instance.bind.side_effect = OSError("Mock socket error")
100+
101+
# Should return True when socket operations fail
102+
result = is_process_on_port(8080)
103+
assert result is True
104+
105+
106+
def test_is_process_on_port_permission_error():
107+
"""Test is_process_on_port handles permission errors."""
108+
with mock.patch("socket.socket") as mock_socket:
109+
mock_socket_instance = mock.MagicMock()
110+
mock_socket.return_value = mock_socket_instance
111+
mock_socket_instance.__enter__.return_value = mock_socket_instance
112+
mock_socket_instance.bind.side_effect = PermissionError("Permission denied")
113+
114+
# Should return True when permission is denied (can't bind = port is "occupied")
115+
result = is_process_on_port(80)
116+
assert result is True
117+
118+
119+
@pytest.mark.parametrize("should_listen", [True, False])
120+
def test_is_process_on_port_concurrent_access(should_listen):
121+
"""Test is_process_on_port works correctly with concurrent access.
122+
123+
Args:
124+
should_listen: Whether the server socket should call listen() or just bind().
125+
"""
126+
127+
def create_server_and_test(port_holder, listen):
128+
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
129+
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
130+
server.bind(("127.0.0.1", 0))
131+
132+
if listen:
133+
server.listen(1)
134+
135+
port = server.getsockname()[1]
136+
port_holder[0] = port
137+
138+
# Small delay to ensure the test runs while server is active
139+
time.sleep(0.1)
140+
server.close()
141+
142+
port_holder = [None]
143+
thread = threading.Thread(
144+
target=create_server_and_test, args=(port_holder, should_listen)
145+
)
146+
thread.start()
147+
148+
# Wait a bit for the server to start
149+
time.sleep(0.05)
150+
151+
if port_holder[0] is not None:
152+
# Port should be occupied while server is running (both bound-only and listening)
153+
assert is_process_on_port(port_holder[0])
154+
155+
thread.join()
156+
157+
# After thread ends and server closes, port should be free
158+
if port_holder[0] is not None:
159+
# Give it a moment for the socket to be fully released
160+
time.sleep(0.1)
161+
assert not is_process_on_port(port_holder[0])

0 commit comments

Comments
 (0)