Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 20 additions & 11 deletions lib/galaxy/tool_util/deps/mulled/mulled_build.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
TYPE_CHECKING,
)

from requests import Session
import yaml
from typing_extensions import Literal

Expand Down Expand Up @@ -57,6 +58,7 @@
get_files_from_conda_package,
PrintProgress,
quay_repository,
quay_tag_exists,
v1_image_name,
v2_image_name,
)
Expand Down Expand Up @@ -232,6 +234,7 @@ def mull_targets(
determine_base_image: bool = True,
invfile: str = INVFILE,
strict_channel_priority: bool = True,
session: Optional[Session] = None,
) -> int:
if involucro_context is None:
involucro_context = InvolucroContext()
Expand All @@ -250,18 +253,24 @@ def mull_targets(

if not rebuild or "push" in command:
repo_name = repo_template_kwds["image"].split(":", 1)[0]
repo_data = quay_repository(repo_template_kwds["namespace"], repo_name)
if not rebuild:
tags = repo_data.get("tags", [])

target_tag = None
if ":" in repo_template_kwds["image"]:
image_name_parts = repo_template_kwds["image"].split(":")
assert len(image_name_parts) == 2, f": not allowed in image name [{repo_template_kwds['image']}]"
target_tag = image_name_parts[1]
repo_data = None
target_tag = None
if ":" in repo_template_kwds["image"]:
image_name_parts = repo_template_kwds["image"].split(":")
assert len(image_name_parts) == 2, f": not allowed in image name [{repo_template_kwds['image']}]"
target_tag = image_name_parts[1]

if tags and (target_tag is None or target_tag in tags):
raise BuildExistsException()
if not rebuild:
if target_tag is not None:
if quay_tag_exists(repo_template_kwds["namespace"], repo_name, target_tag, session=session):
raise BuildExistsException()
else:
repo_data = quay_repository(repo_template_kwds["namespace"], repo_name, session=session)
tags = repo_data.get("tags", [])
if tags:
raise BuildExistsException()
if "push" in command and repo_data is None:
repo_data = quay_repository(repo_template_kwds["namespace"], repo_name, session=session)
if "push" in command and "error_type" in repo_data and oauth_token:
# Explicitly create the repository so it can be built as public.
create_repository(repo_template_kwds["namespace"], repo_name, oauth_token)
Expand Down
2 changes: 1 addition & 1 deletion lib/galaxy/tool_util/deps/mulled/mulled_build_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def run_channel(args, build_last_n_versions: int = 1) -> None:
for tag in versions:
target = build_target(pkg_name, tag=tag)
targets = [target]
mull_targets(targets, test=pkg_tests, **args_to_mull_targets_kwds(args))
mull_targets(targets, test=pkg_tests, session=session, **args_to_mull_targets_kwds(args))


def get_pkg_names(args):
Expand Down
3 changes: 3 additions & 0 deletions lib/galaxy/tool_util/deps/mulled/mulled_build_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
)

from galaxy.tool_util.deps.conda_util import CondaTarget
from galaxy.util import requests
from ._cli import arg_parser
from .mulled_build import (
add_build_arguments,
Expand Down Expand Up @@ -57,6 +58,7 @@ def main(argv=None):
help="Path to directory (or single file) of TSV files describing composite recipes.",
)
args = parser.parse_args()
session = requests.session()
for target in generate_targets(args.files):
try:
ret = mull_targets(
Expand All @@ -65,6 +67,7 @@ def main(argv=None):
name_override=target.name_override,
base_image=target.base_image,
determine_base_image=False,
session=session,
**args_to_mull_targets_kwds(args),
)
except BuildExistsException:
Expand Down
148 changes: 139 additions & 9 deletions lib/galaxy/tool_util/deps/mulled/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
from conda_package_streaming.url import stream_conda_info as stream_conda_info_from_url
from packaging.version import Version
from requests import Session
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from galaxy.tool_util.deps.conda_util import (
CondaContext,
Expand All @@ -41,12 +43,27 @@
log = logging.getLogger(__name__)

QUAY_REPOSITORY_API_ENDPOINT = "https://quay.io/api/v1/repository"
QUAY_REGISTRY_API_ENDPOINT = "https://quay.io/v2"
BUILD_NUMBER_REGEX = re.compile(r"\d+$")
MULLED_SOCKET_TIMEOUT = 12
QUAY_VERSIONS_CACHE_EXPIRY = 300
QUAY_REQUEST_RETRY_STATUS_CODES = (408, 425, 429, 500, 502, 503, 504)
QUAY_REQUEST_MAX_RETRIES = 5
QUAY_REQUEST_BACKOFF_FACTOR = 1
QUAY_MANIFEST_ACCEPT = ",".join(
[
"application/vnd.docker.distribution.manifest.v2+json",
"application/vnd.docker.distribution.manifest.list.v2+json",
"application/vnd.oci.image.manifest.v1+json",
"application/vnd.oci.image.index.v1+json",
"application/vnd.docker.distribution.manifest.v1+json",
]
)
NAMESPACE_HAS_REPO_NAME_KEY = "galaxy.tool_util.deps.container_resolvers.mulled.util:namespace_repo_names"
TAG_CACHE_KEY = "galaxy.tool_util.deps.container_resolvers.mulled.util:tag_cache"
CONDA_IMAGE = os.environ.get("CONDA_IMAGE", "quay.io/condaforge/miniforge3:latest")
_quay_session_lock = threading.Lock()
_shared_quay_session: Optional[Session] = None


class PARSED_TAG(NamedTuple):
Expand All @@ -56,6 +73,10 @@ class PARSED_TAG(NamedTuple):
build_number: int


class QuayApiException(Exception):
    """Error signalling that quay.io answered with an unexpected response.

    Raised by the quay helpers in this module, e.g. for error status codes,
    bodies that cannot be decoded as JSON, or JSON payloads of the wrong type.
    """


def default_mulled_conda_channels_from_env() -> Optional[List[str]]:
if "DEFAULT_MULLED_CONDA_CHANNELS" in os.environ:
return os.environ["DEFAULT_MULLED_CONDA_CHANNELS"].split(",")
Expand Down Expand Up @@ -105,15 +126,68 @@ def create_repository(namespace: str, repo_name: str, oauth_token: str) -> None:
requests.post("https://quay.io/api/v1/repository", json=data, headers=headers, timeout=MULLED_SOCKET_TIMEOUT)


def _build_quay_session() -> Session:
    """Create a new session whose HTTP(S) adapter retries quay.io requests.

    The retry policy is configured from the module-level ``QUAY_REQUEST_*``
    constants: it only applies to ``GET`` and ``HEAD`` requests, retries on the
    status codes listed in ``QUAY_REQUEST_RETRY_STATUS_CODES`` and honours a
    ``Retry-After`` header sent by the server.

    :returns: a session with the retrying adapter mounted for both
        ``https://`` and ``http://`` URLs.
    """
    retry_options = dict(
        total=QUAY_REQUEST_MAX_RETRIES,
        connect=QUAY_REQUEST_MAX_RETRIES,
        read=QUAY_REQUEST_MAX_RETRIES,
        status=QUAY_REQUEST_MAX_RETRIES,
        backoff_factor=QUAY_REQUEST_BACKOFF_FACTOR,
        status_forcelist=QUAY_REQUEST_RETRY_STATUS_CODES,
        allowed_methods=frozenset({"GET", "HEAD"}),
        respect_retry_after_header=True,
        # Callers inspect response.status_code themselves, so do not raise
        # from inside urllib3 on a retryable status.
        raise_on_status=False,
    )
    retrying_adapter = HTTPAdapter(max_retries=Retry(**retry_options))
    quay_session = requests.session()
    # One adapter instance is shared by both URL schemes.
    for scheme in ("https://", "http://"):
        quay_session.mount(scheme, retrying_adapter)
    return quay_session


def _get_quay_session(session: Optional[Session] = None) -> Session:
    """Return the session to use for a quay.io request.

    :param session: session supplied by the caller; returned unchanged when it
        is not ``None``.
    :returns: ``session`` if given, otherwise the module-wide shared session,
        which is built on first use via ``_build_quay_session()``.
    """
    global _shared_quay_session

    if session is None:
        if _shared_quay_session is None:
            with _quay_session_lock:
                # Re-check while holding the lock: another thread may have
                # created the shared session between the first check and here.
                if _shared_quay_session is None:
                    _shared_quay_session = _build_quay_session()
        session = _shared_quay_session
    return session


def _quay_api_error(response, url: str) -> QuayApiException:
    """Build (but do not raise) a ``QuayApiException`` describing ``response``.

    :param response: response object exposing ``json()``, ``text`` and
        ``status_code``.
    :param url: the URL that was requested, included in the message.
    :returns: an exception whose message carries the status code and either
        the decoded JSON body or, if the body is not JSON, the first 200
        characters of the response text.
    """
    try:
        detail = response.json()
    except ValueError:
        # Body is not JSON; fall back to a bounded excerpt of the raw text.
        detail = response.text[:200]
    message = f"Unexpected quay.io response for {url} [{response.status_code}]: {detail!r}"
    return QuayApiException(message)


def _quay_json_dict(response, url: str) -> Dict[str, Any]:
    """Decode ``response`` as JSON and ensure the payload is a dictionary.

    :param response: response object exposing ``json()``, ``text`` and
        ``status_code``.
    :param url: the URL that was requested, used in error messages.
    :returns: the decoded JSON object.
    :raises QuayApiException: if the body cannot be decoded as JSON, or if the
        decoded payload is not a ``dict``.
    """
    try:
        data = response.json()
    except ValueError as exc:
        raise QuayApiException(
            f"Failed to decode quay.io JSON for {url} [{response.status_code}]: {response.text[:200]!r}"
        ) from exc

    if isinstance(data, dict):
        return data

    # Valid JSON, but not the object shape callers rely on.
    raise QuayApiException(f"Unexpected quay.io payload type for {url}: {type(data).__name__}")


def quay_versions(namespace: str, pkg_name: str, session: Optional[Session] = None) -> List[str]:
"""Get all version tags for a Docker image stored on quay.io for supplied package name."""
data = quay_repository(namespace, pkg_name, session=session)

if "error_type" in data and data["error_type"] == "invalid_token":
if "error_type" in data and data["error_type"] in {"invalid_token", "not_found"}:
return []

if "tags" not in data:
raise Exception(f"Unexpected response from quay.io - no tags description found [{data}]")
raise QuayApiException(f"Unexpected response from quay.io - no tags description found [{data}]")

return [tag for tag in data["tags"].keys() if tag != "latest"]

Expand All @@ -122,11 +196,64 @@ def quay_repository(namespace: str, pkg_name: str, session: Optional[Session] =
assert namespace is not None
assert pkg_name is not None
url = f"https://quay.io/api/v1/repository/{namespace}/{pkg_name}"
if not session:
session = requests.session()
response = session.get(url, timeout=MULLED_SOCKET_TIMEOUT)
data = response.json()
return data
response = _get_quay_session(session).get(url, timeout=MULLED_SOCKET_TIMEOUT)
if response.status_code in {401, 404}:
try:
data = _quay_json_dict(response, url)
except QuayApiException:
if response.status_code == 404:
# Some missing-repo responses are non-JSON; normalize them to not_found.
return {"error_type": "not_found"}
raise
if response.status_code == 401 and data.get("error_type") != "invalid_token":
raise _quay_api_error(response, url)
# Quay uses 401 invalid_token for some public repo/tag misses.
return data
if response.status_code >= 400:
raise _quay_api_error(response, url)
return _quay_json_dict(response, url)


def quay_tag_exists(namespace: str, pkg_name: str, tag: str, session: Optional[Session] = None) -> bool:
assert namespace is not None
assert pkg_name is not None
assert tag is not None

url = (
f"{QUAY_REGISTRY_API_ENDPOINT}/{namespace}/{pkg_name}/manifests/"
f"{tag}"
)
response = _get_quay_session(session).head(
url,
headers={"Accept": QUAY_MANIFEST_ACCEPT},
timeout=MULLED_SOCKET_TIMEOUT,
)
if response.status_code == 404:
# A manifest HEAD 404 is the normal "tag does not exist" case.
return False
if response.status_code == 200:
return True
# Quay can return 401 invalid_token here for public repos, so treat it like a fallback case.
if response.status_code != 401 and response.status_code not in QUAY_REQUEST_RETRY_STATUS_CODES:
raise _quay_api_error(response, url)

log.warning(
"Falling back to quay repository metadata for %s/%s:%s after registry manifest probe failed with %s",
namespace,
pkg_name,
tag,
response.status_code,
)
repo_data = quay_repository(namespace, pkg_name, session=session)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How likely is it that we end up here? We risk calculating this twice: once here, and then again in mull_targets() in the `if "push" in command:` branch.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not likely, but I could not find out why a few repos did not return tags.

Any idea how we can refactor it? Or cache the request?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caching with https://pypi.org/project/requests-cache/ should be easy; you just need to change how you create the session. I'd put the cache file in Galaxy's data_dir.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about the latest commit? I think data_dir does not exist if you just run mulled-build from the CLI.

Still seems to work on multi-package-containers

if "error_type" in repo_data and repo_data["error_type"] in {"invalid_token", "not_found"}:
return False

tags = repo_data.get("tags", {})
if isinstance(tags, dict):
return tag in tags
if isinstance(tags, list):
return tag in tags
raise _quay_api_error(response, url)


def _get_namespace(namespace: str) -> List[str]:
Expand All @@ -136,10 +263,12 @@ def _get_namespace(namespace: str) -> List[str]:
repos_headers = {"Accept-encoding": "gzip", "Accept": "application/json"}
while True:
repos_parameters = {"public": "true", "namespace": namespace, "next_page": next_page}
repos_response = requests.get(
repos_response = _get_quay_session().get(
QUAY_REPOSITORY_API_ENDPOINT, headers=repos_headers, params=repos_parameters, timeout=MULLED_SOCKET_TIMEOUT
)
repos_response_json = repos_response.json()
if repos_response.status_code >= 400:
raise _quay_api_error(repos_response, QUAY_REPOSITORY_API_ENDPOINT)
repos_response_json = _quay_json_dict(repos_response, QUAY_REPOSITORY_API_ENDPOINT)
repos = repos_response_json["repositories"]
repo_names += [r["name"] for r in repos]
next_page = repos_response_json.get("next_page")
Expand Down Expand Up @@ -483,6 +612,7 @@ def __exit__(self, exc_type, exc_val, exc_tb) -> None:
"get_files_from_conda_package",
"image_name",
"mulled_tags_for",
"quay_tag_exists",
"quay_versions",
"split_container_name",
"split_tag",
Expand Down
78 changes: 77 additions & 1 deletion test/unit/tool_util/mulled/test_mulled_util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,41 @@
import pytest

from galaxy.tool_util.deps.mulled.util import version_sorted
from galaxy.tool_util.deps.mulled.util import (
quay_repository,
quay_tag_exists,
QuayApiException,
version_sorted,
)


class FakeResponse:
    """Minimal stand-in for an HTTP response object used by the quay tests.

    Exposes ``status_code``, ``text``, ``headers`` and ``json()``.
    """

    def __init__(self, status_code, payload=None, text="", headers=None, json_error=False):
        # Public attributes read by the code under test.
        self.status_code = status_code
        self.text = text
        self.headers = headers if headers else {}
        # Private knobs controlling what json() does.
        self._payload = payload
        self._json_error = json_error

    def json(self):
        """Return the configured payload, or raise ``ValueError`` if ``json_error`` was set."""
        if not self._json_error:
            return self._payload
        raise ValueError("invalid json")


class FakeSession:
    """Session double that returns canned responses and records every call.

    ``get()`` and ``head()`` each append ``(url, kwargs)`` to ``get_calls`` /
    ``head_calls`` and return the response supplied at construction time.
    """

    def __init__(self, *, get_response=None, head_response=None):
        self.get_response = get_response
        self.head_response = head_response
        self.get_calls = []
        self.head_calls = []

    def get(self, url, **kwargs):
        """Record the call and return the canned GET response."""
        call = (url, kwargs)
        self.get_calls.append(call)
        return self.get_response

    def head(self, url, **kwargs):
        """Record the call and return the canned HEAD response."""
        call = (url, kwargs)
        self.head_calls.append(call)
        return self.head_response


@pytest.mark.parametrize(
Expand All @@ -17,3 +52,44 @@
)
def test_version_sorted(tags, tag):
assert version_sorted(tags)[0] == tag


def test_quay_tag_exists_uses_registry_head():
    """A 200 on the registry manifest HEAD request reports the tag as existing."""
    fake_session = FakeSession(head_response=FakeResponse(200))

    exists = quay_tag_exists("biocontainers", "samtools", "1.17--0", session=fake_session)

    assert exists is True
    # The probe must hit the registry v2 manifest endpoint for this exact tag.
    requested_url = fake_session.head_calls[0][0]
    assert requested_url.startswith("https://quay.io/v2/biocontainers/samtools/manifests/")
    assert requested_url.endswith("1.17--0")


def test_quay_tag_exists_returns_false_for_missing_tag():
    """A 404 on the manifest HEAD request reports the tag as missing."""
    fake_session = FakeSession(head_response=FakeResponse(404))

    exists = quay_tag_exists("biocontainers", "samtools", "1.17--0", session=fake_session)

    assert exists is False


def test_quay_tag_exists_falls_back_to_repository_metadata(monkeypatch):
    """On a 502 from the manifest probe, the tag is looked up in the repository metadata."""
    fake_session = FakeSession(head_response=FakeResponse(502, text="", json_error=True))

    def fake_quay_repository(*args, **kwargs):
        # Repository metadata that lists the tag being probed.
        return {"tags": {"1.17--0": {}}}

    monkeypatch.setattr("galaxy.tool_util.deps.mulled.util.quay_repository", fake_quay_repository)

    exists = quay_tag_exists("biocontainers", "samtools", "1.17--0", session=fake_session)

    assert exists is True


def test_quay_tag_exists_does_not_fall_back_for_non_transient_errors(monkeypatch):
    """A 403 from the manifest probe raises instead of consulting repository metadata."""
    forbidden = FakeResponse(403, payload={"error_type": "forbidden"})
    fake_session = FakeSession(head_response=forbidden)

    def fail_if_called(*args, **kwargs):
        # The fallback lookup must never run for this status code.
        return pytest.fail()

    monkeypatch.setattr("galaxy.tool_util.deps.mulled.util.quay_repository", fail_if_called)

    with pytest.raises(QuayApiException):
        quay_tag_exists("biocontainers", "samtools", "1.17--0", session=fake_session)


def test_quay_repository_returns_invalid_token_response_for_401():
    """A 401 carrying ``error_type: invalid_token`` is returned to the caller as-is."""
    invalid_token_payload = {"error_type": "invalid_token"}
    fake_session = FakeSession(get_response=FakeResponse(401, payload=invalid_token_payload))

    result = quay_repository("biocontainers", "samtools", session=fake_session)

    assert result == {"error_type": "invalid_token"}
Loading