Skip to content

Commit f6214a4

Browse files
authored
Move UpstreamConnectionPool lifecycle within Threadless (#917)
* Tie connection pool into Threadless * Pass upstream conn pool reference to work instances * Mark upstream conn pool as optional * spellcheck * Fix unused import
1 parent ea66280 commit f6214a4

File tree

12 files changed

+50
-23
lines changed

12 files changed

+50
-23
lines changed

docs/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,7 @@
300300
(_py_class_role, 'unittest.case.TestCase'),
301301
(_py_class_role, 'unittest.result.TestResult'),
302302
(_py_class_role, 'UUID'),
303+
(_py_class_role, 'UpstreamConnectionPool'),
303304
(_py_class_role, 'Url'),
304305
(_py_class_role, 'WebsocketFrame'),
305306
(_py_class_role, 'Work'),

proxy/core/acceptor/executors.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ def start_threaded_work(
131131
TcpClientConnection(conn, addr),
132132
flags=flags,
133133
event_queue=event_queue,
134+
upstream_conn_pool=None,
134135
)
135136
# TODO: Keep reference to threads and join during shutdown.
136137
# This will ensure connections are not abruptly closed on shutdown

proxy/core/acceptor/threadless.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@
2525
from ...common.constants import DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT, DEFAULT_SELECTOR_SELECT_TIMEOUT
2626
from ...common.constants import DEFAULT_WAIT_FOR_TASKS_TIMEOUT
2727

28-
from ..connection import TcpClientConnection
28+
from ..connection import TcpClientConnection, UpstreamConnectionPool
2929
from ..event import eventNames, EventQueue
3030

3131
from .work import Work
@@ -87,6 +87,9 @@ def __init__(
8787
self.wait_timeout: float = DEFAULT_WAIT_FOR_TASKS_TIMEOUT
8888
self.cleanup_inactive_timeout: float = DEFAULT_INACTIVE_CONN_CLEANUP_TIMEOUT
8989
self._total: int = 0
90+
self._upstream_conn_pool: Optional[UpstreamConnectionPool] = None
91+
if self.flags.enable_conn_pool:
92+
self._upstream_conn_pool = UpstreamConnectionPool()
9093

9194
@property
9295
@abstractmethod
@@ -134,6 +137,7 @@ def work_on_tcp_conn(
134137
flags=self.flags,
135138
event_queue=self.event_queue,
136139
uid=uid,
140+
upstream_conn_pool=self._upstream_conn_pool,
137141
)
138142
self.works[fileno].publish_event(
139143
event_name=eventNames.WORK_STARTED,

proxy/core/acceptor/work.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,14 @@
1616

1717
from abc import ABC, abstractmethod
1818
from uuid import uuid4
19-
from typing import Optional, Dict, Any, TypeVar, Generic
19+
from typing import Optional, Dict, Any, TypeVar, Generic, TYPE_CHECKING
2020

2121
from ..event import eventNames, EventQueue
2222
from ...common.types import Readables, Writables
2323

24+
if TYPE_CHECKING:
25+
from ..connection import UpstreamConnectionPool
26+
2427
T = TypeVar('T')
2528

2629

@@ -33,6 +36,7 @@ def __init__(
3336
flags: argparse.Namespace,
3437
event_queue: Optional[EventQueue] = None,
3538
uid: Optional[str] = None,
39+
upstream_conn_pool: Optional['UpstreamConnectionPool'] = None,
3640
) -> None:
3741
# Work uuid
3842
self.uid: str = uid if uid is not None else uuid4().hex
@@ -41,6 +45,7 @@ def __init__(
4145
self.event_queue = event_queue
4246
# Accept work
4347
self.work = work
48+
self.upstream_conn_pool = upstream_conn_pool
4449

4550
@abstractmethod
4651
async def get_events(self) -> Dict[int, int]:

proxy/core/connection/__init__.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
from .connection import TcpConnection, TcpConnectionUninitializedException
1717
from .client import TcpClientConnection
1818
from .server import TcpServerConnection
19-
from .pool import ConnectionPool
19+
from .pool import UpstreamConnectionPool
2020
from .types import tcpConnectionTypes
2121

2222
__all__ = [
@@ -25,5 +25,5 @@
2525
'TcpServerConnection',
2626
'TcpClientConnection',
2727
'tcpConnectionTypes',
28-
'ConnectionPool',
28+
'UpstreamConnectionPool',
2929
]

proxy/core/connection/pool.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
from typing import Set, Dict, Tuple
1818

1919
from ...common.flag import flags
20+
from ...common.types import Readables, Writables
21+
22+
from ..acceptor.work import Work
2023

2124
from .server import TcpServerConnection
2225

@@ -31,10 +34,10 @@
3134
)
3235

3336

34-
class ConnectionPool:
37+
class UpstreamConnectionPool(Work[TcpServerConnection]):
3538
"""Manages connection pool to upstream servers.
3639
37-
`ConnectionPool` avoids need to reconnect with the upstream
40+
`UpstreamConnectionPool` avoids need to reconnect with the upstream
3841
servers repeatedly when a reusable connection is available
3942
in the pool.
4043
@@ -47,16 +50,16 @@ class ConnectionPool:
4750
the pool users. Example, if acquired connection
4851
is stale, reacquire.
4952
50-
TODO: Ideally, ConnectionPool must be shared across
53+
TODO: Ideally, `UpstreamConnectionPool` must be shared across
5154
all cores to make SSL session cache to also work
5255
without additional out-of-bound synchronizations.
5356
54-
TODO: ConnectionPool currently WON'T work for
57+
TODO: `UpstreamConnectionPool` currently WON'T work for
5558
HTTPS connection. This is because of missing support for
5659
session cache, session ticket, abbr TLS handshake
5760
and other necessary features to make it work.
5861
59-
NOTE: However, for all HTTP only connections, ConnectionPool
62+
NOTE: However, for all HTTP only connections, `UpstreamConnectionPool`
6063
can be used to save upon connection setup time and
6164
speed-up performance of requests.
6265
"""
@@ -113,3 +116,9 @@ def release(self, conn: TcpServerConnection) -> None:
113116
assert not conn.is_reusable()
114117
# Reset for reusability
115118
conn.reset()
119+
120+
async def get_events(self) -> Dict[int, int]:
121+
return await super().get_events()
122+
123+
async def handle_events(self, readables: Readables, writables: Writables) -> bool:
124+
return await super().handle_events(readables, writables)

proxy/http/handler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ def initialize(self) -> None:
100100
self.work,
101101
self.request,
102102
self.event_queue,
103+
self.upstream_conn_pool,
103104
)
104105
self.plugins[instance.name()] = instance
105106
logger.debug('Handling connection %r' % self.work.connection)

proxy/http/plugin.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,17 @@
1212
import argparse
1313

1414
from abc import ABC, abstractmethod
15-
from typing import Tuple, List, Union, Optional
15+
from typing import Tuple, List, Union, Optional, TYPE_CHECKING
1616

1717
from .parser import HttpParser
1818

1919
from ..common.types import Readables, Writables
2020
from ..core.event import EventQueue
2121
from ..core.connection import TcpClientConnection
2222

23+
if TYPE_CHECKING:
24+
from ..core.connection import UpstreamConnectionPool
25+
2326

2427
class HttpProtocolHandlerPlugin(ABC):
2528
"""Base HttpProtocolHandler Plugin class.
@@ -50,12 +53,14 @@ def __init__(
5053
client: TcpClientConnection,
5154
request: HttpParser,
5255
event_queue: EventQueue,
56+
upstream_conn_pool: Optional['UpstreamConnectionPool'] = None,
5357
):
5458
self.uid: str = uid
5559
self.flags: argparse.Namespace = flags
5660
self.client: TcpClientConnection = client
5761
self.request: HttpParser = request
5862
self.event_queue = event_queue
63+
self.upstream_conn_pool = upstream_conn_pool
5964
super().__init__()
6065

6166
def name(self) -> str:

proxy/http/proxy/server.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
from ...common.pki import gen_public_key, gen_csr, sign_csr
4545

4646
from ...core.event import eventNames
47-
from ...core.connection import TcpServerConnection, ConnectionPool
47+
from ...core.connection import TcpServerConnection
4848
from ...core.connection import TcpConnectionUninitializedException
4949
from ...common.flag import flags
5050

@@ -140,9 +140,6 @@ class HttpProxyPlugin(HttpProtocolHandlerPlugin):
140140
# connection pool operations.
141141
lock = threading.Lock()
142142

143-
# Shared connection pool
144-
pool = ConnectionPool()
145-
146143
def __init__(
147144
self,
148145
*args: Any, **kwargs: Any,
@@ -200,10 +197,10 @@ def get_descriptors(self) -> Tuple[List[int], List[int]]:
200197

201198
def _close_and_release(self) -> bool:
202199
if self.flags.enable_conn_pool:
203-
assert self.upstream and not self.upstream.closed
200+
assert self.upstream and not self.upstream.closed and self.upstream_conn_pool
204201
self.upstream.closed = True
205202
with self.lock:
206-
self.pool.release(self.upstream)
203+
self.upstream_conn_pool.release(self.upstream)
207204
self.upstream = None
208205
return True
209206

@@ -391,9 +388,10 @@ def on_client_connection_close(self) -> None:
391388
return
392389

393390
if self.flags.enable_conn_pool:
391+
assert self.upstream_conn_pool
394392
# Release the connection for reusability
395393
with self.lock:
396-
self.pool.release(self.upstream)
394+
self.upstream_conn_pool.release(self.upstream)
397395
return
398396

399397
try:
@@ -589,8 +587,9 @@ def connect_upstream(self) -> None:
589587
host, port = self.request.host, self.request.port
590588
if host and port:
591589
if self.flags.enable_conn_pool:
590+
assert self.upstream_conn_pool
592591
with self.lock:
593-
created, self.upstream = self.pool.acquire(
592+
created, self.upstream = self.upstream_conn_pool.acquire(
594593
text_(host), port,
595594
)
596595
else:
@@ -642,8 +641,9 @@ def connect_upstream(self) -> None:
642641
),
643642
)
644643
if self.flags.enable_conn_pool:
644+
assert self.upstream_conn_pool
645645
with self.lock:
646-
self.pool.release(self.upstream)
646+
self.upstream_conn_pool.release(self.upstream)
647647
raise ProxyConnectionFailed(
648648
text_(host), port, repr(e),
649649
) from e

proxy/plugin/proxy_pool.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ def before_upstream_connection(
8888
must be bootstrapped within it's own re-usable and garbage collected pool,
8989
to avoid establishing a new upstream proxy connection for each client request.
9090
91-
See :class:`~proxy.core.connection.pool.ConnectionPool` which is a work
91+
See :class:`~proxy.core.connection.pool.UpstreamConnectionPool` which is a work
9292
in progress for SSL cache handling.
9393
"""
9494
# We don't want to send private IP requests to remote proxies

0 commit comments

Comments
 (0)