@@ -58,7 +58,7 @@ def _warn(warning: str) -> None:
58
58
59
59
class TokenCache (ABC ):
60
60
@staticmethod
61
- def make () -> TokenCache :
61
+ def make (skip_file_permissions_check : bool = False ) -> TokenCache :
62
62
if IS_MACOS or IS_WINDOWS :
63
63
if not installed_keyring :
64
64
_warn (
@@ -71,7 +71,7 @@ def make() -> TokenCache:
71
71
return KeyringTokenCache ()
72
72
73
73
if IS_LINUX :
74
- cache = FileTokenCache .make ()
74
+ cache = FileTokenCache .make (skip_file_permissions_check )
75
75
if cache :
76
76
return cache
77
77
else :
@@ -128,23 +128,30 @@ class _CacheFileWriteError(_FileTokenCacheError):
128
128
129
129
class FileTokenCache (TokenCache ):
130
130
@staticmethod
131
- def make () -> FileTokenCache | None :
132
- cache_dir = FileTokenCache .find_cache_dir ()
131
+ def make (skip_file_permissions_check : bool = False ) -> FileTokenCache | None :
132
+ cache_dir = FileTokenCache .find_cache_dir (skip_file_permissions_check )
133
133
if cache_dir is None :
134
134
logging .getLogger (__name__ ).debug (
135
135
"Failed to find suitable cache directory for token cache. File based token cache initialization failed."
136
136
)
137
137
return None
138
138
else :
139
- return FileTokenCache (cache_dir )
139
+ return FileTokenCache (
140
+ cache_dir , skip_file_permissions_check = skip_file_permissions_check
141
+ )
140
142
141
- def __init__ (self , cache_dir : Path ) -> None :
143
+ def __init__ (
144
+ self , cache_dir : Path , skip_file_permissions_check : bool = False
145
+ ) -> None :
142
146
self .logger = logging .getLogger (__name__ )
143
147
self .cache_dir : Path = cache_dir
148
+ self ._skip_file_permissions_check = skip_file_permissions_check
144
149
145
150
def store (self , key : TokenKey , token : str ) -> None :
146
151
try :
147
- FileTokenCache .validate_cache_dir (self .cache_dir )
152
+ FileTokenCache .validate_cache_dir (
153
+ self .cache_dir , self ._skip_file_permissions_check
154
+ )
148
155
with FileLock (self .lock_file ()):
149
156
cache = self ._read_cache_file ()
150
157
cache ["tokens" ][key .hash_key ()] = token
@@ -158,7 +165,9 @@ def store(self, key: TokenKey, token: str) -> None:
158
165
159
166
def retrieve (self , key : TokenKey ) -> str | None :
160
167
try :
161
- FileTokenCache .validate_cache_dir (self .cache_dir )
168
+ FileTokenCache .validate_cache_dir (
169
+ self .cache_dir , self ._skip_file_permissions_check
170
+ )
162
171
with FileLock (self .lock_file ()):
163
172
cache = self ._read_cache_file ()
164
173
token = cache ["tokens" ].get (key .hash_key (), None )
@@ -178,7 +187,9 @@ def retrieve(self, key: TokenKey) -> str | None:
178
187
179
188
def remove (self , key : TokenKey ) -> None :
180
189
try :
181
- FileTokenCache .validate_cache_dir (self .cache_dir )
190
+ FileTokenCache .validate_cache_dir (
191
+ self .cache_dir , self ._skip_file_permissions_check
192
+ )
182
193
with FileLock (self .lock_file ()):
183
194
cache = self ._read_cache_file ()
184
195
cache ["tokens" ].pop (key .hash_key (), None )
@@ -201,7 +212,8 @@ def _read_cache_file(self) -> dict[str, dict[str, Any]]:
201
212
json_data = {"tokens" : {}}
202
213
try :
203
214
fd = os .open (self .cache_file (), os .O_RDONLY )
204
- self ._ensure_permissions (fd , 0o600 )
215
+ if not self ._skip_file_permissions_check :
216
+ self ._ensure_permissions (fd , 0o600 )
205
217
size = os .lseek (fd , 0 , os .SEEK_END )
206
218
os .lseek (fd , 0 , os .SEEK_SET )
207
219
data = os .read (fd , size )
@@ -234,7 +246,8 @@ def _write_cache_file(self, json_data: dict):
234
246
fd = os .open (
235
247
self .cache_file (), os .O_WRONLY | os .O_CREAT | os .O_TRUNC , 0o600
236
248
)
237
- self ._ensure_permissions (fd , 0o600 )
249
+ if not self ._skip_file_permissions_check :
250
+ self ._ensure_permissions (fd , 0o600 )
238
251
os .write (fd , codecs .encode (json .dumps (json_data ), "utf-8" ))
239
252
return json_data
240
253
except OSError as e :
@@ -244,7 +257,7 @@ def _write_cache_file(self, json_data: dict):
244
257
os .close (fd )
245
258
246
259
@staticmethod
247
- def find_cache_dir () -> Path | None :
260
+ def find_cache_dir (skip_file_permissions_check : bool = False ) -> Path | None :
248
261
def lookup_env_dir (env_var : str , subpath_segments : list [str ]) -> Path | None :
249
262
env_val = os .getenv (env_var )
250
263
if env_val is None :
@@ -276,10 +289,12 @@ def lookup_env_dir(env_var: str, subpath_segments: list[str]) -> Path | None:
276
289
directory .mkdir (exist_ok = True , mode = 0o700 )
277
290
278
291
try :
279
- FileTokenCache .validate_cache_dir (directory )
292
+ FileTokenCache .validate_cache_dir (
293
+ directory , skip_file_permissions_check
294
+ )
280
295
return directory
281
296
except _FileTokenCacheError as e :
282
- logger . debug (
297
+ _warn (
283
298
f"Cache directory validation failed for { str (directory )} due to error '{ e } '. Skipping it in cache directory lookup."
284
299
)
285
300
return None
@@ -298,7 +313,9 @@ def lookup_env_dir(env_var: str, subpath_segments: list[str]) -> Path | None:
298
313
return None
299
314
300
315
@staticmethod
301
- def validate_cache_dir (cache_dir : Path | None ) -> None :
316
+ def validate_cache_dir (
317
+ cache_dir : Path | None , skip_file_permissions_check : bool = False
318
+ ) -> None :
302
319
try :
303
320
statinfo = cache_dir .stat ()
304
321
@@ -308,17 +325,18 @@ def validate_cache_dir(cache_dir: Path | None) -> None:
308
325
if not stat .S_ISDIR (statinfo .st_mode ):
309
326
raise _InvalidCacheDirError (f"Cache dir { cache_dir } is not a directory" )
310
327
311
- permissions = stat .S_IMODE (statinfo .st_mode )
312
- if permissions != 0o700 :
313
- raise _PermissionsTooWideError (
314
- f"Cache dir { cache_dir } has incorrect permissions. { permissions :o} != 0700"
315
- )
328
+ if not skip_file_permissions_check :
329
+ permissions = stat .S_IMODE (statinfo .st_mode )
330
+ if permissions != 0o700 :
331
+ raise _PermissionsTooWideError (
332
+ f"Cache dir { cache_dir } has incorrect permissions. { permissions :o} != 0700"
333
+ )
316
334
317
- euid = os .geteuid ()
318
- if statinfo .st_uid != euid :
319
- raise _OwnershipError (
320
- f"Cache dir { cache_dir } has incorrect owner. { euid } != { statinfo .st_uid } "
321
- )
335
+ euid = os .geteuid ()
336
+ if statinfo .st_uid != euid :
337
+ raise _OwnershipError (
338
+ f"Cache dir { cache_dir } has incorrect owner. { euid } != { statinfo .st_uid } "
339
+ )
322
340
323
341
except FileNotFoundError :
324
342
raise _CacheDirNotFoundError (
0 commit comments