sdks/python/apache_beam/runners/portability/stager.py: 1 addition, 1 deletion
@@ -218,7 +218,7 @@ def create_job_resources(
        is None) else setup_options.requirements_cache)
    if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
        not os.path.exists(requirements_cache_path)):
-     os.makedirs(requirements_cache_path)
+     os.makedirs(requirements_cache_path, exist_ok=True)

    # Stage a requirements file if present.
    if setup_options.requirements_file is not None:
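The one-line change above closes a check-then-create race: two pipelines (or threads) sharing the same requirements cache can both observe that the directory is missing, and the slower `os.makedirs` call then fails with `FileExistsError`. Passing `exist_ok=True` makes the creation idempotent. A minimal standalone sketch of the failure mode and the fix (not part of the PR; the directory path and thread count are illustrative):

```python
import os
import shutil
import tempfile
from concurrent.futures import ThreadPoolExecutor

cache_dir = os.path.join(tempfile.gettempdir(), 'demo-requirements-cache')
shutil.rmtree(cache_dir, ignore_errors=True)


def racy_create():
  # Old pattern: both threads can pass the existence check before either
  # has created the directory; the loser raises FileExistsError.
  if not os.path.exists(cache_dir):
    os.makedirs(cache_dir)


def safe_create():
  # New pattern: creation is idempotent, so concurrent creation by another
  # thread or process is not an error.
  os.makedirs(cache_dir, exist_ok=True)


with ThreadPoolExecutor(max_workers=8) as pool:
  # Swap safe_create for racy_create to (intermittently) reproduce the
  # FileExistsError that this change fixes.
  list(pool.map(lambda _: safe_create(), range(8)))
```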
sdks/python/apache_beam/runners/portability/stager_test.py: 56 additions, 0 deletions
@@ -40,6 +40,9 @@
from apache_beam.runners.internal import names
from apache_beam.runners.portability import stager

from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed

_LOGGER = logging.getLogger(__name__)

try:
@@ -913,6 +916,59 @@ def test_populate_requirements_cache_with_local_files(self):
    self.assertNotIn('fake_pypi', extra_packages_contents)
    self.assertIn('local_package', extra_packages_contents)

  def test_requirements_cache_creation_no_race_condition(self):
    base_cache_dir = self.make_temp_dir()
    cache_dir = os.path.join(base_cache_dir, 'test-requirements-cache')
    # Ensure the directory doesn't exist initially
    if os.path.exists(cache_dir):
      shutil.rmtree(cache_dir)

    source_dir = self.make_temp_dir()
    requirements_file = os.path.join(source_dir, stager.REQUIREMENTS_FILE)
    self.create_temp_file(requirements_file, 'requests>=2.0.0\n')

    def create_resources_with_cache():
      temp_dir = tempfile.mkdtemp()
      try:
        options = PipelineOptions()
        self.update_options(options)
        setup_options = options.view_as(SetupOptions)
        setup_options.requirements_file = requirements_file
        setup_options.requirements_cache = cache_dir
        # This should create the cache directory if it doesn't exist
        stager.Stager.create_job_resources(
            options,
            temp_dir,
            populate_requirements_cache=self.populate_requirements_cache)
        return True, None
      except Exception as e:
        return False, e
      finally:
        if os.path.exists(temp_dir):
          shutil.rmtree(temp_dir)

    # Run multiple threads concurrently to create
    # resources with the same cache dir.
    num_threads = 10
    successes = 0
    with ThreadPoolExecutor(max_workers=num_threads) as executor:
      futures = [
          executor.submit(create_resources_with_cache)
          for _ in range(num_threads)
      ]

      for future in as_completed(futures):
        success, _ = future.result()
        if success:
          successes += 1
    # All threads should succeed
    self.assertEqual(
        successes,
        num_threads,
        f"Expected all {num_threads} threads to pass, but got errors.")
    # Verify that the cache directory exists
    self.assertTrue(os.path.isdir(cache_dir))


class TestStager(stager.Stager):
  def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):
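The new regression test can be run on its own with pytest's keyword filter, for example `pytest apache_beam/runners/portability/stager_test.py -k test_requirements_cache_creation_no_race_condition` from `sdks/python`; all ten concurrent workers should now succeed because the cache directory creation is idempotent.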