77from pathlib import PurePosixPath
88from typing import Any , Optional , Self , cast
99
10- import requests
10+ import httpx
1111from werkzeug .datastructures import WWWAuthenticate
1212
13- from ... errors . user import ImageParseError
13+ from renku_data_services . errors import errors
1414
1515
1616class ManifestTypes (Enum ):
@@ -29,16 +29,20 @@ class ImageRepoDockerAPI:
2929
3030 hostname : str
3131 oauth2_token : Optional [str ] = field (default = None , repr = False )
32+ # NOTE: We need to follow redirects so that we can authenticate with the image repositories properly.
33+ # NOTE: If we do not use default_factory to create the client here requests will fail because it can happen
34+ # that the client gets created in the wrong asyncio loop.
35+ client : httpx .AsyncClient = field (default_factory = lambda : httpx .AsyncClient (timeout = 10 , follow_redirects = True ))
3236
33- def _get_docker_token (self , image : "Image" ) -> Optional [str ]:
37+ async def _get_docker_token (self , image : "Image" ) -> Optional [str ]:
3438 """Get an authorization token from the docker v2 API.
3539
3640 This will return the token provided by the API (or None if no token was found).
3741 """
3842 image_digest_url = f"https://{ self .hostname } /v2/{ image .name } /manifests/{ image .tag } "
3943 try :
40- auth_req = requests . get (image_digest_url , timeout = 10 )
41- except requests . ConnectionError :
44+ auth_req = await self . client . get (image_digest_url )
45+ except httpx . ConnectError :
4246 auth_req = None
4347 if auth_req is None or not (auth_req .status_code == 401 and "Www-Authenticate" in auth_req .headers ):
4448 # the request status code and header are not what is expected
@@ -54,56 +58,55 @@ def _get_docker_token(self, image: "Image") -> Optional[str]:
5458 if self .oauth2_token :
5559 creds = base64 .urlsafe_b64encode (f"oauth2:{ self .oauth2_token } " .encode ()).decode ()
5660 headers ["Authorization" ] = f"Basic { creds } "
57- token_req = requests . get (realm , params = params , headers = headers , timeout = 10 )
61+ token_req = await self . client . get (realm , params = params , headers = headers )
5862 return str (token_req .json ().get ("token" ))
5963
60- def get_image_manifest (self , image : "Image" ) -> Optional [dict [str , Any ]]:
64+ async def get_image_manifest (self , image : "Image" ) -> Optional [dict [str , Any ]]:
6165 """Query the docker API to get the manifest of an image."""
6266 if image .hostname != self .hostname :
63- raise ImageParseError (
64- f"The image hostname { image .hostname } does not match " f" the image repository { self .hostname } "
67+ raise errors . ValidationError (
68+ message = f"The image hostname { image .hostname } does not match the image repository { self .hostname } "
6569 )
66- token = self ._get_docker_token (image )
70+ token = await self ._get_docker_token (image )
6771 image_digest_url = f"https://{ image .hostname } /v2/{ image .name } /manifests/{ image .tag } "
6872 headers = {"Accept" : ManifestTypes .docker_v2 .value }
6973 if token :
7074 headers ["Authorization" ] = f"Bearer { token } "
71- res = requests . get (image_digest_url , headers = headers , timeout = 10 )
75+ res = await self . client . get (image_digest_url , headers = headers )
7276 if res .status_code != 200 :
7377 headers ["Accept" ] = ManifestTypes .oci_v1 .value
74- res = requests . get (image_digest_url , headers = headers , timeout = 10 )
78+ res = await self . client . get (image_digest_url , headers = headers )
7579 if res .status_code != 200 :
7680 return None
7781 return cast (dict [str , Any ], res .json ())
7882
79- def image_exists (self , image : "Image" ) -> bool :
83+ async def image_exists (self , image : "Image" ) -> bool :
8084 """Check the docker repo API if the image exists."""
81- return self .get_image_manifest (image ) is not None
85+ return await self .get_image_manifest (image ) is not None
8286
83- def get_image_config (self , image : "Image" ) -> Optional [dict [str , Any ]]:
87+ async def get_image_config (self , image : "Image" ) -> Optional [dict [str , Any ]]:
8488 """Query the docker API to get the configuration of an image."""
85- manifest = self .get_image_manifest (image )
89+ manifest = await self .get_image_manifest (image )
8690 if manifest is None :
8791 return None
8892 config_digest = manifest .get ("config" , {}).get ("digest" )
8993 if config_digest is None :
9094 return None
91- token = self ._get_docker_token (image )
92- res = requests .get (
95+ token = await self ._get_docker_token (image )
96+ res = await self . client .get (
9397 f"https://{ image .hostname } /v2/{ image .name } /blobs/{ config_digest } " ,
9498 headers = {
9599 "Accept" : "application/json" ,
96100 "Authorization" : f"Bearer { token } " ,
97101 },
98- timeout = 10 ,
99102 )
100103 if res .status_code != 200 :
101104 return None
102105 return cast (dict [str , Any ], res .json ())
103106
104- def image_workdir (self , image : "Image" ) -> Optional [PurePosixPath ]:
107+ async def image_workdir (self , image : "Image" ) -> Optional [PurePosixPath ]:
105108 """Query the docker API to get the workdir of an image."""
106- config = self .get_image_config (image )
109+ config = await self .get_image_config (image )
107110 if config is None :
108111 return None
109112 nested_config = config .get ("config" , {})
@@ -204,9 +207,9 @@ def build_re(*parts: str) -> re.Pattern:
204207 if len (matches ) == 1 :
205208 return cls (matches [0 ]["hostname" ], matches [0 ]["image" ], matches [0 ]["tag" ])
206209 elif len (matches ) > 1 :
207- raise ImageParseError ( f"Cannot parse the image { path } , too many interpretations { matches } " )
210+ raise errors . ValidationError ( message = f"Cannot parse the image { path } , too many interpretations { matches } " )
208211 else :
209- raise ImageParseError ( f"Cannot parse the image { path } " )
212+ raise errors . ValidationError ( message = f"Cannot parse the image { path } " )
210213
211214 def repo_api (self ) -> ImageRepoDockerAPI :
212215 """Get the docker API from the image."""
0 commit comments