Skip to content

Commit 79eafe8

Browse files
committed
Make sure process exit without manager
1 parent 89047b7 commit 79eafe8

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

sdks/python/apache_beam/utils/multi_process_shared.py

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,10 @@ def _run_with_oom_protection(func, *args, **kwargs):
138138

139139
class _SingletonEntry:
140140
"""Represents a single, refcounted entry in this process."""
141-
def __init__(self, constructor, initialize_eagerly=True):
141+
def __init__(
142+
self, constructor, initialize_eagerly=True, hard_delete_callback=None):
142143
self.constructor = constructor
144+
self._hard_delete_callback = hard_delete_callback
143145
self.refcount = 0
144146
self.lock = threading.Lock()
145147
if initialize_eagerly:
@@ -169,6 +171,8 @@ def unsafe_hard_delete(self):
169171
if self.initialied:
170172
del self.obj
171173
self.initialied = False
174+
if self._hard_delete_callback:
175+
self._hard_delete_callback()
172176

173177

174178
class _SingletonManager:
@@ -180,9 +184,15 @@ def __init__(self):
180184
def set_hard_delete_callback(self, callback):
181185
self._hard_delete_callback = callback
182186

183-
def register_singleton(self, constructor, tag, initialize_eagerly=True):
187+
def register_singleton(
188+
self,
189+
constructor,
190+
tag,
191+
initialize_eagerly=True,
192+
hard_delete_callback=None):
184193
assert tag not in self.entries, tag
185-
self.entries[tag] = _SingletonEntry(constructor, initialize_eagerly)
194+
self.entries[tag] = _SingletonEntry(
195+
constructor, initialize_eagerly, hard_delete_callback)
186196

187197
def has_singleton(self, tag):
188198
return tag in self.entries

0 commit comments

Comments
 (0)