Skip to content

Commit a01ae3e

Browse files
committed
Solution for #19968, python sdk justs stage up-to-date versions on the required files
This was achieved by saving a list of dependencies and downloading only those files
1 parent 7a4548f commit a01ae3e

File tree

2 files changed

+34
-15
lines changed

2 files changed

+34
-15
lines changed

sdks/python/apache_beam/runners/portability/stager.py

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ def create_job_resources(
165165
build_setup_args: Optional[List[str]] = None,
166166
pypi_requirements: Optional[List[str]] = None,
167167
populate_requirements_cache: Optional[Callable[[str, str, bool],
168-
None]] = None,
168+
List[str]]] = None,
169169
skip_prestaged_dependencies: Optional[bool] = False,
170170
log_submission_env_dependencies: Optional[bool] = True,
171171
):
@@ -220,6 +220,7 @@ def create_job_resources(
220220
not os.path.exists(requirements_cache_path)):
221221
os.makedirs(requirements_cache_path)
222222

223+
downloaded_packages = []
223224
# Stage a requirements file if present.
224225
if setup_options.requirements_file is not None:
225226
if not os.path.isfile(setup_options.requirements_file):
@@ -245,12 +246,14 @@ def create_job_resources(
245246
'such as --requirements_file. ')
246247

247248
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
248-
(
249+
result = (
249250
populate_requirements_cache if populate_requirements_cache else
250251
Stager._populate_requirements_cache)(
251252
setup_options.requirements_file,
252253
requirements_cache_path,
253254
setup_options.requirements_cache_only_sources)
255+
if result is not None:
256+
downloaded_packages.extend(result)
254257

255258
if pypi_requirements:
256259
tf = tempfile.NamedTemporaryFile(mode='w', delete=False)
@@ -260,18 +263,18 @@ def create_job_resources(
260263
# Populate cache with packages from PyPI requirements and stage
261264
# the files in the cache.
262265
if setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE:
263-
(
266+
result = (
264267
populate_requirements_cache if populate_requirements_cache else
265268
Stager._populate_requirements_cache)(
266269
tf.name,
267270
requirements_cache_path,
268271
setup_options.requirements_cache_only_sources)
272+
if result is not None:
273+
downloaded_packages.extend(result)
269274

270-
if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE) and (
271-
setup_options.requirements_file is not None or pypi_requirements):
272-
for pkg in glob.glob(os.path.join(requirements_cache_path, '*')):
273-
resources.append(
274-
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
275+
for pkg in downloaded_packages:
276+
resources.append(
277+
Stager._create_file_stage_to_artifact(pkg, os.path.basename(pkg)))
275278

276279
# Handle a setup file if present.
277280
# We will build the setup package locally and then copy it to the staging
@@ -735,7 +738,7 @@ def _get_platform_for_default_sdk_container():
735738
@retry.with_exponential_backoff(
736739
num_retries=4, retry_filter=retry_on_non_zero_exit)
737740
def _populate_requirements_cache(
738-
requirements_file, cache_dir, populate_cache_with_sdists=False):
741+
requirements_file, cache_dir, populate_cache_with_sdists=False) -> List[str]:
739742
# The 'pip download' command will not download again if it finds the
740743
# tarball with the proper version already present.
741744
# It will get the packages downloaded in the order they are presented in
@@ -780,7 +783,12 @@ def _populate_requirements_cache(
780783
platform_tag
781784
])
782785
_LOGGER.info('Executing command: %s', cmd_args)
783-
processes.check_output(cmd_args, stderr=processes.STDOUT)
786+
output = processes.check_output(cmd_args, stderr=subprocess.STDOUT)
787+
downloaded_packages = []
788+
for line in output.decode('utf-8').split('\n'):
789+
if line.startswith('Saved '):
790+
downloaded_packages.append(line.split(' ')[1])
791+
return downloaded_packages
784792

785793
@staticmethod
786794
def _build_setup_package(

sdks/python/apache_beam/runners/portability/stager_test.py

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,14 @@ def file_copy(self, from_path, to_path):
9898
def populate_requirements_cache(
9999
self, requirements_file, cache_dir, populate_cache_with_sdists=False):
100100
_ = requirements_file
101-
self.create_temp_file(os.path.join(cache_dir, 'abc.txt'), 'nothing')
102-
self.create_temp_file(os.path.join(cache_dir, 'def.txt'), 'nothing')
101+
_ = populate_cache_with_sdists
102+
pkgs = [
103+
os.path.join(cache_dir, 'abc.txt'),
104+
os.path.join(cache_dir, 'def.txt')
105+
]
106+
for pkg in pkgs:
107+
self.create_temp_file(pkg, 'nothing')
108+
return pkgs
103109

104110
@mock.patch('apache_beam.runners.portability.stager.open')
105111
@mock.patch('apache_beam.runners.portability.stager.get_new_http')
@@ -807,10 +813,15 @@ def test_remove_dependency_from_requirements(self):
807813

808814
def _populate_requitements_cache_fake(
809815
self, requirements_file, temp_dir, populate_cache_with_sdists):
816+
paths = []
810817
if not populate_cache_with_sdists:
811-
self.create_temp_file(os.path.join(temp_dir, 'nothing.whl'), 'Fake whl')
812-
self.create_temp_file(
813-
os.path.join(temp_dir, 'nothing.tar.gz'), 'Fake tarball')
818+
path = os.path.join(temp_dir, 'nothing.whl')
819+
self.create_temp_file(path, 'nothing')
820+
paths.append(path)
821+
path = os.path.join(temp_dir, 'nothing.tar.gz')
822+
self.create_temp_file(path, 'Fake tarball content')
823+
paths.append(path)
824+
return paths
814825

815826
# requirements cache will popultated with bdist/whl if present
816827
# else source would be downloaded.

0 commit comments

Comments
 (0)