diff --git a/examples/advanced/fractal.py b/examples/advanced/fractal.py index 6441b7d84..78b8feda2 100644 --- a/examples/advanced/fractal.py +++ b/examples/advanced/fractal.py @@ -39,7 +39,9 @@ async def tree(max_depth: int = 3, n_children: int = 2, index: int = 1, depth: i async def main_builder(): from flyte._internal.imagebuild.image_builder import ImageBuildEngine - return await ImageBuildEngine.build(env.image) + result = await ImageBuildEngine.build(env.image) + print(f"Built image URI: {result.uri}") + return result async def main(): diff --git a/examples/image/base_image.py b/examples/image/base_image.py index ce7fd7208..419c9970c 100644 --- a/examples/image/base_image.py +++ b/examples/image/base_image.py @@ -6,7 +6,7 @@ image = ( Image.from_debian_base(install_flyte=False) .with_apt_packages("vim", "wget") - .with_pip_packages("mypy", pre=True) + .with_pip_packages("mypy", "httpx", pre=True) .with_env_vars({"hello": "world1"}) .with_dockerignore(Path(__file__).parent / ".dockerignore") .with_local_v2() @@ -22,6 +22,6 @@ async def t1(data: str = "hello") -> str: if __name__ == "__main__": flyte.init_from_config() - run = flyte.run(t1, data="world") - print(run.name) - print(run.url) + result = flyte.build(image, force=True, wait=False) + print(f"URI: {result.uri}") + print(f"Remote run: {result.remote_run}") diff --git a/examples/image/custom_builder/src/my_builder/my_builder.py b/examples/image/custom_builder/src/my_builder/my_builder.py index c3cb1f145..1910925f1 100644 --- a/examples/image/custom_builder/src/my_builder/my_builder.py +++ b/examples/image/custom_builder/src/my_builder/my_builder.py @@ -20,6 +20,11 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: """Return the image checker.""" return [MyImageChecker] - async def build_image(self, image: Image, dry_run: bool = False) -> str: + async def build_image( + self, + image: Image, + dry_run: bool = False, + wait: bool = True, + ) -> str: print("Building image locally...") return image.uri diff --git a/src/flyte/__init__.py b/src/flyte/__init__.py index e86562e0f..808ec95c6 100644 --- a/src/flyte/__init__.py +++ b/src/flyte/__init__.py @@ -6,7 +6,7 @@ import sys -from ._build import build +from ._build import ImageBuild, build from ._cache import Cache, CachePolicy, CacheRequest from ._context import ctx from ._custom_context import custom_context, get_custom_context @@ -77,6 +77,7 @@ def version() -> str: "Environment", "FixedRate", "Image", + "ImageBuild", "Link", "Neuron", "PodTemplate", diff --git a/src/flyte/_build.py b/src/flyte/_build.py index 0aa0654ed..03b0b9e7e 100644 --- a/src/flyte/_build.py +++ b/src/flyte/_build.py @@ -1,26 +1,67 @@ from __future__ import annotations +from dataclasses import dataclass +from typing import TYPE_CHECKING, Optional + from flyte.syncify import syncify from ._image import Image +if TYPE_CHECKING: + from flyte import remote + + +@dataclass +class ImageBuild: + """ + Result of an image build operation. + + Attributes: + uri: The fully qualified image URI. None if the build was started asynchronously + and hasn't completed yet. + remote_run: The Run object that kicked off an image build job when using the remote + builder. None when using the local builder. + """ + + uri: str | None + remote_run: Optional["remote.Run"] + @syncify -async def build(image: Image) -> str: +async def build( + image: Image, + dry_run: bool = False, + force: bool = False, + wait: bool = True, +) -> ImageBuild: """ Build an image. The existing async context will be used. + Args: + image: The image(s) to build. + dry_run: Tell the builder to not actually build. Different builders will have different behaviors. + force: Skip the existence check. Normally if the image already exists we won't build it. + wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will + run in the background. + Returns: + An ImageBuild object containing the image URI and optionally the remote run that kicked off the build. + Example: ``` import flyte image = flyte.Image("example_image") if __name__ == "__main__": - asyncio.run(flyte.build.aio(image)) + result = asyncio.run(flyte.build.aio(image)) + print(result.uri) ``` :param image: The image(s) to build. - :return: The image URI. + :param dry_run: Tell the builder to not actually build. Different builders will have different behaviors. + :param force: Skip the existence check. Normally if the image already exists we won't build it. + :param wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will + run in the background. + :return: An ImageBuild object with the image URI and remote run (if applicable). """ from flyte._internal.imagebuild.image_builder import ImageBuildEngine - return await ImageBuildEngine.build(image) + return await ImageBuildEngine.build(image, dry_run=dry_run, force=force, wait=wait) diff --git a/src/flyte/_deploy.py b/src/flyte/_deploy.py index e7b3c06e2..a2eac3b14 100644 --- a/src/flyte/_deploy.py +++ b/src/flyte/_deploy.py @@ -343,12 +343,14 @@ def _update_interface_inputs_and_outputs_docstring( async def _build_image_bg(env_name: str, image: Image) -> Tuple[str, str]: """ - Build the image in the background and return the environment name and the built image. + Build the image in the background and return the environment name and the built image URI. """ from ._build import build logger.info(f"Building image {image.name} for environment {env_name}") - return env_name, await build.aio(image) + result = await build.aio(image) + assert result.uri is not None, "Image build result URI is None, make sure to wait for the build to complete" + return env_name, result.uri async def _build_images(deployment: DeploymentPlan, image_refs: Dict[str, str] | None = None) -> ImageCache: diff --git a/src/flyte/_internal/imagebuild/docker_builder.py b/src/flyte/_internal/imagebuild/docker_builder.py index 95d7878bc..d318aab8b 100644 --- a/src/flyte/_internal/imagebuild/docker_builder.py +++ b/src/flyte/_internal/imagebuild/docker_builder.py @@ -1,4 +1,3 @@ -import asyncio import os import shutil import subprocess @@ -6,12 +5,13 @@ import typing from pathlib import Path, PurePath from string import Template -from typing import ClassVar, Optional, Protocol, cast +from typing import TYPE_CHECKING, ClassVar, Optional, Protocol, cast import aiofiles import click from flyte import Secret + from flyte._code_bundle._ignore import STANDARD_IGNORE_PATTERNS from flyte._image import ( AptPackages, @@ -45,6 +45,10 @@ get_uv_editable_install_mounts, ) from flyte._logging import logger +from flyte._utils.asyncify import run_sync_with_loop + +if TYPE_CHECKING: + from flyte._build import ImageBuild _F_IMG_ID = "_F_IMG_ID" FLYTE_DOCKER_BUILDER_CACHE_FROM = "FLYTE_DOCKER_BUILDER_CACHE_FROM" @@ -558,22 +562,26 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: # Can get a public token for docker.io but ghcr requires a pat, so harder to get the manifest anonymously return [LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker] - async def build_image(self, image: Image, dry_run: bool = False) -> str: + async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild": + from flyte._build import ImageBuild + if image.dockerfile: # If a dockerfile is provided, use it directly - return await self._build_from_dockerfile(image, push=True) + uri = await self._build_from_dockerfile(image, push=True, wait=wait) + return ImageBuild(uri=uri, remote_run=None) if len(image._layers) == 0: logger.warning("No layers to build, returning the image URI as is.") - return image.uri + return ImageBuild(uri=image.uri, remote_run=None) - return await self._build_image( + uri = await self._build_image( image, push=True, dry_run=dry_run, ) + return ImageBuild(uri=uri, remote_run=None) - async def _build_from_dockerfile(self, image: Image, push: bool) -> str: + async def _build_from_dockerfile(self, image: Image, push: bool, wait: bool = True) -> str: """ Build the image from a provided Dockerfile. """ @@ -606,7 +614,10 @@ async def _build_from_dockerfile(self, image: Image, push: bool) -> str: logger.debug(f"Build command: {concat_command}") click.secho(f"Run command: {concat_command} ", fg="blue") - await asyncio.to_thread(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True) + if wait: + await run_sync_with_loop(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True) + else: + await run_sync_with_loop(subprocess.Popen, command, cwd=str(cast(Path, image.dockerfile).cwd())) return image.uri @@ -615,14 +626,14 @@ async def _ensure_buildx_builder(): """Ensure there is a docker buildx builder called flyte""" # Check if buildx is available try: - await asyncio.to_thread( + await run_sync_with_loop( subprocess.run, ["docker", "buildx", "version"], check=True, stdout=subprocess.DEVNULL ) except subprocess.CalledProcessError: raise RuntimeError("Docker buildx is not available. Make sure BuildKit is installed and enabled.") # List builders - result = await asyncio.to_thread( + result = await run_sync_with_loop( subprocess.run, ["docker", "buildx", "ls"], capture_output=True, text=True, check=True ) builders = result.stdout @@ -631,7 +642,7 @@ async def _ensure_buildx_builder(): if DockerImageBuilder._builder_name not in builders: # No default builder found, create one logger.info("No buildx builder found, creating one...") - await asyncio.to_thread( + await run_sync_with_loop( subprocess.run, [ "docker", @@ -647,7 +658,7 @@ async def _ensure_buildx_builder(): else: logger.info("Buildx builder already exists.") - async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False) -> str: + async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False, wait: bool = True) -> str: """ if default image (only base image and locked), raise an error, don't have a dockerfile if dockerfile, just build @@ -729,7 +740,10 @@ async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = click.secho(f"Run command: {concat_command} ", fg="blue") try: - await asyncio.to_thread(subprocess.run, command, check=True) + if wait: + await run_sync_with_loop(subprocess.run, command, check=True) + else: + await run_sync_with_loop(subprocess.Popen, command) except subprocess.CalledProcessError as e: logger.error(f"Failed to build image: {e}") raise RuntimeError(f"Failed to build image: {e}") diff --git a/src/flyte/_internal/imagebuild/image_builder.py b/src/flyte/_internal/imagebuild/image_builder.py index 11c19a07b..dad8f00e1 100644 --- a/src/flyte/_internal/imagebuild/image_builder.py +++ b/src/flyte/_internal/imagebuild/image_builder.py @@ -4,7 +4,7 @@ import json import typing from importlib.metadata import entry_points -from typing import ClassVar, Dict, Optional, Tuple +from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple from async_lru import alru_cache from pydantic import BaseModel @@ -14,9 +14,12 @@ from flyte._initialize import _get_init_config from flyte._logging import logger +if TYPE_CHECKING: + from flyte._build import ImageBuild + class ImageBuilder(Protocol): - async def build_image(self, image: Image, dry_run: bool) -> str: ... + async def build_image(self, image: Image, dry_run: bool, wait: bool = True) -> "ImageBuild": ... def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: """ @@ -182,7 +185,8 @@ async def build( builder: ImageBuildEngine.ImageBuilderType | None = None, dry_run: bool = False, force: bool = False, - ) -> str: + wait: bool = True, + ) -> "ImageBuild": """ Build the image. Images to be tagged with latest will always be built. Otherwise, this engine will check the registry to see if the manifest exists. @@ -191,8 +195,12 @@ async def build( :param builder: :param dry_run: Tell the builder to not actually build. Different builders will have different behaviors. :param force: Skip the existence check. Normally if the image already exists we won't build it. - :return: + :param wait: Wait for the build to finish. If wait is False when using the remote image builder, the function + will return the build image task URL. + :return: An ImageBuild object with the image URI and remote run (if applicable). """ + from flyte._build import ImageBuild + # Always trigger a build if this is a dry run since builder shouldn't really do anything, or a force. image_uri = (await cls.image_exists(image)) or image.uri if force or dry_run or not await cls.image_exists(image): @@ -208,11 +216,11 @@ async def build( img_builder = ImageBuildEngine._get_builder(builder) logger.debug(f"Using `{img_builder}` image builder to build image.") - result = await img_builder.build_image(image, dry_run=dry_run) + result = await img_builder.build_image(image, dry_run=dry_run, wait=wait) return result else: logger.info(f"Image {image_uri} already exists in registry. Skipping build.") - return image_uri + return ImageBuild(uri=image_uri, remote_run=None) @classmethod def _get_builder(cls, builder: ImageBuildEngine.ImageBuilderType | None = "local") -> ImageBuilder: diff --git a/src/flyte/_internal/imagebuild/remote_builder.py b/src/flyte/_internal/imagebuild/remote_builder.py index bf22c3b87..a751b8206 100644 --- a/src/flyte/_internal/imagebuild/remote_builder.py +++ b/src/flyte/_internal/imagebuild/remote_builder.py @@ -48,6 +48,8 @@ if TYPE_CHECKING: from flyteidl2.imagebuilder import definition_pb2 as image_definition_pb2 + from flyte._build import ImageBuild + IMAGE_TASK_NAME = "build-image" IMAGE_TASK_PROJECT = "system" IMAGE_TASK_DOMAIN = "production" @@ -109,7 +111,9 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]: """Return the image checker.""" return [RemoteImageChecker] - async def build_image(self, image: Image, dry_run: bool = False) -> str: + async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild": + from flyte._build import ImageBuild + image_name = f"{image.name}:{image._final_tag}" spec, context = await _validate_configuration(image) @@ -142,11 +146,15 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str: project=cfg.project, domain=cfg.domain, cache_lookup_scope="project-domain" ).run.aio(entity, spec=spec, context=context, target_image=target_image), ) - logger.warning(f"⏳ Waiting for build to finish at: [bold cyan link={run.url}]{run.url}[/bold cyan link]") + logger.warning(f"▶️ Started build at: [bold cyan link={run.url}]{run.url}[/bold cyan link]") + if not wait: + # return the ImageBuild with the run object (uri will be None since build hasn't completed) + return ImageBuild(uri=None, remote_run=run) + + logger.warning("⏳ Waiting for build to finish") await run.wait.aio(quiet=True) run_details = await run.details.aio() - elapsed = str(datetime.now(timezone.utc) - start).split(".")[0] if run_details.action_details.raw_phase == phase_pb2.ACTION_PHASE_SUCCEEDED: @@ -155,7 +163,8 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str: raise flyte.errors.ImageBuildError(f"❌ Build failed in {elapsed} at {run.url}") outputs = await run_details.outputs() - return _get_fully_qualified_image_name(outputs) + uri = _get_fully_qualified_image_name(outputs) + return ImageBuild(uri=uri, remote_run=run) async def _validate_configuration(image: Image) -> Tuple[str, Optional[str]]: diff --git a/tests/flyte/imagebuild/test_image_build_engine.py b/tests/flyte/imagebuild/test_image_build_engine.py index e5aa006ef..3b45a22bb 100644 --- a/tests/flyte/imagebuild/test_image_build_engine.py +++ b/tests/flyte/imagebuild/test_image_build_engine.py @@ -1,6 +1,7 @@ import mock import pytest +from flyte._build import ImageBuild from flyte._image import Image from flyte._internal.imagebuild.image_builder import ( DockerAPIImageChecker, @@ -33,11 +34,14 @@ async def test_cached(mock_checker_cli, mock_checker_api): async def test_cached_2(mock_image_exists, mock_get_builder): mock_image_exists.return_value = False mock_builder = mock.AsyncMock() - mock_builder.build_image.return_value = "docker.io/test-image:v1.0" + mock_builder.build_image.return_value = ImageBuild(uri="docker.io/test-image:v1.0", remote_run=None) mock_get_builder.return_value = mock_builder img = Image.from_debian_base() - await ImageBuildEngine.build(image=img) + result = await ImageBuildEngine.build(image=img) + assert isinstance(result, ImageBuild) + assert result.uri == "docker.io/test-image:v1.0" + assert result.remote_run is None await ImageBuildEngine.build(image=img) mock_builder.build_image.assert_called_once() diff --git a/tests/flyte/test_int_image_builder.py b/tests/flyte/test_int_image_builder.py index ccdc64cd8..5fa2d2c66 100644 --- a/tests/flyte/test_int_image_builder.py +++ b/tests/flyte/test_int_image_builder.py @@ -1,5 +1,6 @@ import pytest +from flyte._build import ImageBuild from flyte._image import Image from flyte._internal.imagebuild.image_builder import ImageBuildEngine @@ -8,7 +9,11 @@ @pytest.mark.asyncio async def test_real_build(): default_image = Image.from_debian_base() - await ImageBuildEngine.build(default_image, force=True) + result = await ImageBuildEngine.build(default_image, force=True) + assert isinstance(result, ImageBuild) + assert result.uri is not None + # Local builder doesn't create a remote run + assert result.remote_run is None # Can't figure out how to run this locally... getting github auth error. @@ -17,7 +22,9 @@ async def test_real_build(): @pytest.mark.asyncio async def test_real_build_copied(): default_image = Image.from_debian_base(registry="ghcr.io/flyteorg", name="flyte-example") - await ImageBuildEngine.build(default_image, force=True) + result = await ImageBuildEngine.build(default_image, force=True) + assert isinstance(result, ImageBuild) + assert result.uri is not None def test_real_build_copiedfsaf():