Skip to content

Commit 0601fa8

Browse files
authored
Merge pull request #8144 from chaen/diracx_connection_pool
feat (diracx): add a randomized connection pooling
2 parents 7705305 + d1ac65f commit 0601fa8

File tree

2 files changed

+75
-2
lines changed

2 files changed

+75
-2
lines changed

docs/source/conf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,7 @@ def setup(app):
306306
intersphinx_mapping = {
307307
"python": ("https://docs.python.org/3/", None),
308308
"matplotlib": ("https://matplotlib.org/", None),
309+
"requests": ("https://requests.readthedocs.io/en/latest/", None),
309310
}
310311

311312
# check for :param / :return in html, points to faulty syntax, missing empty lines, etc.

src/DIRAC/FrameworkSystem/Utilities/diracx.py

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
1+
import random
12
import requests
23

34
from cachetools import TTLCache, LRUCache, cached
45
from cachetools.keys import hashkey
56
from pathlib import Path
7+
from requests.adapters import HTTPAdapter
68
from tempfile import NamedTemporaryFile
79
from typing import Any
810
from collections.abc import Generator
11+
from urllib3 import PoolManager
12+
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
13+
914
from DIRAC import gConfig
1015
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
1116
from contextlib import contextmanager
@@ -26,7 +31,74 @@
2631
DEFAULT_TOKEN_CACHE_TTL = 5 * 60
2732
DEFAULT_TOKEN_CACHE_SIZE = 1024
2833

29-
legacy_exchange_session = requests.Session()
34+
# Number of pools to use for a given host.
35+
# It should be in the order of host behind the alias
36+
SESSION_NUM_POOLS = 20
37+
# Number of connection per Pool
38+
SESSION_CONNECTION_POOL_MAX_SIZE = 10
39+
40+
41+
class RandomizedPoolManager(PoolManager):
42+
"""
43+
A PoolManager subclass that creates multiple connection pools per host.
44+
Each connection request randomly picks one of the available pools.
45+
"""
46+
47+
def __init__(self, num_pools=3, **kwargs):
48+
self.num_pools = num_pools
49+
super().__init__(**kwargs)
50+
51+
def connection_from_host(self, host, port=None, scheme="http", pool_kwargs=None):
52+
# Pick a random index to diversify the pool key.
53+
54+
rand_index = random.randint(0, self.num_pools - 1)
55+
pool_key = (f"{host}-{rand_index}", port, scheme)
56+
if pool_key in self.pools:
57+
return self.pools[pool_key]
58+
59+
# Create a new pool if none exists for this key.
60+
if scheme == "http":
61+
self.pools[pool_key] = HTTPConnectionPool(host, port, **self.connection_pool_kw)
62+
elif scheme == "https":
63+
self.pools[pool_key] = HTTPSConnectionPool(host, port, **self.connection_pool_kw)
64+
else:
65+
raise ValueError(f"Unsupported scheme: {scheme}")
66+
67+
return self.pools[pool_key]
68+
69+
70+
class RandomizedHTTPAdapter(HTTPAdapter):
71+
"""
72+
An HTTPAdapter that uses the RandomizedPoolManager.
73+
"""
74+
75+
def __init__(self, num_pools=3, maxsize=10, **kwargs):
76+
self.num_pools = num_pools
77+
self.custom_maxsize = maxsize
78+
super().__init__(**kwargs)
79+
80+
def init_poolmanager(self, connections, maxsize, block=False, **pool_kwargs):
81+
"""
82+
Initialize the pool manager with our custom RandomizedPoolManager.
83+
"""
84+
# This ends up being passed to the HTTP(s)ConnectionPool constructors
85+
pool_kwargs.update(
86+
{
87+
"maxsize": self.custom_maxsize,
88+
"block": block,
89+
}
90+
)
91+
self.poolmanager = RandomizedPoolManager(**pool_kwargs)
92+
93+
94+
# Create a requests session.
95+
diracx_session = requests.Session()
96+
# Create an instance of the custom adapter.
97+
diracx_pool_adapter = RandomizedHTTPAdapter(num_pools=SESSION_NUM_POOLS, maxsize=SESSION_CONNECTION_POOL_MAX_SIZE)
98+
99+
# Mount the adapter to handle both HTTP and HTTPS.
100+
diracx_session.mount("http://", diracx_pool_adapter)
101+
diracx_session.mount("https://", diracx_pool_adapter)
30102

31103

32104
def get_token(
@@ -45,7 +117,7 @@ def get_token(
45117
vo = Registry.getVOForGroup(group)
46118
scopes = [f"vo:{vo}", f"group:{group}"] + [f"property:{prop}" for prop in dirac_properties]
47119

48-
r = legacy_exchange_session.get(
120+
r = diracx_session.get(
49121
f"{diracxUrl}/api/auth/legacy-exchange",
50122
params={
51123
"preferred_username": username,

0 commit comments

Comments
 (0)