Skip to content

Commit 46c942f

Browse files
Hook UpstreamConnectionPool lifecycle within Threadless (#921)
* Hook connection pool lifecycle within threadless
* [pre-commit.ci] auto fixes from pre-commit.com hooks — for more information, see https://pre-commit.ci
* Fix test
* Fix spelling

Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 263c067 commit 46c942f

File tree

9 files changed

+142
-75
lines changed

9 files changed

+142
-75
lines changed

proxy/core/acceptor/threadless.py

Lines changed: 60 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def __init__(
8888
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT
8989
self._total: int = 0
9090
self._upstream_conn_pool: Optional[UpstreamConnectionPool] = None
91+
self._upstream_conn_filenos: Set[int] = set()
9192
if self.flags.enable_conn_pool:
9293
self._upstream_conn_pool = UpstreamConnectionPool()
9394

@@ -176,14 +177,25 @@ async def _update_work_events(self, work_id: int) -> None:
176177
data=work_id,
177178
)
178179
self.registered_events_by_work_ids[work_id][fileno] = mask
179-
# logger.debug(
180-
# 'fd#{0} modified for mask#{1} by work#{2}'.format(
181-
# fileno, mask, work_id,
182-
# ),
183-
# )
180+
logger.debug(
181+
'fd#{0} modified for mask#{1} by work#{2}'.format(
182+
fileno, mask, work_id,
183+
),
184+
)
184185
# else:
185186
# logger.info(
186187
# 'fd#{0} by work#{1} not modified'.format(fileno, work_id))
188+
elif fileno in self._upstream_conn_filenos:
189+
# Descriptor offered by work, but is already registered by connection pool
190+
# Most likely because work has acquired a reusable connection.
191+
self.selector.modify(fileno, events=mask, data=work_id)
192+
self.registered_events_by_work_ids[work_id][fileno] = mask
193+
self._upstream_conn_filenos.remove(fileno)
194+
logger.debug(
195+
'fd#{0} borrowed with mask#{1} by work#{2}'.format(
196+
fileno, mask, work_id,
197+
),
198+
)
187199
# Can throw ValueError: Invalid file descriptor: -1
188200
#
189201
# A guard within Work classes may not help here due to
@@ -193,16 +205,33 @@ async def _update_work_events(self, work_id: int) -> None:
193205
#
194206
# TODO: Also remove offending work from pool to avoid spin loop.
195207
elif fileno != -1:
196-
self.selector.register(
197-
fileno, events=mask,
198-
data=work_id,
199-
)
208+
self.selector.register(fileno, events=mask, data=work_id)
200209
self.registered_events_by_work_ids[work_id][fileno] = mask
201-
# logger.debug(
202-
# 'fd#{0} registered for mask#{1} by work#{2}'.format(
203-
# fileno, mask, work_id,
204-
# ),
205-
# )
210+
logger.debug(
211+
'fd#{0} registered for mask#{1} by work#{2}'.format(
212+
fileno, mask, work_id,
213+
),
214+
)
215+
216+
async def _update_conn_pool_events(self) -> None:
217+
if not self._upstream_conn_pool:
218+
return
219+
assert self.selector is not None
220+
new_conn_pool_events = await self._upstream_conn_pool.get_events()
221+
old_conn_pool_filenos = self._upstream_conn_filenos.copy()
222+
self._upstream_conn_filenos.clear()
223+
new_conn_pool_filenos = set(new_conn_pool_events.keys())
224+
new_conn_pool_filenos.difference_update(old_conn_pool_filenos)
225+
for fileno in new_conn_pool_filenos:
226+
self.selector.register(
227+
fileno,
228+
events=new_conn_pool_events[fileno],
229+
data=0,
230+
)
231+
self._upstream_conn_filenos.add(fileno)
232+
old_conn_pool_filenos.difference_update(self._upstream_conn_filenos)
233+
for fileno in old_conn_pool_filenos:
234+
self.selector.unregister(fileno)
206235

207236
async def _update_selector(self) -> None:
208237
assert self.selector is not None
@@ -215,6 +244,7 @@ async def _update_selector(self) -> None:
215244
if work_id in unfinished_work_ids:
216245
continue
217246
await self._update_work_events(work_id)
247+
await self._update_conn_pool_events()
218248

219249
async def _selected_events(self) -> Tuple[
220250
Dict[int, Tuple[Readables, Writables]],
@@ -235,9 +265,6 @@ async def _selected_events(self) -> Tuple[
235265
"""
236266
assert self.selector is not None
237267
await self._update_selector()
238-
events = self.selector.select(
239-
timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT,
240-
)
241268
# Keys are work_id and values are 2-tuple indicating
242269
# readables & writables that work_id is interested in
243270
# and are ready for IO.
@@ -248,6 +275,11 @@ async def _selected_events(self) -> Tuple[
248275
# When ``work_queue_fileno`` returns None,
249276
# always return True for the boolean value.
250277
new_work_available = True
278+
279+
events = self.selector.select(
280+
timeout=DEFAULT_SELECTOR_SELECT_TIMEOUT,
281+
)
282+
251283
for key, mask in events:
252284
if not new_work_available and wqfileno is not None and key.fileobj == wqfileno:
253285
assert mask & selectors.EVENT_READ
@@ -302,9 +334,17 @@ def _create_tasks(
302334
assert self.loop
303335
tasks: Set['asyncio.Task[bool]'] = set()
304336
for work_id in work_by_ids:
305-
task = self.loop.create_task(
306-
self.works[work_id].handle_events(*work_by_ids[work_id]),
307-
)
337+
if work_id == 0:
338+
assert self._upstream_conn_pool
339+
task = self.loop.create_task(
340+
self._upstream_conn_pool.handle_events(
341+
*work_by_ids[work_id],
342+
),
343+
)
344+
else:
345+
task = self.loop.create_task(
346+
self.works[work_id].handle_events(*work_by_ids[work_id]),
347+
)
308348
task._work_id = work_id # type: ignore[attr-defined]
309349
# task.set_name(work_id)
310350
tasks.add(task)

proxy/core/connection/pool.py

Lines changed: 53 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
1313
reusability
1414
"""
15+
import socket
1516
import logging
1617
import selectors
1718

@@ -45,11 +46,19 @@ class UpstreamConnectionPool(Work[TcpServerConnection]):
4546
A separate pool is maintained for each upstream server.
4647
So internally, it's a pool of pools.
4748
48-
TODO: Listen for read events from the connections
49-
to remove them from the pool when peer closes the
50-
connection. This can also be achieved lazily by
51-
the pool users. Example, if acquired connection
52-
is stale, reacquire.
49+
Internal data structure maintains references to connection objects
50+
that pool owns or has borrowed. Borrowed connections are marked as
51+
NOT reusable.
52+
53+
For reusable connections only, pool listens for read events
54+
to detect broken connections. This can happen if pool has opened
55+
a connection, which was never used and eventually reaches
56+
upstream server timeout limit.
57+
58+
When a borrowed connection is returned back to the pool,
59+
the connection is marked as reusable again. However, if
60+
returned connection has already been closed, it is removed
61+
from the internal data structure.
5362
5463
TODO: Ideally, `UpstreamConnectionPool` must be shared across
5564
all cores to make SSL session cache to also work
@@ -60,29 +69,25 @@ class UpstreamConnectionPool(Work[TcpServerConnection]):
6069
session cache, session ticket, abbr TLS handshake
6170
and other necessary features to make it work.
6271
63-
NOTE: However, for all HTTP only connections, `UpstreamConnectionPool`
64-
can be used to save upon connection setup time and
65-
speed-up performance of requests.
72+
NOTE: However, currently for all HTTP only upstream connections,
73+
`UpstreamConnectionPool` can be used to remove slow starts.
6674
"""
6775

6876
def __init__(self) -> None:
69-
# Pools of connection per upstream server
7077
self.connections: Dict[int, TcpServerConnection] = {}
7178
self.pools: Dict[Tuple[str, int], Set[TcpServerConnection]] = {}
7279

7380
def add(self, addr: Tuple[str, int]) -> TcpServerConnection:
74-
# Create new connection
81+
"""Creates and adds a new connection to the pool."""
7582
new_conn = TcpServerConnection(addr[0], addr[1])
7683
new_conn.connect()
77-
if addr not in self.pools:
78-
self.pools[addr] = set()
79-
self.pools[addr].add(new_conn)
80-
self.connections[new_conn.connection.fileno()] = new_conn
84+
self._add(new_conn)
8185
return new_conn
8286

8387
def acquire(self, addr: Tuple[str, int]) -> Tuple[bool, TcpServerConnection]:
84-
"""Returns a connection for use with the server."""
85-
# Return a reusable connection if available
88+
"""Returns a reusable connection from the pool.
89+
90+
If none exists, will create and return a new connection."""
8691
if addr in self.pools:
8792
for old_conn in self.pools[addr]:
8893
if old_conn.is_reusable():
@@ -102,40 +107,63 @@ def acquire(self, addr: Tuple[str, int]) -> Tuple[bool, TcpServerConnection]:
102107
return True, new_conn
103108

104109
def release(self, conn: TcpServerConnection) -> None:
105-
"""Release the connection.
110+
"""Release a previously acquired connection.
106111
107112
If the connection has not been closed,
108113
then it will be retained in the pool for reusability.
109114
"""
115+
assert not conn.is_reusable()
110116
if conn.closed:
111117
logger.debug(
112118
'Removing connection#{2} from pool from upstream {0}:{1}'.format(
113119
conn.addr[0], conn.addr[1], id(conn),
114120
),
115121
)
116-
self.pools[conn.addr].remove(conn)
122+
self._remove(conn.connection.fileno())
117123
else:
118124
logger.debug(
119125
'Retaining connection#{2} to upstream {0}:{1}'.format(
120126
conn.addr[0], conn.addr[1], id(conn),
121127
),
122128
)
123-
assert not conn.is_reusable()
124129
# Reset for reusability
125130
conn.reset()
126131

127132
async def get_events(self) -> Dict[int, int]:
133+
"""Returns read event flag for all reusable connections in the pool."""
128134
events = {}
129135
for connections in self.pools.values():
130136
for conn in connections:
131-
events[conn.connection.fileno()] = selectors.EVENT_READ
137+
if conn.is_reusable():
138+
events[conn.connection.fileno()] = selectors.EVENT_READ
132139
return events
133140

134141
async def handle_events(self, readables: Readables, _writables: Writables) -> bool:
135-
for r in readables:
142+
"""Removes reusable connection from the pool.
143+
144+
When pool is the owner of connection, we don't expect a read event from upstream
145+
server. A read event means either upstream closed the connection or connection
146+
has somehow reached an illegal state e.g. upstream sending data for previous
147+
connection acquisition lifecycle."""
148+
for fileno in readables:
136149
if TYPE_CHECKING:
137-
assert isinstance(r, int)
138-
conn = self.connections[r]
139-
self.pools[conn.addr].remove(conn)
140-
del self.connections[r]
150+
assert isinstance(fileno, int)
151+
logger.debug('Upstream fd#{0} is read ready'.format(fileno))
152+
self._remove(fileno)
141153
return False
154+
155+
def _add(self, conn: TcpServerConnection) -> None:
156+
"""Adds a new connection to internal data structure."""
157+
if conn.addr not in self.pools:
158+
self.pools[conn.addr] = set()
159+
self.pools[conn.addr].add(conn)
160+
self.connections[conn.connection.fileno()] = conn
161+
162+
def _remove(self, fileno: int) -> None:
163+
"""Removes a connection by descriptor from the internal data structure."""
164+
conn = self.connections[fileno]
165+
logger.debug('Removing conn#{0} from pool'.format(id(conn)))
166+
conn.connection.shutdown(socket.SHUT_WR)
167+
conn.close()
168+
self.pools[conn.addr].remove(conn)
169+
del self.connections[fileno]

proxy/core/connection/server.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,11 @@ def connect(
3939
addr: Optional[Tuple[str, int]] = None,
4040
source_address: Optional[Tuple[str, int]] = None,
4141
) -> None:
42-
if self._conn is None:
43-
self._conn = new_socket_connection(
44-
addr or self.addr, source_address=source_address,
45-
)
46-
self.closed = False
42+
assert self._conn is None
43+
self._conn = new_socket_connection(
44+
addr or self.addr, source_address=source_address,
45+
)
46+
self.closed = False
4747

4848
def wrap(self, hostname: str, ca_file: Optional[str]) -> None:
4949
ctx = ssl.create_default_context(

proxy/http/handler.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ def initialize(self) -> None:
103103
self.upstream_conn_pool,
104104
)
105105
self.plugins[instance.name()] = instance
106-
logger.debug('Handling connection %r' % self.work.connection)
106+
logger.debug('Handling connection %s' % self.work.address)
107107

108108
def is_inactive(self) -> bool:
109109
if not self.work.has_buffer() and \
@@ -123,9 +123,8 @@ def shutdown(self) -> None:
123123
for plugin in self.plugins.values():
124124
plugin.on_client_connection_close()
125125
logger.debug(
126-
'Closing client connection %r '
127-
'at address %s has buffer %s' %
128-
(self.work.connection, self.work.address, self.work.has_buffer()),
126+
'Closing client connection %s has buffer %s' %
127+
(self.work.address, self.work.has_buffer()),
129128
)
130129
conn = self.work.connection
131130
# Unwrap if wrapped before shutdown.
@@ -247,7 +246,7 @@ def handle_data(self, data: memoryview) -> Optional[bool]:
247246

248247
async def handle_writables(self, writables: Writables) -> bool:
249248
if self.work.connection.fileno() in writables and self.work.has_buffer():
250-
logger.debug('Client is ready for writes, flushing buffer')
249+
logger.debug('Client is write ready, flushing...')
251250
self.last_activity = time.time()
252251

253252
# TODO(abhinavsingh): This hook could just reside within server recv block
@@ -277,7 +276,7 @@ async def handle_writables(self, writables: Writables) -> bool:
277276

278277
async def handle_readables(self, readables: Readables) -> bool:
279278
if self.work.connection.fileno() in readables:
280-
logger.debug('Client is ready for reads, reading')
279+
logger.debug('Client is read ready, receiving...')
281280
self.last_activity = time.time()
282281
try:
283282
teardown = await super().handle_readables(readables)

proxy/http/parser/parser.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def __init__(
7777
self.total_size: int = 0
7878
# Buffer to hold unprocessed bytes
7979
self.buffer: bytes = b''
80-
# Internal headers datastructure:
80+
# Internal headers data structure:
8181
# - Keys are lower case header names.
8282
# - Values are 2-tuple containing original
8383
# header and it's value as received.

proxy/http/proxy/server.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ async def write_to_descriptors(self, w: Writables) -> bool:
217217
self.upstream and not self.upstream.closed and \
218218
self.upstream.has_buffer() and \
219219
self.upstream.connection.fileno() in w:
220-
logger.debug('Server is write ready, flushing buffer')
220+
logger.debug('Server is write ready, flushing...')
221221
try:
222222
self.upstream.flush()
223223
except ssl.SSLWantWriteError:
@@ -254,7 +254,7 @@ async def read_from_descriptors(self, r: Readables) -> bool:
254254
and self.upstream \
255255
and not self.upstream.closed \
256256
and self.upstream.connection.fileno() in r:
257-
logger.debug('Server is ready for reads, reading...')
257+
logger.debug('Server is read ready, receiving...')
258258
try:
259259
raw = self.upstream.recv(self.flags.server_recvbuf_size)
260260
except TimeoutError as e:
@@ -401,7 +401,7 @@ def on_client_connection_close(self) -> None:
401401
pass
402402
finally:
403403
# TODO: Unwrap if wrapped before close?
404-
self.upstream.connection.close()
404+
self.upstream.close()
405405
except TcpConnectionUninitializedException:
406406
pass
407407
finally:
@@ -587,10 +587,6 @@ def connect_upstream(self) -> None:
587587
host, port = self.request.host, self.request.port
588588
if host and port:
589589
try:
590-
logger.debug(
591-
'Connecting to upstream %s:%d' %
592-
(text_(host), port),
593-
)
594590
# Invoke plugin.resolve_dns
595591
upstream_ip, source_addr = None, None
596592
for plugin in self.plugins.values():
@@ -599,6 +595,10 @@ def connect_upstream(self) -> None:
599595
)
600596
if upstream_ip or source_addr:
601597
break
598+
logger.debug(
599+
'Connecting to upstream %s:%d' %
600+
(text_(host), port),
601+
)
602602
if self.flags.enable_conn_pool:
603603
assert self.upstream_conn_pool
604604
with self.lock:

proxy/plugin/proxy_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -121,7 +121,7 @@ def before_upstream_connection(
121121
#
122122
# Failing upstream proxies, must be removed from the pool temporarily.
123123
# A periodic health check must put them back in the pool. This can be achieved
124-
# using a datastructure without having to spawn separate thread/process for health
124+
# using a data structure without having to spawn separate thread/process for health
125125
# check.
126126
raise HttpProtocolException(
127127
'Connection refused by upstream proxy {0}:{1}'.format(

0 commit comments

Comments (0)