Skip to content

Commit 8f53a84

Browse files
Fix race condition in stager.py when starting
multiple pipelines concurrently (fixes #36847)
1 parent 32345e7 commit 8f53a84

File tree

2 files changed

+57
-1
lines changed

2 files changed

+57
-1
lines changed

sdks/python/apache_beam/runners/portability/stager.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ def create_job_resources(
218218
is None) else setup_options.requirements_cache)
219219
if (setup_options.requirements_cache != SKIP_REQUIREMENTS_CACHE and
220220
not os.path.exists(requirements_cache_path)):
221-
os.makedirs(requirements_cache_path)
221+
os.makedirs(requirements_cache_path, exist_ok=True)
222222

223223
# Stage a requirements file if present.
224224
if setup_options.requirements_file is not None:

sdks/python/apache_beam/runners/portability/stager_test.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@
4040
from apache_beam.runners.internal import names
4141
from apache_beam.runners.portability import stager
4242

43+
from concurrent.futures import ThreadPoolExecutor
44+
from concurrent.futures import as_completed
45+
4346
_LOGGER = logging.getLogger(__name__)
4447

4548
try:
@@ -913,6 +916,59 @@ def test_populate_requirements_cache_with_local_files(self):
913916
self.assertNotIn('fake_pypi', extra_packages_contents)
914917
self.assertIn('local_package', extra_packages_contents)
915918

919+
def test_requirements_cache_creation_no_race_condition(self):
920+
base_cache_dir = self.make_temp_dir()
921+
cache_dir = os.path.join(base_cache_dir, 'test-requirements-cache')
922+
# Ensure the directory doesn't exist initially
923+
if os.path.exists(cache_dir):
924+
shutil.rmtree(cache_dir)
925+
926+
source_dir = self.make_temp_dir()
927+
requirements_file = os.path.join(source_dir, stager.REQUIREMENTS_FILE)
928+
self.create_temp_file(requirements_file, 'requests>=2.0.0\n')
929+
930+
def create_resources_with_cache():
931+
temp_dir = tempfile.mkdtemp()
932+
try:
933+
options = PipelineOptions()
934+
self.update_options(options)
935+
setup_options = options.view_as(SetupOptions)
936+
setup_options.requirements_file = requirements_file
937+
setup_options.requirements_cache = cache_dir
938+
# This should create the cache directory if it doesn't exist
939+
stager.Stager.create_job_resources(
940+
options,
941+
temp_dir,
942+
populate_requirements_cache=self.populate_requirements_cache)
943+
return True, None
944+
except Exception as e:
945+
return False, e
946+
finally:
947+
if os.path.exists(temp_dir):
948+
shutil.rmtree(temp_dir)
949+
950+
# Run multiple threads concurrently to create
951+
# resources with the same cache dir.
952+
num_threads = 10
953+
successes = 0
954+
with ThreadPoolExecutor(max_workers=num_threads) as executor:
955+
futures = [
956+
executor.submit(create_resources_with_cache)
957+
for _ in range(num_threads)
958+
]
959+
960+
for future in as_completed(futures):
961+
success, _ = future.result()
962+
if success:
963+
successes += 1
964+
# All threads should succeed
965+
self.assertEqual(
966+
successes,
967+
num_threads,
968+
f"Expected all {num_threads} threads to pass, but got errors.")
969+
# Verify that the cache directory exists
970+
self.assertTrue(os.path.isdir(cache_dir))
971+
916972

917973
class TestStager(stager.Stager):
918974
def stage_artifact(self, local_path_to_artifact, artifact_name, sha256):

0 commit comments

Comments
 (0)