Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion examples/advanced/fractal.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ async def tree(max_depth: int = 3, n_children: int = 2, index: int = 1, depth: i
async def main_builder():
from flyte._internal.imagebuild.image_builder import ImageBuildEngine

return await ImageBuildEngine.build(env.image)
result = await ImageBuildEngine.build(env.image)
print(f"Built image URI: {result.uri}")
return result


async def main():
Expand Down
8 changes: 4 additions & 4 deletions examples/image/base_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
image = (
Image.from_debian_base(install_flyte=False)
.with_apt_packages("vim", "wget")
.with_pip_packages("mypy", pre=True)
.with_pip_packages("mypy", "httpx", pre=True)
.with_env_vars({"hello": "world1"})
.with_dockerignore(Path(__file__).parent / ".dockerignore")
.with_local_v2()
Expand All @@ -22,6 +22,6 @@ async def t1(data: str = "hello") -> str:

if __name__ == "__main__":
flyte.init_from_config()
run = flyte.run(t1, data="world")
print(run.name)
print(run.url)
result = flyte.build(image, force=True, wait=False)
print(f"URI: {result.uri}")
print(f"Remote run: {result.remote_run}")
7 changes: 6 additions & 1 deletion examples/image/custom_builder/src/my_builder/my_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
"""Return the image checker."""
return [MyImageChecker]

async def build_image(self, image: Image, dry_run: bool = False) -> str:
async def build_image(
self,
image: Image,
dry_run: bool = False,
wait: bool = True,
) -> str:
print("Building image locally...")
return image.uri
3 changes: 2 additions & 1 deletion src/flyte/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import sys

from ._build import build
from ._build import ImageBuild, build
from ._cache import Cache, CachePolicy, CacheRequest
from ._context import ctx
from ._custom_context import custom_context, get_custom_context
Expand Down Expand Up @@ -77,6 +77,7 @@ def version() -> str:
"Environment",
"FixedRate",
"Image",
"ImageBuild",
"Link",
"Neuron",
"PodTemplate",
Expand Down
49 changes: 45 additions & 4 deletions src/flyte/_build.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,67 @@
from __future__ import annotations

from dataclasses import dataclass
from typing import TYPE_CHECKING, Optional

from flyte.syncify import syncify

from ._image import Image

if TYPE_CHECKING:
from flyte import remote


@dataclass
class ImageBuild:
"""
Result of an image build operation.

Attributes:
uri: The fully qualified image URI. None if the build was started asynchronously
and hasn't completed yet.
remote_run: The Run object that kicked off an image build job when using the remote
builder. None when using the local builder.
"""

uri: str | None
remote_run: Optional["remote.Run"]


@syncify
async def build(image: Image) -> str:
async def build(
image: Image,
dry_run: bool = False,
force: bool = False,
wait: bool = True,
) -> ImageBuild:
"""
Build an image. The existing async context will be used.

Args:
image: The image(s) to build.
dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
force: Skip the existence check. Normally if the image already exists we won't build it.
wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will
run in the background.
Returns:
An ImageBuild object containing the image URI and optionally the remote run that kicked off the build.

Example:
```
import flyte
image = flyte.Image("example_image")
if __name__ == "__main__":
asyncio.run(flyte.build.aio(image))
result = asyncio.run(flyte.build.aio(image))
print(result.uri)
```

:param image: The image(s) to build.
:return: The image URI.
:param dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
:param force: Skip the existence check. Normally if the image already exists we won't build it.
:param wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will
run in the background.
:return: An ImageBuild object with the image URI and remote run (if applicable).
"""
from flyte._internal.imagebuild.image_builder import ImageBuildEngine

return await ImageBuildEngine.build(image)
return await ImageBuildEngine.build(image, dry_run=dry_run, force=force, wait=wait)
6 changes: 4 additions & 2 deletions src/flyte/_deploy.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,12 +343,14 @@ def _update_interface_inputs_and_outputs_docstring(

async def _build_image_bg(env_name: str, image: Image) -> Tuple[str, str]:
"""
Build the image in the background and return the environment name and the built image.
Build the image in the background and return the environment name and the built image URI.
"""
from ._build import build

logger.info(f"Building image {image.name} for environment {env_name}")
return env_name, await build.aio(image)
result = await build.aio(image)
assert result.uri is not None, "Image build result URI is None, make sure to wait for the build to complete"
return env_name, result.uri


async def _build_images(deployment: DeploymentPlan, image_refs: Dict[str, str] | None = None) -> ImageCache:
Expand Down
40 changes: 27 additions & 13 deletions src/flyte/_internal/imagebuild/docker_builder.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import asyncio
import os
import shutil
import subprocess
import tempfile
import typing
from pathlib import Path, PurePath
from string import Template
from typing import ClassVar, Optional, Protocol, cast
from typing import TYPE_CHECKING, ClassVar, Optional, Protocol, cast

import aiofiles
import click

from flyte import Secret

from flyte._code_bundle._ignore import STANDARD_IGNORE_PATTERNS
from flyte._image import (
AptPackages,
Expand Down Expand Up @@ -45,6 +45,10 @@
get_uv_editable_install_mounts,
)
from flyte._logging import logger
from flyte._utils.asyncify import run_sync_with_loop

if TYPE_CHECKING:
from flyte._build import ImageBuild

_F_IMG_ID = "_F_IMG_ID"
FLYTE_DOCKER_BUILDER_CACHE_FROM = "FLYTE_DOCKER_BUILDER_CACHE_FROM"
Expand Down Expand Up @@ -558,22 +562,26 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
# Can get a public token for docker.io but ghcr requires a pat, so harder to get the manifest anonymously
return [LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker]

async def build_image(self, image: Image, dry_run: bool = False) -> str:
async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild":
from flyte._build import ImageBuild

if image.dockerfile:
# If a dockerfile is provided, use it directly
return await self._build_from_dockerfile(image, push=True)
uri = await self._build_from_dockerfile(image, push=True, wait=wait)
return ImageBuild(uri=uri, remote_run=None)

if len(image._layers) == 0:
logger.warning("No layers to build, returning the image URI as is.")
return image.uri
return ImageBuild(uri=image.uri, remote_run=None)

return await self._build_image(
uri = await self._build_image(
image,
push=True,
dry_run=dry_run,
)
return ImageBuild(uri=uri, remote_run=None)

async def _build_from_dockerfile(self, image: Image, push: bool) -> str:
async def _build_from_dockerfile(self, image: Image, push: bool, wait: bool = True) -> str:
"""
Build the image from a provided Dockerfile.
"""
Expand Down Expand Up @@ -606,7 +614,10 @@ async def _build_from_dockerfile(self, image: Image, push: bool) -> str:
logger.debug(f"Build command: {concat_command}")
click.secho(f"Run command: {concat_command} ", fg="blue")

await asyncio.to_thread(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True)
if wait:
await run_sync_with_loop(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True)
else:
await run_sync_with_loop(subprocess.Popen, command, cwd=str(cast(Path, image.dockerfile).cwd()))

return image.uri

Expand All @@ -615,14 +626,14 @@ async def _ensure_buildx_builder():
"""Ensure there is a docker buildx builder called flyte"""
# Check if buildx is available
try:
await asyncio.to_thread(
await run_sync_with_loop(
subprocess.run, ["docker", "buildx", "version"], check=True, stdout=subprocess.DEVNULL
)
except subprocess.CalledProcessError:
raise RuntimeError("Docker buildx is not available. Make sure BuildKit is installed and enabled.")

# List builders
result = await asyncio.to_thread(
result = await run_sync_with_loop(
subprocess.run, ["docker", "buildx", "ls"], capture_output=True, text=True, check=True
)
builders = result.stdout
Expand All @@ -631,7 +642,7 @@ async def _ensure_buildx_builder():
if DockerImageBuilder._builder_name not in builders:
# No default builder found, create one
logger.info("No buildx builder found, creating one...")
await asyncio.to_thread(
await run_sync_with_loop(
subprocess.run,
[
"docker",
Expand All @@ -647,7 +658,7 @@ async def _ensure_buildx_builder():
else:
logger.info("Buildx builder already exists.")

async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False) -> str:
async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False, wait: bool = True) -> str:
"""
if default image (only base image and locked), raise an error, don't have a dockerfile
if dockerfile, just build
Expand Down Expand Up @@ -729,7 +740,10 @@ async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool =
click.secho(f"Run command: {concat_command} ", fg="blue")

try:
await asyncio.to_thread(subprocess.run, command, check=True)
if wait:
await run_sync_with_loop(subprocess.run, command, check=True)
else:
await run_sync_with_loop(subprocess.Popen, command)
except subprocess.CalledProcessError as e:
logger.error(f"Failed to build image: {e}")
raise RuntimeError(f"Failed to build image: {e}")
Expand Down
20 changes: 14 additions & 6 deletions src/flyte/_internal/imagebuild/image_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import json
import typing
from importlib.metadata import entry_points
from typing import ClassVar, Dict, Optional, Tuple
from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple

from async_lru import alru_cache
from pydantic import BaseModel
Expand All @@ -14,9 +14,12 @@
from flyte._initialize import _get_init_config
from flyte._logging import logger

if TYPE_CHECKING:
from flyte._build import ImageBuild


class ImageBuilder(Protocol):
async def build_image(self, image: Image, dry_run: bool) -> str: ...
async def build_image(self, image: Image, dry_run: bool, wait: bool = True) -> "ImageBuild": ...

def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
"""
Expand Down Expand Up @@ -182,7 +185,8 @@ async def build(
builder: ImageBuildEngine.ImageBuilderType | None = None,
dry_run: bool = False,
force: bool = False,
) -> str:
wait: bool = True,
) -> "ImageBuild":
"""
Build the image. Images to be tagged with latest will always be built. Otherwise, this engine will check the
registry to see if the manifest exists.
Expand All @@ -191,8 +195,12 @@ async def build(
:param builder:
:param dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
:param force: Skip the existence check. Normally if the image already exists we won't build it.
:return:
:param wait: Wait for the build to finish. If wait is False when using the remote image builder, the function
will return the build image task URL.
:return: An ImageBuild object with the image URI and remote run (if applicable).
"""
from flyte._build import ImageBuild

# Always trigger a build if this is a dry run since builder shouldn't really do anything, or a force.
image_uri = (await cls.image_exists(image)) or image.uri
if force or dry_run or not await cls.image_exists(image):
Expand All @@ -208,11 +216,11 @@ async def build(
img_builder = ImageBuildEngine._get_builder(builder)
logger.debug(f"Using `{img_builder}` image builder to build image.")

result = await img_builder.build_image(image, dry_run=dry_run)
result = await img_builder.build_image(image, dry_run=dry_run, wait=wait)
return result
else:
logger.info(f"Image {image_uri} already exists in registry. Skipping build.")
return image_uri
return ImageBuild(uri=image_uri, remote_run=None)

@classmethod
def _get_builder(cls, builder: ImageBuildEngine.ImageBuilderType | None = "local") -> ImageBuilder:
Expand Down
17 changes: 13 additions & 4 deletions src/flyte/_internal/imagebuild/remote_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
if TYPE_CHECKING:
from flyteidl2.imagebuilder import definition_pb2 as image_definition_pb2

from flyte._build import ImageBuild

IMAGE_TASK_NAME = "build-image"
IMAGE_TASK_PROJECT = "system"
IMAGE_TASK_DOMAIN = "production"
Expand Down Expand Up @@ -109,7 +111,9 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
"""Return the image checker."""
return [RemoteImageChecker]

async def build_image(self, image: Image, dry_run: bool = False) -> str:
async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild":
from flyte._build import ImageBuild

image_name = f"{image.name}:{image._final_tag}"
spec, context = await _validate_configuration(image)

Expand Down Expand Up @@ -142,11 +146,15 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str:
project=cfg.project, domain=cfg.domain, cache_lookup_scope="project-domain"
).run.aio(entity, spec=spec, context=context, target_image=target_image),
)
logger.warning(f"⏳ Waiting for build to finish at: [bold cyan link={run.url}]{run.url}[/bold cyan link]")

logger.warning(f"▶️ Started build at: [bold cyan link={run.url}]{run.url}[/bold cyan link]")
if not wait:
# return the ImageBuild with the run object (uri will be None since build hasn't completed)
return ImageBuild(uri=None, remote_run=run)

logger.warning("⏳ Waiting for build to finish")
await run.wait.aio(quiet=True)
run_details = await run.details.aio()

elapsed = str(datetime.now(timezone.utc) - start).split(".")[0]

if run_details.action_details.raw_phase == phase_pb2.ACTION_PHASE_SUCCEEDED:
Expand All @@ -155,7 +163,8 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str:
raise flyte.errors.ImageBuildError(f"❌ Build failed in {elapsed} at {run.url}")

outputs = await run_details.outputs()
return _get_fully_qualified_image_name(outputs)
uri = _get_fully_qualified_image_name(outputs)
return ImageBuild(uri=uri, remote_run=run)


async def _validate_configuration(image: Image) -> Tuple[str, Optional[str]]:
Expand Down
Loading
Loading