1+ import random
12import requests
23
34from cachetools import TTLCache , LRUCache , cached
45from cachetools .keys import hashkey
56from pathlib import Path
7+ from requests .adapters import HTTPAdapter
68from tempfile import NamedTemporaryFile
79from typing import Any
810from collections .abc import Generator
11+ from urllib3 import PoolManager
12+ from urllib3 .connectionpool import HTTPConnectionPool , HTTPSConnectionPool
13+
914from DIRAC import gConfig
1015from DIRAC .ConfigurationSystem .Client .Helpers import Registry
1116from contextlib import contextmanager
2631DEFAULT_TOKEN_CACHE_TTL = 5 * 60
2732DEFAULT_TOKEN_CACHE_SIZE = 1024
2833
29- legacy_exchange_session = requests .Session ()
34+ # Number of pools to use for a given host.
35+ # It should be in the order of host behind the alias
36+ SESSION_NUM_POOLS = 20
37+ # Number of connection per Pool
38+ SESSION_CONNECTION_POOL_MAX_SIZE = 10
39+
40+
41+ class RandomizedPoolManager (PoolManager ):
42+ """
43+ A PoolManager subclass that creates multiple connection pools per host.
44+ Each connection request randomly picks one of the available pools.
45+ """
46+
47+ def __init__ (self , num_pools = 3 , ** kwargs ):
48+ self .num_pools = num_pools
49+ super ().__init__ (** kwargs )
50+
51+ def connection_from_host (self , host , port = None , scheme = "http" , pool_kwargs = None ):
52+ # Pick a random index to diversify the pool key.
53+
54+ rand_index = random .randint (0 , self .num_pools - 1 )
55+ pool_key = (f"{ host } -{ rand_index } " , port , scheme )
56+ if pool_key in self .pools :
57+ return self .pools [pool_key ]
58+
59+ # Create a new pool if none exists for this key.
60+ if scheme == "http" :
61+ self .pools [pool_key ] = HTTPConnectionPool (host , port , ** self .connection_pool_kw )
62+ elif scheme == "https" :
63+ self .pools [pool_key ] = HTTPSConnectionPool (host , port , ** self .connection_pool_kw )
64+ else :
65+ raise ValueError (f"Unsupported scheme: { scheme } " )
66+
67+ return self .pools [pool_key ]
68+
69+
70+ class RandomizedHTTPAdapter (HTTPAdapter ):
71+ """
72+ An HTTPAdapter that uses the RandomizedPoolManager.
73+ """
74+
75+ def __init__ (self , num_pools = 3 , maxsize = 10 , ** kwargs ):
76+ self .num_pools = num_pools
77+ self .custom_maxsize = maxsize
78+ super ().__init__ (** kwargs )
79+
80+ def init_poolmanager (self , connections , maxsize , block = False , ** pool_kwargs ):
81+ """
82+ Initialize the pool manager with our custom RandomizedPoolManager.
83+ """
84+ # This ends up being passed to the HTTP(s)ConnectionPool constructors
85+ pool_kwargs .update (
86+ {
87+ "maxsize" : self .custom_maxsize ,
88+ "block" : block ,
89+ }
90+ )
91+ self .poolmanager = RandomizedPoolManager (** pool_kwargs )
92+
93+
94+ # Create a requests session.
95+ diracx_session = requests .Session ()
96+ # Create an instance of the custom adapter.
97+ diracx_pool_adapter = RandomizedHTTPAdapter (num_pools = SESSION_NUM_POOLS , maxsize = SESSION_CONNECTION_POOL_MAX_SIZE )
98+
99+ # Mount the adapter to handle both HTTP and HTTPS.
100+ diracx_session .mount ("http://" , diracx_pool_adapter )
101+ diracx_session .mount ("https://" , diracx_pool_adapter )
30102
31103
32104def get_token (
@@ -45,7 +117,7 @@ def get_token(
45117 vo = Registry .getVOForGroup (group )
46118 scopes = [f"vo:{ vo } " , f"group:{ group } " ] + [f"property:{ prop } " for prop in dirac_properties ]
47119
48- r = legacy_exchange_session .get (
120+ r = diracx_session .get (
49121 f"{ diracxUrl } /api/auth/legacy-exchange" ,
50122 params = {
51123 "preferred_username" : username ,
0 commit comments