Skip to content

Commit ec2016d

Browse files
committed
Intermediate changes
commit_hash:daafe068648b19e96deff7c1465454d8b89b76d9
1 parent 900888b commit ec2016d

File tree

4 files changed

+305
-275
lines changed

4 files changed

+305
-275
lines changed
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
# coding=utf-8
2+
3+
import os
4+
import errno
5+
import socket
6+
import random
7+
import logging
8+
import platform
9+
import threading
10+
11+
import six
12+
13+
UI16MAXVAL = (1 << 16) - 1
14+
logger = logging.getLogger(__name__)
15+
16+
17+
class PortManagerException(Exception):
18+
pass
19+
20+
21+
class PortManager(object):
22+
"""
23+
Port manager for dynamic port allocation.
24+
25+
Allocates free TCP/UDP ports while avoiding system ephemeral port ranges.
26+
Supports file-based synchronization for multi-process environments.
27+
28+
Example:
29+
with PortManager() as pm:
30+
port = pm.get_port()
31+
32+
For yatest usage documentation:
33+
https://wiki.yandex-team.ru/yatool/test/#python-acquire-ports
34+
"""
35+
36+
def __init__(self, sync_dir=None):
37+
self._sync_dir = sync_dir or os.environ.get('PORT_SYNC_PATH')
38+
if self._sync_dir:
39+
_makedirs(self._sync_dir)
40+
41+
self._valid_range = get_valid_port_range()
42+
self._valid_port_count = self._count_valid_ports()
43+
self._filelocks = {}
44+
self._lock = threading.Lock()
45+
46+
def __enter__(self):
47+
return self
48+
49+
def __exit__(self, type, value, traceback):
50+
self.release()
51+
52+
def get_port(self, port=0):
53+
'''
54+
Gets free TCP port
55+
'''
56+
return self.get_tcp_port(port)
57+
58+
def get_tcp_port(self, port=0):
59+
'''
60+
Gets free TCP port
61+
'''
62+
return self._get_port(port, socket.SOCK_STREAM)
63+
64+
def get_udp_port(self, port=0):
65+
'''
66+
Gets free UDP port
67+
'''
68+
return self._get_port(port, socket.SOCK_DGRAM)
69+
70+
def get_tcp_and_udp_port(self, port=0):
71+
'''
72+
Gets one free port for use in both TCP and UDP protocols
73+
'''
74+
if port and self._no_random_ports():
75+
return port
76+
77+
retries = 20
78+
while retries > 0:
79+
retries -= 1
80+
81+
result_port = self.get_tcp_port()
82+
if not self.is_port_free(result_port, socket.SOCK_DGRAM):
83+
self.release_port(result_port)
84+
# Don't try to _capture_port(), it's already captured in the get_tcp_port()
85+
return result_port
86+
raise Exception('Failed to find port')
87+
88+
def release_port(self, port):
89+
with self._lock:
90+
self._release_port_no_lock(port)
91+
92+
def _release_port_no_lock(self, port):
93+
filelock = self._filelocks.pop(port, None)
94+
if filelock:
95+
filelock.release()
96+
97+
def release(self):
98+
with self._lock:
99+
while self._filelocks:
100+
_, filelock = self._filelocks.popitem()
101+
if filelock:
102+
filelock.release()
103+
104+
def get_port_range(self, start_port, count, random_start=True):
105+
assert count > 0
106+
if start_port and self._no_random_ports():
107+
return start_port
108+
109+
candidates = []
110+
111+
def drop_candidates():
112+
for port in candidates:
113+
self._release_port_no_lock(port)
114+
candidates[:] = []
115+
116+
with self._lock:
117+
for attempts in six.moves.range(128):
118+
for left, right in self._valid_range:
119+
if right - left < count:
120+
continue
121+
122+
if random_start:
123+
start = random.randint(left, right - ((right - left) // 2))
124+
else:
125+
start = left
126+
for probe_port in six.moves.range(start, right):
127+
if self._capture_port_no_lock(probe_port, socket.SOCK_STREAM):
128+
candidates.append(probe_port)
129+
else:
130+
drop_candidates()
131+
132+
if len(candidates) == count:
133+
return candidates[0]
134+
# Can't find required number of ports without gap in the current range
135+
drop_candidates()
136+
137+
raise PortManagerException(
138+
"Failed to find valid port range (start_port: {} count: {}) (range: {} used: {})".format(
139+
start_port, count, self._valid_range, self._filelocks
140+
)
141+
)
142+
143+
def _count_valid_ports(self):
144+
res = 0
145+
for left, right in self._valid_range:
146+
res += right - left
147+
assert res, ('There are no available valid ports', self._valid_range)
148+
return res
149+
150+
def _get_port(self, port, sock_type):
151+
if port and self._no_random_ports():
152+
return port
153+
154+
if len(self._filelocks) >= self._valid_port_count:
155+
raise PortManagerException("All valid ports are taken ({}): {}".format(self._valid_range, self._filelocks))
156+
157+
salt = random.randint(0, UI16MAXVAL)
158+
for attempt in six.moves.range(self._valid_port_count):
159+
probe_port = (salt + attempt) % self._valid_port_count
160+
161+
for left, right in self._valid_range:
162+
if probe_port >= (right - left):
163+
probe_port -= right - left
164+
else:
165+
probe_port += left
166+
break
167+
if not self._capture_port(probe_port, sock_type):
168+
continue
169+
return probe_port
170+
171+
raise PortManagerException(
172+
"Failed to find valid port (range: {} used: {})".format(self._valid_range, self._filelocks)
173+
)
174+
175+
def _capture_port(self, port, sock_type):
176+
with self._lock:
177+
return self._capture_port_no_lock(port, sock_type)
178+
179+
def is_port_free(self, port, sock_type=socket.SOCK_STREAM):
180+
sock = socket.socket(socket.AF_INET6, sock_type)
181+
if os.name == 'nt' and hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
182+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
183+
else:
184+
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
185+
try:
186+
sock.bind(('::', port))
187+
except socket.error as e:
188+
if e.errno == errno.EADDRINUSE:
189+
return False
190+
raise
191+
finally:
192+
sock.close()
193+
return True
194+
195+
def _capture_port_no_lock(self, port, sock_type):
196+
if port in self._filelocks:
197+
return False
198+
199+
filelock = None
200+
if self._sync_dir:
201+
# Lazy import to keep module hermetic and work without Arcadia Python
202+
# (PYTEST_SCRIPT mode with USE_ARCADIA_PYTHON=no)
203+
import library.python.filelock
204+
205+
filelock = library.python.filelock.FileLock(os.path.join(self._sync_dir, str(port)))
206+
if not filelock.acquire(blocking=False):
207+
return False
208+
if self.is_port_free(port, sock_type):
209+
self._filelocks[port] = filelock
210+
return True
211+
else:
212+
filelock.release()
213+
return False
214+
215+
if self.is_port_free(port, sock_type):
216+
self._filelocks[port] = filelock
217+
return True
218+
if filelock:
219+
filelock.release()
220+
return False
221+
222+
def _no_random_ports(self):
223+
return os.environ.get("NO_RANDOM_PORTS")
224+
225+
226+
def get_valid_port_range():
227+
first_valid = 1025
228+
last_valid = UI16MAXVAL
229+
230+
given_range = os.environ.get('VALID_PORT_RANGE')
231+
if given_range and ':' in given_range:
232+
return [list(int(x) for x in given_range.split(':', 2))]
233+
234+
first_eph, last_eph = get_ephemeral_range()
235+
first_invalid = max(first_eph, first_valid)
236+
last_invalid = min(last_eph, last_valid)
237+
238+
ranges = []
239+
if first_invalid > first_valid:
240+
ranges.append((first_valid, first_invalid - 1))
241+
if last_invalid < last_valid:
242+
ranges.append((last_invalid + 1, last_valid))
243+
return ranges
244+
245+
246+
def get_ephemeral_range():
247+
if platform.system() == 'Linux':
248+
filename = "/proc/sys/net/ipv4/ip_local_port_range"
249+
if os.path.exists(filename):
250+
with open(filename) as afile:
251+
data = afile.read(1024) # fix for musl
252+
port_range = tuple(map(int, data.strip().split()))
253+
if len(port_range) == 2:
254+
return port_range
255+
else:
256+
logger.warning("Bad ip_local_port_range format: '%s'. Going to use IANA suggestion", data)
257+
elif platform.system() == 'Darwin':
258+
first = _sysctlbyname_uint("net.inet.ip.portrange.first")
259+
last = _sysctlbyname_uint("net.inet.ip.portrange.last")
260+
if first and last:
261+
return first, last
262+
# IANA suggestion
263+
return (1 << 15) + (1 << 14), UI16MAXVAL
264+
265+
266+
def _sysctlbyname_uint(name):
267+
try:
268+
from ctypes import CDLL, c_uint, byref
269+
from ctypes.util import find_library
270+
except ImportError:
271+
return
272+
273+
libc = CDLL(find_library("c"))
274+
size = c_uint(0)
275+
res = c_uint(0)
276+
libc.sysctlbyname(name, None, byref(size), None, 0)
277+
libc.sysctlbyname(name, byref(res), byref(size), None, 0)
278+
return res.value
279+
280+
281+
def _makedirs(path):
282+
try:
283+
os.makedirs(path)
284+
except OSError as e:
285+
if e.errno == errno.EEXIST:
286+
return
287+
raise
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
PY23_LIBRARY()
2+
PY_SRCS(
3+
__init__.py
4+
)
5+
PEERDIR(
6+
contrib/python/six
7+
library/python/filelock
8+
)
9+
STYLE_PYTHON()
10+
END()

library/python/testing/yatest_common/ya.make

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ PEERDIR(
2626
library/python/cores
2727
library/python/filelock
2828
library/python/fs
29+
library/python/port_manager
2930
library/python/testing/yatest_lib
3031
)
3132

0 commit comments

Comments
 (0)