22import time
33
44import abc
5- import asyncio
65import logging
76import six
8- from ydb import issues , credentials
97from ydb .iam import auth
8+ from .credentials import AbstractExpiringTokenCredentials
109
1110logger = logging .getLogger (__name__ )
1211
2524 aiohttp = None
2625
2726
28- class _OneToManyValue (object ):
29- def __init__ (self ):
30- self ._value = None
31- self ._condition = asyncio .Condition ()
32-
33- async def consume (self , timeout = 3 ):
34- async with self ._condition :
35- if self ._value is None :
36- try :
37- await asyncio .wait_for (self ._condition .wait (), timeout = timeout )
38- except Exception :
39- return self ._value
40- return self ._value
41-
42- async def update (self , n_value ):
43- async with self ._condition :
44- prev_value = self ._value
45- self ._value = n_value
46- if prev_value is None :
47- self ._condition .notify_all ()
48-
49-
50- class _AtMostOneExecution (object ):
51- def __init__ (self ):
52- self ._can_schedule = True
53- self ._lock = asyncio .Lock () # Lock to guarantee only one execution
54-
55- async def _wrapped_execution (self , callback ):
56- await self ._lock .acquire ()
57- try :
58- res = callback ()
59- if asyncio .iscoroutine (res ):
60- await res
61- except Exception :
62- pass
63-
64- finally :
65- self ._lock .release ()
66- self ._can_schedule = True
67-
68- def submit (self , callback ):
69- if self ._can_schedule :
70- self ._can_schedule = False
71- asyncio .ensure_future (self ._wrapped_execution (callback ))
72-
73-
7427@six .add_metaclass (abc .ABCMeta )
75- class IamTokenCredentials (auth .IamTokenCredentials ):
76- def __init__ (self ):
77- super (IamTokenCredentials , self ).__init__ ()
78- self ._tp = _AtMostOneExecution ()
79- self ._iam_token = _OneToManyValue ()
80-
81- @abc .abstractmethod
82- async def _get_iam_token (self ):
83- pass
84-
85- async def _refresh (self ):
86- current_time = time .time ()
87- self ._log_refresh_start (current_time )
88-
89- try :
90- auth_metadata = await self ._get_iam_token ()
91- await self ._iam_token .update (auth_metadata ["access_token" ])
92- self .update_expiration_info (auth_metadata )
93- self .logger .info (
94- "Token refresh successful. current_time %s, refresh_in %s" ,
95- current_time ,
96- self ._refresh_in ,
97- )
98-
99- except (KeyboardInterrupt , SystemExit ):
100- return
101-
102- except Exception as e :
103- self .last_error = str (e )
104- await asyncio .sleep (1 )
105- self ._tp .submit (self ._refresh )
106-
107- async def iam_token (self ):
108- current_time = time .time ()
109- if current_time > self ._refresh_in :
110- self ._tp .submit (self ._refresh )
111-
112- iam_token = await self ._iam_token .consume (timeout = 3 )
113- if iam_token is None :
114- if self .last_error is None :
115- raise issues .ConnectionError (
116- "%s: timeout occurred while waiting for token.\n %s"
117- % self .__class__ .__name__ ,
118- self .extra_error_message ,
119- )
120- raise issues .ConnectionError (
121- "%s: %s.\n %s"
122- % (self .__class__ .__name__ , self .last_error , self .extra_error_message )
123- )
124- return iam_token
125-
126- async def auth_metadata (self ):
127- return [(credentials .YDB_AUTH_TICKET_HEADER , await self .iam_token ())]
128-
129-
130- @six .add_metaclass (abc .ABCMeta )
131- class TokenServiceCredentials (IamTokenCredentials ):
28+ class TokenServiceCredentials (AbstractExpiringTokenCredentials ):
13229 def __init__ (self , iam_endpoint = None , iam_channel_credentials = None ):
13330 super (TokenServiceCredentials , self ).__init__ ()
31+ assert (
32+ iam_token_service_pb2_grpc is not None
33+ ), "run pip install==ydb[yc] to use service account credentials"
34+ self ._get_token_request_timeout = 10
13435 self ._iam_endpoint = (
13536 "iam.api.cloud.yandex.net:443" if iam_endpoint is None else iam_endpoint
13637 )
13738 self ._iam_channel_credentials = (
13839 {} if iam_channel_credentials is None else iam_channel_credentials
13940 )
140- self ._get_token_request_timeout = 10
141- if (
142- iam_token_service_pb2_grpc is None
143- or jwt is None
144- or iam_token_service_pb2 is None
145- ):
146- raise RuntimeError (
147- "Install jwt & yandex python cloud library to use service account credentials provider"
148- )
14941
15042 def _channel_factory (self ):
15143 return grpc .aio .secure_channel (
@@ -157,7 +49,7 @@ def _channel_factory(self):
15749 def _get_token_request (self ):
15850 pass
15951
160- async def _get_iam_token (self ):
52+ async def _make_token_request (self ):
16153 async with self ._channel_factory () as channel :
16254 stub = iam_token_service_pb2_grpc .IamTokenServiceStub (channel )
16355 response = await stub .Create (
@@ -209,20 +101,19 @@ def _get_token_request(self):
209101 )
210102
211103
212- class MetadataUrlCredentials (IamTokenCredentials ):
104+ class MetadataUrlCredentials (AbstractExpiringTokenCredentials ):
213105 def __init__ (self , metadata_url = None ):
214106 super (MetadataUrlCredentials , self ).__init__ ()
215- if aiohttp is None :
216- raise RuntimeError (
217- "Install aiohttp library to use metadata credentials provider"
218- )
107+ assert (
108+ aiohttp is not None
109+ ), "Install aiohttp library to use metadata credentials provider"
219110 self ._metadata_url = (
220111 auth .DEFAULT_METADATA_URL if metadata_url is None else metadata_url
221112 )
222113 self ._tp .submit (self ._refresh )
223114 self .extra_error_message = "Check that metadata service configured properly and application deployed in VM or function at Yandex.Cloud."
224115
225- async def _get_iam_token (self ):
116+ async def _make_token_request (self ):
226117 timeout = aiohttp .ClientTimeout (total = 2 )
227118 async with aiohttp .ClientSession (timeout = timeout ) as session :
228119 async with session .get (
0 commit comments