Skip to content

Commit 0fa78ad

Browse files
authored
Add wait in image builder protocol (#567)
This adds an option to `flyte.build` to not wait for the image build to complete. Note: when the builder is `remote` and `wait=False`, the `build` method will return the url to the image build task instead of the fully qualified URI --------- Signed-off-by: Niels Bantilan <niels.bantilan@gmail.com>
1 parent 35b1508 commit 0fa78ad

File tree

11 files changed

+133
-40
lines changed

11 files changed

+133
-40
lines changed

examples/advanced/fractal.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,9 @@ async def tree(max_depth: int = 3, n_children: int = 2, index: int = 1, depth: i
3939
async def main_builder():
4040
from flyte._internal.imagebuild.image_builder import ImageBuildEngine
4141

42-
return await ImageBuildEngine.build(env.image)
42+
result = await ImageBuildEngine.build(env.image)
43+
print(f"Built image URI: {result.uri}")
44+
return result
4345

4446

4547
async def main():

examples/image/base_image.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
image = (
77
Image.from_debian_base(install_flyte=False)
88
.with_apt_packages("vim", "wget")
9-
.with_pip_packages("mypy", pre=True)
9+
.with_pip_packages("mypy", "httpx", pre=True)
1010
.with_env_vars({"hello": "world1"})
1111
.with_dockerignore(Path(__file__).parent / ".dockerignore")
1212
.with_local_v2()
@@ -22,6 +22,6 @@ async def t1(data: str = "hello") -> str:
2222

2323
if __name__ == "__main__":
2424
flyte.init_from_config()
25-
run = flyte.run(t1, data="world")
26-
print(run.name)
27-
print(run.url)
25+
result = flyte.build(image, force=True, wait=False)
26+
print(f"URI: {result.uri}")
27+
print(f"Remote run: {result.remote_run}")

examples/image/custom_builder/src/my_builder/my_builder.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,11 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
2020
"""Return the image checker."""
2121
return [MyImageChecker]
2222

23-
async def build_image(self, image: Image, dry_run: bool = False) -> str:
23+
async def build_image(
24+
self,
25+
image: Image,
26+
dry_run: bool = False,
27+
wait: bool = True,
28+
) -> str:
2429
print("Building image locally...")
2530
return image.uri

src/flyte/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
import sys
88

9-
from ._build import build
9+
from ._build import ImageBuild, build
1010
from ._cache import Cache, CachePolicy, CacheRequest
1111
from ._context import ctx
1212
from ._custom_context import custom_context, get_custom_context
@@ -77,6 +77,7 @@ def version() -> str:
7777
"Environment",
7878
"FixedRate",
7979
"Image",
80+
"ImageBuild",
8081
"Link",
8182
"Neuron",
8283
"PodTemplate",

src/flyte/_build.py

Lines changed: 45 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,67 @@
11
from __future__ import annotations
22

3+
from dataclasses import dataclass
4+
from typing import TYPE_CHECKING, Optional
5+
36
from flyte.syncify import syncify
47

58
from ._image import Image
69

10+
if TYPE_CHECKING:
11+
from flyte import remote
12+
13+
14+
@dataclass
15+
class ImageBuild:
16+
"""
17+
Result of an image build operation.
18+
19+
Attributes:
20+
uri: The fully qualified image URI. None if the build was started asynchronously
21+
and hasn't completed yet.
22+
remote_run: The Run object that kicked off an image build job when using the remote
23+
builder. None when using the local builder.
24+
"""
25+
26+
uri: str | None
27+
remote_run: Optional["remote.Run"]
28+
729

830
@syncify
9-
async def build(image: Image) -> str:
31+
async def build(
32+
image: Image,
33+
dry_run: bool = False,
34+
force: bool = False,
35+
wait: bool = True,
36+
) -> ImageBuild:
1037
"""
1138
Build an image. The existing async context will be used.
1239
40+
Args:
41+
image: The image(s) to build.
42+
dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
43+
force: Skip the existence check. Normally if the image already exists we won't build it.
44+
wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will
45+
run in the background.
46+
Returns:
47+
An ImageBuild object containing the image URI and optionally the remote run that kicked off the build.
48+
1349
Example:
1450
```
1551
import flyte
1652
image = flyte.Image("example_image")
1753
if __name__ == "__main__":
18-
asyncio.run(flyte.build.aio(image))
54+
result = asyncio.run(flyte.build.aio(image))
55+
print(result.uri)
1956
```
2057
2158
:param image: The image(s) to build.
22-
:return: The image URI.
59+
:param dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
60+
:param force: Skip the existence check. Normally if the image already exists we won't build it.
61+
:param wait: Wait for the build to finish. If wait is False, the function will return immediately and the build will
62+
run in the background.
63+
:return: An ImageBuild object with the image URI and remote run (if applicable).
2364
"""
2465
from flyte._internal.imagebuild.image_builder import ImageBuildEngine
2566

26-
return await ImageBuildEngine.build(image)
67+
return await ImageBuildEngine.build(image, dry_run=dry_run, force=force, wait=wait)

src/flyte/_deploy.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,12 +343,14 @@ def _update_interface_inputs_and_outputs_docstring(
343343

344344
async def _build_image_bg(env_name: str, image: Image) -> Tuple[str, str]:
345345
"""
346-
Build the image in the background and return the environment name and the built image.
346+
Build the image in the background and return the environment name and the built image URI.
347347
"""
348348
from ._build import build
349349

350350
logger.info(f"Building image {image.name} for environment {env_name}")
351-
return env_name, await build.aio(image)
351+
result = await build.aio(image)
352+
assert result.uri is not None, "Image build result URI is None, make sure to wait for the build to complete"
353+
return env_name, result.uri
352354

353355

354356
async def _build_images(deployment: DeploymentPlan, image_refs: Dict[str, str] | None = None) -> ImageCache:

src/flyte/_internal/imagebuild/docker_builder.py

Lines changed: 27 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
1-
import asyncio
21
import os
32
import shutil
43
import subprocess
54
import tempfile
65
import typing
76
from pathlib import Path, PurePath
87
from string import Template
9-
from typing import ClassVar, Optional, Protocol, cast
8+
from typing import TYPE_CHECKING, ClassVar, Optional, Protocol, cast
109

1110
import aiofiles
1211
import click
1312

1413
from flyte import Secret
14+
1515
from flyte._code_bundle._ignore import STANDARD_IGNORE_PATTERNS
1616
from flyte._image import (
1717
AptPackages,
@@ -45,6 +45,10 @@
4545
get_uv_editable_install_mounts,
4646
)
4747
from flyte._logging import logger
48+
from flyte._utils.asyncify import run_sync_with_loop
49+
50+
if TYPE_CHECKING:
51+
from flyte._build import ImageBuild
4852

4953
_F_IMG_ID = "_F_IMG_ID"
5054
FLYTE_DOCKER_BUILDER_CACHE_FROM = "FLYTE_DOCKER_BUILDER_CACHE_FROM"
@@ -558,22 +562,26 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
558562
# Can get a public token for docker.io but ghcr requires a pat, so harder to get the manifest anonymously
559563
return [LocalDockerCommandImageChecker, LocalPodmanCommandImageChecker, DockerAPIImageChecker]
560564

561-
async def build_image(self, image: Image, dry_run: bool = False) -> str:
565+
async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild":
566+
from flyte._build import ImageBuild
567+
562568
if image.dockerfile:
563569
# If a dockerfile is provided, use it directly
564-
return await self._build_from_dockerfile(image, push=True)
570+
uri = await self._build_from_dockerfile(image, push=True, wait=wait)
571+
return ImageBuild(uri=uri, remote_run=None)
565572

566573
if len(image._layers) == 0:
567574
logger.warning("No layers to build, returning the image URI as is.")
568-
return image.uri
575+
return ImageBuild(uri=image.uri, remote_run=None)
569576

570-
return await self._build_image(
577+
uri = await self._build_image(
571578
image,
572579
push=True,
573580
dry_run=dry_run,
574581
)
582+
return ImageBuild(uri=uri, remote_run=None)
575583

576-
async def _build_from_dockerfile(self, image: Image, push: bool) -> str:
584+
async def _build_from_dockerfile(self, image: Image, push: bool, wait: bool = True) -> str:
577585
"""
578586
Build the image from a provided Dockerfile.
579587
"""
@@ -606,7 +614,10 @@ async def _build_from_dockerfile(self, image: Image, push: bool) -> str:
606614
logger.debug(f"Build command: {concat_command}")
607615
click.secho(f"Run command: {concat_command} ", fg="blue")
608616

609-
await asyncio.to_thread(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True)
617+
if wait:
618+
await run_sync_with_loop(subprocess.run, command, cwd=str(cast(Path, image.dockerfile).cwd()), check=True)
619+
else:
620+
await run_sync_with_loop(subprocess.Popen, command, cwd=str(cast(Path, image.dockerfile).cwd()))
610621

611622
return image.uri
612623

@@ -615,14 +626,14 @@ async def _ensure_buildx_builder():
615626
"""Ensure there is a docker buildx builder called flyte"""
616627
# Check if buildx is available
617628
try:
618-
await asyncio.to_thread(
629+
await run_sync_with_loop(
619630
subprocess.run, ["docker", "buildx", "version"], check=True, stdout=subprocess.DEVNULL
620631
)
621632
except subprocess.CalledProcessError:
622633
raise RuntimeError("Docker buildx is not available. Make sure BuildKit is installed and enabled.")
623634

624635
# List builders
625-
result = await asyncio.to_thread(
636+
result = await run_sync_with_loop(
626637
subprocess.run, ["docker", "buildx", "ls"], capture_output=True, text=True, check=True
627638
)
628639
builders = result.stdout
@@ -631,7 +642,7 @@ async def _ensure_buildx_builder():
631642
if DockerImageBuilder._builder_name not in builders:
632643
# No default builder found, create one
633644
logger.info("No buildx builder found, creating one...")
634-
await asyncio.to_thread(
645+
await run_sync_with_loop(
635646
subprocess.run,
636647
[
637648
"docker",
@@ -647,7 +658,7 @@ async def _ensure_buildx_builder():
647658
else:
648659
logger.info("Buildx builder already exists.")
649660

650-
async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False) -> str:
661+
async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool = False, wait: bool = True) -> str:
651662
"""
652663
if default image (only base image and locked), raise an error, don't have a dockerfile
653664
if dockerfile, just build
@@ -729,7 +740,10 @@ async def _build_image(self, image: Image, *, push: bool = True, dry_run: bool =
729740
click.secho(f"Run command: {concat_command} ", fg="blue")
730741

731742
try:
732-
await asyncio.to_thread(subprocess.run, command, check=True)
743+
if wait:
744+
await run_sync_with_loop(subprocess.run, command, check=True)
745+
else:
746+
await run_sync_with_loop(subprocess.Popen, command)
733747
except subprocess.CalledProcessError as e:
734748
logger.error(f"Failed to build image: {e}")
735749
raise RuntimeError(f"Failed to build image: {e}")

src/flyte/_internal/imagebuild/image_builder.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import json
55
import typing
66
from importlib.metadata import entry_points
7-
from typing import ClassVar, Dict, Optional, Tuple
7+
from typing import TYPE_CHECKING, ClassVar, Dict, Optional, Tuple
88

99
from async_lru import alru_cache
1010
from pydantic import BaseModel
@@ -14,9 +14,12 @@
1414
from flyte._initialize import _get_init_config
1515
from flyte._logging import logger
1616

17+
if TYPE_CHECKING:
18+
from flyte._build import ImageBuild
19+
1720

1821
class ImageBuilder(Protocol):
19-
async def build_image(self, image: Image, dry_run: bool) -> str: ...
22+
async def build_image(self, image: Image, dry_run: bool, wait: bool = True) -> "ImageBuild": ...
2023

2124
def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
2225
"""
@@ -182,7 +185,8 @@ async def build(
182185
builder: ImageBuildEngine.ImageBuilderType | None = None,
183186
dry_run: bool = False,
184187
force: bool = False,
185-
) -> str:
188+
wait: bool = True,
189+
) -> "ImageBuild":
186190
"""
187191
Build the image. Images to be tagged with latest will always be built. Otherwise, this engine will check the
188192
registry to see if the manifest exists.
@@ -191,8 +195,12 @@ async def build(
191195
:param builder:
192196
:param dry_run: Tell the builder to not actually build. Different builders will have different behaviors.
193197
:param force: Skip the existence check. Normally if the image already exists we won't build it.
194-
:return:
198+
:param wait: Wait for the build to finish. If wait is False when using the remote image builder, the function
199+
will return the build image task URL.
200+
:return: An ImageBuild object with the image URI and remote run (if applicable).
195201
"""
202+
from flyte._build import ImageBuild
203+
196204
# Always trigger a build if this is a dry run since builder shouldn't really do anything, or a force.
197205
image_uri = (await cls.image_exists(image)) or image.uri
198206
if force or dry_run or not await cls.image_exists(image):
@@ -208,11 +216,11 @@ async def build(
208216
img_builder = ImageBuildEngine._get_builder(builder)
209217
logger.debug(f"Using `{img_builder}` image builder to build image.")
210218

211-
result = await img_builder.build_image(image, dry_run=dry_run)
219+
result = await img_builder.build_image(image, dry_run=dry_run, wait=wait)
212220
return result
213221
else:
214222
logger.info(f"Image {image_uri} already exists in registry. Skipping build.")
215-
return image_uri
223+
return ImageBuild(uri=image_uri, remote_run=None)
216224

217225
@classmethod
218226
def _get_builder(cls, builder: ImageBuildEngine.ImageBuilderType | None = "local") -> ImageBuilder:

src/flyte/_internal/imagebuild/remote_builder.py

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@
4848
if TYPE_CHECKING:
4949
from flyteidl2.imagebuilder import definition_pb2 as image_definition_pb2
5050

51+
from flyte._build import ImageBuild
52+
5153
IMAGE_TASK_NAME = "build-image"
5254
IMAGE_TASK_PROJECT = "system"
5355
IMAGE_TASK_DOMAIN = "production"
@@ -109,7 +111,9 @@ def get_checkers(self) -> Optional[typing.List[typing.Type[ImageChecker]]]:
109111
"""Return the image checker."""
110112
return [RemoteImageChecker]
111113

112-
async def build_image(self, image: Image, dry_run: bool = False) -> str:
114+
async def build_image(self, image: Image, dry_run: bool = False, wait: bool = True) -> "ImageBuild":
115+
from flyte._build import ImageBuild
116+
113117
image_name = f"{image.name}:{image._final_tag}"
114118
spec, context = await _validate_configuration(image)
115119

@@ -142,11 +146,15 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str:
142146
project=cfg.project, domain=cfg.domain, cache_lookup_scope="project-domain"
143147
).run.aio(entity, spec=spec, context=context, target_image=target_image),
144148
)
145-
logger.warning(f"⏳ Waiting for build to finish at: [bold cyan link={run.url}]{run.url}[/bold cyan link]")
146149

150+
logger.warning(f"▶️ Started build at: [bold cyan link={run.url}]{run.url}[/bold cyan link]")
151+
if not wait:
152+
# return the ImageBuild with the run object (uri will be None since build hasn't completed)
153+
return ImageBuild(uri=None, remote_run=run)
154+
155+
logger.warning("⏳ Waiting for build to finish")
147156
await run.wait.aio(quiet=True)
148157
run_details = await run.details.aio()
149-
150158
elapsed = str(datetime.now(timezone.utc) - start).split(".")[0]
151159

152160
if run_details.action_details.raw_phase == phase_pb2.ACTION_PHASE_SUCCEEDED:
@@ -155,7 +163,8 @@ async def build_image(self, image: Image, dry_run: bool = False) -> str:
155163
raise flyte.errors.ImageBuildError(f"❌ Build failed in {elapsed} at {run.url}")
156164

157165
outputs = await run_details.outputs()
158-
return _get_fully_qualified_image_name(outputs)
166+
uri = _get_fully_qualified_image_name(outputs)
167+
return ImageBuild(uri=uri, remote_run=run)
159168

160169

161170
async def _validate_configuration(image: Image) -> Tuple[str, Optional[str]]:

0 commit comments

Comments
 (0)