Skip to content

Commit bed36c0

Browse files
committed
Increase job server timeout. Add synchornization in subprocess server.
1 parent 8eaa13f commit bed36c0

File tree

2 files changed

+9
-8
lines changed

2 files changed

+9
-8
lines changed

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1810,7 +1810,7 @@ def _add_argparse_args(cls, parser):
18101810
parser.add_argument(
18111811
'--job_server_timeout',
18121812
'--job-server-timeout', # For backwards compatibility.
1813-
default=300,
1813+
default=600,
18141814
type=int,
18151815
help=(
18161816
'Job service request timeout in seconds. The timeout '

sdks/python/apache_beam/utils/subprocess_server.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -85,15 +85,16 @@ def _next_id(self):
8585

8686
def register(self):
8787
owner = self._next_id()
88-
self._live_owners.add(owner)
88+
with self._lock:
89+
self._live_owners.add(owner)
8990
return owner
9091

9192
def purge(self, owner):
92-
if owner not in self._live_owners:
93-
raise ValueError(f"{owner} not in {self._live_owners}")
94-
self._live_owners.remove(owner)
9593
to_delete = []
9694
with self._lock:
95+
if owner not in self._live_owners:
96+
raise ValueError(f"{owner} not in {self._live_owners}")
97+
self._live_owners.remove(owner)
9798
for key, entry in list(self._cache.items()):
9899
if owner in entry.owners:
99100
entry.owners.remove(owner)
@@ -105,9 +106,9 @@ def purge(self, owner):
105106
self._destructor(value)
106107

107108
def get(self, *key):
108-
if not self._live_owners:
109-
raise RuntimeError("At least one owner must be registered.")
110109
with self._lock:
110+
if not self._live_owners:
111+
raise RuntimeError("At least one owner must be registered.")
111112
if key not in self._cache:
112113
self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
113114
for owner in self._live_owners:
@@ -439,7 +440,7 @@ def path_to_beam_jar(
439440
def _download_jar_to_cache(
440441
cls, download_url, cached_jar_path, user_agent=None):
441442
"""Downloads a jar from the given URL to the specified cache path.
442-
443+
443444
Args:
444445
download_url (str): The URL to download from.
445446
cached_jar_path (str): The local path where the jar should be cached.

0 commit comments

Comments
 (0)