
Commit 88037dc

[BEAM-10844] Add experiment option prebuild_sdk_container to prebuild python sdk container with dependencies. (#12727)
1 parent c0bde2b commit 88037dc

File tree

11 files changed: +531, -115 lines changed

sdks/python/apache_beam/examples/wordcount_it_test.py

Lines changed: 7 additions & 0 deletions
@@ -56,6 +56,13 @@ def test_wordcount_it(self):
   def test_wordcount_fnapi_it(self):
     self._run_wordcount_it(wordcount.run, experiment='beam_fn_api')
 
+  @attr('ValidatesContainer')
+  def test_wordcount_it_with_prebuilt_sdk_container(self):
+    self._run_wordcount_it(
+        wordcount.run,
+        experiment='beam_fn_api',
+        prebuild_sdk_container_engine='local_docker')
+
   def _run_wordcount_it(self, run_wordcount, **opts):
     test_pipeline = TestPipeline(is_integration_test=True)
     extra_opts = {}
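
The added test drives the new flag through the existing wordcount harness. For orientation only, here is a hedged sketch of invoking the example directly with the same options; the project, region, bucket, and output values below are placeholders, not part of the commit:

from apache_beam.examples import wordcount

# All GCP-specific values below are placeholders.
wordcount.run([
    '--runner=DataflowRunner',
    '--project=my-project',
    '--region=us-central1',
    '--temp_location=gs://my-bucket/tmp',
    '--output=gs://my-bucket/counts',
    '--experiments=beam_fn_api',
    '--prebuild_sdk_container_engine=local_docker',
])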

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 27 additions & 0 deletions
@@ -1037,6 +1037,33 @@ def _add_argparse_args(cls, parser):
             'staged in the staging area (--staging_location option) and the '
             'workers will install them in same order they were specified on '
             'the command line.'))
+    parser.add_argument(
+        '--prebuild_sdk_container_engine',
+        choices=['local_docker', 'cloud_build'],
+        help=(
+            'Prebuild sdk worker container image before job submission. If '
+            'enabled, SDK invokes the boot sequence in SDK worker '
+            'containers to install all pipeline dependencies in the '
+            'container, and uses the prebuilt image in the pipeline '
+            'environment. This may speed up pipeline execution. To enable, '
+            'select a Docker build engine: local_docker to use '
+            'locally-installed Docker, or cloud_build to use Google Cloud '
+            'Build (requires a GCP project with the Cloud Build API enabled).'))
+    parser.add_argument(
+        '--prebuild_sdk_container_base_image',
+        default=None,
+        help=(
+            'The base image to use when pre-building the sdk container image '
+            'with dependencies. If not specified, the released public apache '
+            'beam python sdk container image corresponding to the sdk '
+            'version is used by default; if a dev sdk is used, the base '
+            'image defaults to the latest released sdk image.'))
+    parser.add_argument(
+        '--docker_registry_push_url',
+        default=None,
+        help=(
+            'Docker registry url to use for tagging and pushing the prebuilt '
+            'sdk worker container image.'))
 
 
 class PortableOptions(PipelineOptions):
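
All three arguments land on SetupOptions (the runner reads them back via options.view_as(SetupOptions) elsewhere in this commit). A minimal sketch of reading them, with a placeholder registry URL:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions

options = PipelineOptions([
    '--prebuild_sdk_container_engine=local_docker',
    '--docker_registry_push_url=gcr.io/my-project',  # placeholder
])
setup_options = options.view_as(SetupOptions)
assert setup_options.prebuild_sdk_container_engine == 'local_docker'
# The base image is optional and defaults to None (released SDK image).
assert setup_options.prebuild_sdk_container_base_image is None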

sdks/python/apache_beam/runners/dataflow/dataflow_runner.py

Lines changed: 13 additions & 4 deletions
@@ -478,10 +478,19 @@ def run_pipeline(self, pipeline, options):
 
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
-    self._default_environment = (
-        environments.DockerEnvironment.from_container_image(
-            apiclient.get_container_image_from_options(options),
-            artifacts=environments.python_sdk_dependencies(options)))
+    if options.view_as(SetupOptions).prebuild_sdk_container_engine:
+      # if prebuild_sdk_container_engine is specified we will build a new sdk
+      # container image with dependencies pre-installed and use that image,
+      # instead of using the inferred default container image.
+      self._default_environment = (
+          environments.DockerEnvironment.from_options(options))
+      options.view_as(WorkerOptions).worker_harness_container_image = (
+          self._default_environment.container_image)
+    else:
+      self._default_environment = (
+          environments.DockerEnvironment.from_container_image(
+              apiclient.get_container_image_from_options(options),
+              artifacts=environments.python_sdk_dependencies(options)))
 
     # This has to be performed before pipeline proto is constructed to make sure
     # that the changes are reflected in the portable job submission path.
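
Condensed, the new branch amounts to the following sketch (names taken from the diff above; this is not a verbatim excerpt, and it assumes the imports already in scope in dataflow_runner.py):

# Sketch only: environments, apiclient, SetupOptions, and WorkerOptions are
# assumed to be imported as in dataflow_runner.py.
def _choose_default_environment(options):
  if options.view_as(SetupOptions).prebuild_sdk_container_engine:
    # Prebuilt path: deriving the environment from the options triggers the
    # container prebuild; workers are then pointed at the new image.
    env = environments.DockerEnvironment.from_options(options)
    options.view_as(WorkerOptions).worker_harness_container_image = (
        env.container_image)
  else:
    # Default path: inferred container image plus staged dependencies.
    env = environments.DockerEnvironment.from_container_image(
        apiclient.get_container_image_from_options(options),
        artifacts=environments.python_sdk_dependencies(options))
  return env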
sdks/python/apache_beam/runners/portability/sdk_container_builder.py

Lines changed: 284 additions & 0 deletions (new file)

@@ -0,0 +1,284 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""SdkContainerImageBuilder builds the portable SDK container with dependencies.

It copies the right boot dependencies, namely: apache beam sdk, python packages
from requirements.txt, python packages from extra_packages.txt, workflow
tarball, into the latest public python sdk container image, and runs the
dependency installation in advance with the boot program in setup-only mode
to build the new image.
"""

from __future__ import absolute_import

import json
import logging
import os
import shutil
import subprocess
import sys
import tarfile
import tempfile
import time
import uuid

from google.protobuf.duration_pb2 import Duration
from google.protobuf.json_format import MessageToJson

from apache_beam import version as beam_version
from apache_beam.internal.gcp.auth import get_service_credentials
from apache_beam.internal.http_client import get_new_http
from apache_beam.io.gcp.internal.clients import storage
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions  # pylint: disable=unused-import
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.portability import common_urns
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.portability.stager import Stager

ARTIFACTS_CONTAINER_DIR = '/opt/apache/beam/artifacts'
ARTIFACTS_MANIFEST_FILE = 'artifacts_info.json'
SDK_CONTAINER_ENTRYPOINT = '/opt/apache/beam/boot'
DOCKERFILE_TEMPLATE = (
    """FROM {base_image}
RUN mkdir -p {workdir}
COPY ./* {workdir}/
RUN {entrypoint} --setup_only --artifacts {workdir}/{manifest_file}
""")

SOURCE_FOLDER = 'source'
_LOGGER = logging.getLogger(__name__)


class SdkContainerImageBuilder(object):
  def __init__(self, options):
    self._options = options
    self._docker_registry_push_url = self._options.view_as(
        SetupOptions).docker_registry_push_url
    version = (
        beam_version.__version__
        if 'dev' not in beam_version.__version__ else 'latest')
    self._base_image = (
        self._options.view_as(SetupOptions).prebuild_sdk_container_base_image or
        'apache/beam_python%s.%s_sdk:%s' %
        (sys.version_info[0], sys.version_info[1], version))
    self._temp_src_dir = None

  def build(self):
    container_image_tag = str(uuid.uuid4())
    container_image_name = os.path.join(
        self._docker_registry_push_url or '',
        'beam_python_prebuilt_sdk:%s' % container_image_tag)
    with tempfile.TemporaryDirectory() as temp_folder:
      self._temp_src_dir = temp_folder
      self.prepare_dependencies()
      self.invoke_docker_build_and_push(container_image_name)

    return container_image_name

  def prepare_dependencies(self):
    with tempfile.TemporaryDirectory() as tmp:
      resources = Stager.create_job_resources(self._options, tmp)
      # make a copy of the staged artifacts into the temp source folder.
      for path, name in resources:
        shutil.copyfile(path, os.path.join(self._temp_src_dir, name))
      with open(os.path.join(self._temp_src_dir, 'Dockerfile'), 'w') as file:
        file.write(
            DOCKERFILE_TEMPLATE.format(
                base_image=self._base_image,
                workdir=ARTIFACTS_CONTAINER_DIR,
                manifest_file=ARTIFACTS_MANIFEST_FILE,
                entrypoint=SDK_CONTAINER_ENTRYPOINT))
      self.generate_artifacts_manifests_json_file(resources, self._temp_src_dir)

  def invoke_docker_build_and_push(self, container_image_name):
    raise NotImplementedError

  @staticmethod
  def generate_artifacts_manifests_json_file(resources, temp_dir):
    infos = []
    for _, name in resources:
      info = beam_runner_api_pb2.ArtifactInformation(
          type_urn=common_urns.StandardArtifacts.Types.FILE.urn,
          type_payload=beam_runner_api_pb2.ArtifactFilePayload(
              path=name).SerializeToString(),
      )
      infos.append(json.dumps(MessageToJson(info)))
    with open(os.path.join(temp_dir, ARTIFACTS_MANIFEST_FILE), 'w') as file:
      file.write('[\n' + ',\n'.join(infos) + '\n]')

  @classmethod
  def build_container_image(cls, pipeline_options: PipelineOptions) -> str:
    setup_options = pipeline_options.view_as(SetupOptions)
    container_build_engine = setup_options.prebuild_sdk_container_engine
    if container_build_engine:
      if container_build_engine == 'local_docker':
        builder = _SdkContainerImageLocalBuilder(
            pipeline_options)  # type: SdkContainerImageBuilder
      elif container_build_engine == 'cloud_build':
        builder = _SdkContainerImageCloudBuilder(pipeline_options)
      else:
        raise ValueError(
            'Only (--prebuild_sdk_container_engine local_docker) and '
            '(--prebuild_sdk_container_engine cloud_build) are supported')
    else:
      raise ValueError('No --prebuild_sdk_container_engine option specified.')
    return builder.build()


class _SdkContainerImageLocalBuilder(SdkContainerImageBuilder):
  """_SdkContainerImageLocalBuilder builds the sdk container image with local
  docker."""
  def invoke_docker_build_and_push(self, container_image_name):
    try:
      _LOGGER.info("Building sdk container, this may take a few minutes...")
      now = time.time()
      subprocess.run(['docker', 'build', '.', '-t', container_image_name],
                     capture_output=True,
                     check=True,
                     cwd=self._temp_src_dir)
    except subprocess.CalledProcessError as err:
      raise RuntimeError(
          'Failed to build sdk container with local docker, '
          'stderr:\n %s.' % err.stderr)
    else:
      _LOGGER.info(
          "Successfully built %s in %.2f seconds" %
          (container_image_name, time.time() - now))

    if self._docker_registry_push_url:
      _LOGGER.info("Pushing prebuilt sdk container...")
      try:
        subprocess.run(['docker', 'push', container_image_name],
                       capture_output=True,
                       check=True)
      except subprocess.CalledProcessError as err:
        raise RuntimeError(
            'Failed to push prebuilt sdk container %s, stderr: \n%s' %
            (container_image_name, err.stderr))
      _LOGGER.info(
          "Successfully pushed %s in %.2f seconds" %
          (container_image_name, time.time() - now))
    else:
      _LOGGER.info(
          "no --docker_registry_push_url option is specified in pipeline "
          "options, specify it if the new image is intended to be "
          "pushed to a registry.")


class _SdkContainerImageCloudBuilder(SdkContainerImageBuilder):
  """_SdkContainerImageCloudBuilder builds the sdk container image with google
  cloud build."""
  def __init__(self, options):
    super().__init__(options)
    self._google_cloud_options = options.view_as(GoogleCloudOptions)
    if self._google_cloud_options.no_auth:
      credentials = None
    else:
      credentials = get_service_credentials()
    self._storage_client = storage.StorageV1(
        url='https://www.googleapis.com/storage/v1',
        credentials=credentials,
        get_credentials=(not self._google_cloud_options.no_auth),
        http=get_new_http(),
        response_encoding='utf8')
    if not self._docker_registry_push_url:
      self._docker_registry_push_url = (
          'gcr.io/%s' % self._google_cloud_options.project)

  def invoke_docker_build_and_push(self, container_image_name):
    project_id = self._google_cloud_options.project
    temp_location = self._google_cloud_options.temp_location
    # The google cloud build service expects all the build source files to be
    # compressed into a tarball.
    tarball_path = os.path.join(self._temp_src_dir, '%s.tgz' % SOURCE_FOLDER)
    self._make_tarfile(tarball_path, self._temp_src_dir)
    _LOGGER.info(
        "Compressed source files for building sdk container at %s" %
        tarball_path)

    container_image_tag = container_image_name.split(':')[-1]
    gcs_location = os.path.join(
        temp_location, '%s-%s.tgz' % (SOURCE_FOLDER, container_image_tag))
    self._upload_to_gcs(tarball_path, gcs_location)

    from google.cloud.devtools import cloudbuild_v1
    client = cloudbuild_v1.CloudBuildClient()
    build = cloudbuild_v1.Build()
    build.steps = []
    step = cloudbuild_v1.BuildStep()
    step.name = 'gcr.io/cloud-builders/docker'
    step.args = ['build', '-t', container_image_name, '.']
    step.dir = SOURCE_FOLDER

    build.steps.append(step)
    build.images = [container_image_name]

    source = cloudbuild_v1.Source()
    source.storage_source = cloudbuild_v1.StorageSource()
    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
    source.storage_source.bucket = os.path.join(gcs_bucket)
    source.storage_source.object = gcs_object
    build.source = source
    # TODO(zyichi): make timeout configurable
    build.timeout = Duration().FromSeconds(seconds=1800)

    now = time.time()
    _LOGGER.info('Building sdk container, this may take a few minutes...')
    operation = client.create_build(project_id=project_id, build=build)
    # if the build fails an exception is raised, which stops the job
    # submission.
    result = operation.result()
    _LOGGER.info(
        "Python SDK container pre-build finished in %.2f seconds, "
        "check build log at %s" % (time.time() - now, result.log_url))
    _LOGGER.info(
        "Python SDK container built and pushed as %s." % container_image_name)

  def _upload_to_gcs(self, local_file_path, gcs_location):
    gcs_bucket, gcs_object = self._get_gcs_bucket_and_name(gcs_location)
    request = storage.StorageObjectsInsertRequest(
        bucket=gcs_bucket, name=gcs_object)
    _LOGGER.info('Starting GCS upload to %s...', gcs_location)
    total_size = os.path.getsize(local_file_path)
    from apitools.base.py import exceptions
    try:
      with open(local_file_path, 'rb') as stream:
        upload = storage.Upload(stream, 'application/octet-stream', total_size)
        self._storage_client.objects.Insert(request, upload=upload)
    except exceptions.HttpError as e:
      reportable_errors = {
          403: 'access denied',
          404: 'bucket not found',
      }
      if e.status_code in reportable_errors:
        raise IOError((
            'Could not upload to GCS path %s: %s. Please verify '
            'that credentials are valid and that you have write '
            'access to the specified path.') %
            (gcs_location, reportable_errors[e.status_code]))
      raise
    _LOGGER.info('Completed GCS upload to %s.', gcs_location)

  @staticmethod
  def _get_gcs_bucket_and_name(gcs_location):
    # Strip the 'gs://' prefix and split into (bucket, object).
    return gcs_location[5:].split('/', 1)

  @staticmethod
  def _make_tarfile(output_filename, source_dir):
    with tarfile.open(output_filename, "w:gz") as tar:
      tar.add(source_dir, arcname=SOURCE_FOLDER)

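The classmethod SdkContainerImageBuilder.build_container_image is the entry point runners use. A minimal usage sketch, assuming the module path given above and placeholder option values:

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.portability.sdk_container_builder import (
    SdkContainerImageBuilder)

options = PipelineOptions([
    '--prebuild_sdk_container_engine=local_docker',
    '--docker_registry_push_url=gcr.io/my-project',  # placeholder
])
# Stages the pipeline dependencies, builds the image from the generated
# Dockerfile, pushes it when a registry is configured, and returns a name of
# the form '<registry>/beam_python_prebuilt_sdk:<uuid>'.
image_name = SdkContainerImageBuilder.build_container_image(options)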