Skip to content
238 changes: 187 additions & 51 deletions src/image.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@

import constants
import logging
import json
import subprocess

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)
# NOTE(review): setLevel is called twice in a row; the later call wins, so the
# effective level is DEBUG. Confirm whether INFO or DEBUG is intended and drop
# the other call.
LOGGER.setLevel(logging.INFO)
LOGGER.setLevel(logging.DEBUG)


class DockerImage:
Expand Down Expand Up @@ -173,10 +173,14 @@ def build(self):
# Conduct some preprocessing before building the image
self.update_pre_build_configuration()

# Start building the image
with open(self.context.context_path, "rb") as context_file:
self.docker_build(fileobj=context_file, custom_context=True)
self.context.remove()
# Start building the image with Buildx
build_start_time = datetime.now()
self.docker_build(context_path=self.context.context_path, custom_context=True)
build_end_time = datetime.now()
duration_seconds = (build_end_time - build_start_time).total_seconds()
LOGGER.info(f"Build duration: {duration_seconds:.2f} seconds")

self.context.remove()

if self.build_status != constants.SUCCESS:
LOGGER.info(f"Exiting with image build status {self.build_status} without image check.")
Expand All @@ -193,64 +197,196 @@ def build(self):
# This return is necessary. Otherwise FORMATTER fails while displaying the status.
return self.build_status

def docker_build(self, context_path, custom_context=False):
    """
    Build the image, choosing the build backend from the image type.

    vLLM images (as decided by ``_is_vllm_image``) are built with Docker Buildx;
    every other image goes through the legacy Docker API client.

    :param context_path: str, Path to build context
    :param custom_context: bool, Forwarded unchanged to the selected backend (default: False)
    :return: int, Build status returned by the selected backend
    """
    # Buildx is deliberately limited to vLLM images for now: this is a phased
    # rollout that minimizes potential impact on the build pipeline while the
    # new Buildx system is being validated.
    if not self._is_vllm_image():
        LOGGER.info(f"Using legacy Docker API for non-vLLM image: {self.repository}:{self.tag}")
        return self._legacy_docker_build(context_path, custom_context)

    LOGGER.info(f"Using Buildx for vLLM image: {self.repository}:{self.tag}")
    return self._buildx_build(context_path, custom_context)


line_counter = 0
line_interval = 50
for line in self.client.build(
fileobj=fileobj,
path=self.dockerfile,
custom_context=custom_context,
rm=True,
decode=True,
tag=self.ecr_url,
buildargs=self.build_args,
labels=self.labels,
target=self.target,
):
# print the log line during build for every line_interval lines for debugging
if line_counter % line_interval == 0:
LOGGER.debug(line)
line_counter += 1
def _is_vllm_image(self):
"""
Determine if current image is a vLLM image

if line.get("error") is not None:
response.append(line["error"])
self.log.append(response)
self.build_status = constants.FAIL
self.summary["status"] = constants.STATUS_MESSAGE[self.build_status]
self.summary["end_time"] = datetime.now()
:return: bool, True if this is a vLLM image
"""
return (
self.info.get("framework") == "vllm"
or "vllm" in self.repository.lower()
or "vllm" in str(self.info.get("name", "")).lower()
)

LOGGER.info(f"Docker Build Logs: \n {self.get_tail_logs_in_pretty_format(100)}")
LOGGER.error("ERROR during Docker BUILD")
LOGGER.error(
f"Error message received for {self.dockerfile} while docker build: {line}"
)
def _buildx_build(self, context_path, custom_context=False):
    """
    Build the image with the Docker Buildx CLI, streaming its output in real time.

    The command always exports an inline cache (``--cache-to type=inline``). When
    ``self.additional_tags`` is non-empty, its shortest tag is used as a registry
    cache source (``--cache-from``); otherwise the build proceeds without one.

    Any failure (non-zero exit code, unreadable context tarball, failure to launch
    the CLI) is reported through the returned status rather than raised.

    :param context_path: str, Path to build context (the context tarball when
        custom_context is True)
    :param custom_context: bool, When True the file at context_path is streamed to
        Buildx on stdin ("-") instead of passing the path (default: False)
    :return: int, Build status (constants.SUCCESS or constants.FAIL)
    """
    response = [f"Starting Buildx Process for {self.repository}:{self.tag}"]
    LOGGER.info(f"Starting Buildx Process for {self.repository}:{self.tag}")

    cmd = [
        "docker",
        "buildx",
        "build",
        "-t",
        self.ecr_url,
        "--progress=plain",  # Real-time log streaming
    ]

    # Tolerate unset build args / labels, which the legacy API client also accepts.
    for arg_name, arg_value in (self.build_args or {}).items():
        cmd.extend(["--build-arg", f"{arg_name}={arg_value}"])

    for label_name, label_value in (self.labels or {}).items():
        cmd.extend(["--label", f"{label_name}={label_value}"])

    if self.target:
        cmd.extend(["--target", self.target])

    # Always use inline cache-to for maximum caching
    cmd.extend(["--cache-to", "type=inline"])

    # Use shortest tag from additional_tags as a suitable cache source.
    # default=None keeps an empty tag list from raising ValueError, so the
    # "no cache" branch below is actually reachable.
    latest_tag = min(self.additional_tags or [], key=len, default=None)

    if latest_tag:
        latest_image_uri = f"{self.repository}:{latest_tag}"
        LOGGER.info(f"Using cache from registry: {latest_image_uri}")
        cmd.extend(["--cache-from", f"type=registry,ref={latest_image_uri}"])
    else:
        LOGGER.info("No suitable cache source found. Proceeding without registry cache")

    # "-" tells Buildx to read the context tarball from stdin.
    cmd.append("-" if custom_context else context_path)

    returncode = None  # stays None when the build could not be run at all
    try:
        # Opened inside the try so a missing/unreadable tarball becomes a FAIL
        # status instead of an uncaught exception.
        context_tarball = open(context_path, "rb") if custom_context else None
        try:
            # The context manager closes the stdout pipe and waits for the
            # process even if streaming is interrupted.
            with subprocess.Popen(
                cmd,
                stdin=context_tarball,
                stdout=subprocess.PIPE,
                stderr=subprocess.STDOUT,
                universal_newlines=True,
                bufsize=1,
            ) as process:
                # Stream output in real-time
                for raw_line in process.stdout:
                    line = raw_line.rstrip()
                    if line:
                        response.append(line)
                        LOGGER.info(line)
            returncode = process.returncode
        finally:
            if context_tarball is not None:
                context_tarball.close()
    except Exception as e:
        response.append(f"Buildx error: {str(e)}")
        LOGGER.error(f"Buildx exception: {str(e)}")

    if returncode == 0:
        self.build_status = constants.SUCCESS
        LOGGER.info(f"Completed Buildx for {self.repository}:{self.tag}")
    else:
        if returncode is not None:
            response.append(f"Buildx exited with return code {returncode}")
            LOGGER.error(f"Buildx failed for {self.repository}:{self.tag}")
        self.build_status = constants.FAIL
        # Mirror the legacy build's error path so the summary is populated on failure.
        self.summary["status"] = constants.STATUS_MESSAGE[self.build_status]
        self.summary["end_time"] = datetime.now()

    self.log.append(response)
    return self.build_status


LOGGER.info(f"DOCKER BUILD LOGS: \n{self.get_tail_logs_in_pretty_format()}")
LOGGER.info(f"Completed Build for {self.repository}:{self.tag}")
def _legacy_docker_build(self, context_path, custom_context=False):
    """
    Uses legacy Docker API Client to build the image (for non-vLLM images).

    Build output lines are collected and appended to ``self.log``. Failures,
    whether an "error" line from the build stream or an exception raised while
    opening the context or talking to the client, are reported through the
    returned status, and ``self.summary`` is updated with the failure status
    and end time.

    :param context_path: str, Path to build context (the context tarball when
        custom_context is True)
    :param custom_context: bool, When True the file at context_path is opened and
        passed to the client as the build context (default: False)
    :return: int, Build Status
    """
    response = [f"Starting Legacy Docker Build Process for {self.repository}:{self.tag}"]
    LOGGER.info(f"Starting Legacy Docker Build Process for {self.repository}:{self.tag}")

    line_interval = 50
    fileobj = None
    # Tracks whether `response` is already in self.log so the exception path
    # never appends it a second time.
    response_logged = False

    try:
        # Open context tarball for legacy API; done inside the try so an
        # unreadable tarball is reported as a failed build.
        if custom_context:
            fileobj = open(context_path, "rb")

        build_stream = self.client.build(
            fileobj=fileobj,
            path=self.dockerfile if not custom_context else None,
            custom_context=custom_context,
            rm=True,
            decode=True,
            tag=self.ecr_url,
            buildargs=self.build_args,
            labels=self.labels,
            target=self.target,
        )
        for line_counter, line in enumerate(build_stream):
            # print the log line during build for every line_interval lines for debugging
            if line_counter % line_interval == 0:
                LOGGER.debug(line)

            if line.get("error") is not None:
                response.append(line["error"])
                self.log.append(response)
                response_logged = True
                self.build_status = constants.FAIL
                self.summary["status"] = constants.STATUS_MESSAGE[self.build_status]
                self.summary["end_time"] = datetime.now()

                LOGGER.info(f"Docker Build Logs: \n {self.get_tail_logs_in_pretty_format(100)}")
                LOGGER.error("ERROR during Docker BUILD")
                LOGGER.error(
                    f"Error message received for {self.dockerfile} while docker build: {line}"
                )

                return self.build_status

            if line.get("stream") is not None:
                response.append(line["stream"])
            elif line.get("status") is not None:
                response.append(line["status"])
            else:
                response.append(str(line))

        self.log.append(response)
        response_logged = True

        LOGGER.info(f"DOCKER BUILD LOGS: \n{self.get_tail_logs_in_pretty_format()}")
        LOGGER.info(f"Completed Legacy Build for {self.repository}:{self.tag}")

        self.build_status = constants.SUCCESS
        return self.build_status

    except Exception as e:
        # `response` is the same list object that may already sit in self.log,
        # so the message is visible there either way.
        response.append(f"Legacy Docker build error: {str(e)}")
        if not response_logged:
            self.log.append(response)
        self.build_status = constants.FAIL
        # Keep the summary consistent with the error-line path above.
        self.summary["status"] = constants.STATUS_MESSAGE[self.build_status]
        self.summary["end_time"] = datetime.now()
        LOGGER.error(f"Legacy Docker build exception: {str(e)}")
        return self.build_status
    finally:
        if fileobj is not None:
            fileobj.close()


def image_size_check(self):
"""
Expand Down