Skip to content

Commit fec392d

Browse files
committed
Save Logical Type Registry and Coder Registry on cloudpickle save main session
fix naming
1 parent dbf4e5a commit fec392d

File tree

8 files changed

+105
-51
lines changed

8 files changed

+105
-51
lines changed

CHANGES.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@
8585
## Bugfixes
8686

8787
* Fixed FirestoreV1 Beam connectors allow configuring inconsistent project/database IDs between RPC requests and routing headers #36895 (Java) ([#36895](https://github.com/apache/beam/issues/36895)).
88+
* (Python) Logical type and coder registry are saved for pipelines in the case of default pickler. This fixes a side
89+
effect of switching to cloudpickle as default pickler in Beam 2.65.0.
8890

8991
## Known Issues
9092

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ def _register_coder_internal(
114114
typehint_coder_class: Type[coders.Coder]) -> None:
115115
self._coders[typehint_type] = typehint_coder_class
116116

117+
@staticmethod
118+
def _normalize_typehint_type(typehint_type):
119+
if typehint_type.__module__ == '__main__':
120+
# See https://github.com/apache/beam/issues/21541
121+
# TODO(robertwb): Remove once all runners are portable.
122+
return getattr(typehint_type, '__name__', str(typehint_type))
123+
return typehint_type
124+
117125
def register_coder(
118126
self, typehint_type: Any,
119127
typehint_coder_class: Type[coders.Coder]) -> None:
@@ -123,11 +131,8 @@ def register_coder(
123131
'Received %r instead.' % typehint_coder_class)
124132
if typehint_type not in self.custom_types:
125133
self.custom_types.append(typehint_type)
126-
if typehint_type.__module__ == '__main__':
127-
# See https://github.com/apache/beam/issues/21541
128-
# TODO(robertwb): Remove once all runners are portable.
129-
typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
130-
self._register_coder_internal(typehint_type, typehint_coder_class)
134+
self._register_coder_internal(
135+
self._normalize_typehint_type(typehint_type), typehint_coder_class)
131136

132137
def get_coder(self, typehint: Any) -> coders.Coder:
133138
if typehint and typehint.__module__ == '__main__':
@@ -170,9 +175,15 @@ def get_coder(self, typehint: Any) -> coders.Coder:
170175
coder = self._fallback_coder
171176
return coder.from_type_hint(typehint, self)
172177

173-
def get_custom_type_coder_tuples(self, types):
178+
def get_custom_type_coder_tuples(self, types=None):
174179
"""Returns type/coder tuples for all custom types passed in."""
175-
return [(t, self._coders[t]) for t in types if t in self.custom_types]
180+
return [(t, self._coders[self._normalize_typehint_type(t)])
181+
for t in self.custom_types if (types is None or t in types)]
182+
183+
def load_custom_type_coder_tuples(self, type_coder):
184+
"""Load type/coder tuples into coder registry."""
185+
for t, c in type_coder:
186+
self.register_coder(t, c)
176187

177188
def verify_deterministic(self, key_coder, op_name, silent=True):
178189
if not key_coder.is_deterministic():

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -252,12 +252,35 @@ def _lock_reducer(obj):
252252

253253

254254
def dump_session(file_path):
255-
# It is possible to dump session with cloudpickle. However, since references
256-
# are saved it should not be necessary. See https://s.apache.org/beam-picklers
257-
pass
255+
# Since References are saved (https://s.apache.org/beam-picklers), we only
256+
# dump supported Beam Registries (currently only logical type registry)
257+
from apache_beam.typehints import schemas
258+
from apache_beam.coders import typecoders
259+
260+
with _pickle_lock, open(file_path, 'wb') as file:
261+
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
262+
logical_type_reg = schemas.LogicalType._known_logical_types.copy()
263+
264+
pickler = cloudpickle.CloudPickler(file)
265+
# TODO(https://github.com/apache/beam/issues/18500) add file system registry
266+
# once implemented
267+
pickler.dump({"coder": coder_reg, "logical_type": logical_type_reg})
258268

259269

260270
def load_session(file_path):
261-
# It is possible to load_session with cloudpickle. However, since references
262-
# are saved it should not be necessary. See https://s.apache.org/beam-picklers
263-
pass
271+
from apache_beam.typehints import schemas
272+
from apache_beam.coders import typecoders
273+
274+
with _pickle_lock, open(file_path, 'rb') as file:
275+
registries = cloudpickle.load(file)
276+
if type(registries) != dict:
277+
raise ValueError(
278+
"Faled loading session: expected dict, get {}", type(registries))
279+
if "coder" in registries:
280+
typecoders.registry.load_custom_type_coder_tuples(registries["coder"])
281+
else:
282+
_LOGGER.warning('No coder registry found in saved session')
283+
if "logical_type" in registries:
284+
schemas.LogicalType._known_logical_types.load(registries["logical_type"])
285+
else:
286+
_LOGGER.warning('No logical type registry found in saved session')

sdks/python/apache_beam/internal/pickler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,14 @@ def load_session(file_path):
9191
return desired_pickle_lib.load_session(file_path)
9292

9393

94+
def is_currently_dill():
95+
return desired_pickle_lib == dill_pickler
96+
97+
98+
def is_currently_cloudpickle():
99+
return desired_pickle_lib == cloudpickle_pickler
100+
101+
94102
def set_library(selected_library=DEFAULT_PICKLE_LIB):
95103
""" Sets pickle library that will be used. """
96104
global desired_pickle_lib
@@ -108,12 +116,11 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB):
108116
"Pipeline option pickle_library=dill_unsafe is set, but dill is not "
109117
"installed. Install dill in job submission and runtime environments.")
110118

111-
is_currently_dill = (desired_pickle_lib == dill_pickler)
112119
dill_is_requested = (
113120
selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)
114121

115122
# If switching to or from dill, update the pickler hook overrides.
116-
if is_currently_dill != dill_is_requested:
123+
if is_currently_dill() != dill_is_requested:
117124
dill_pickler.override_pickler_hooks(selected_library == USE_DILL)
118125

119126
if dill_is_requested:

sdks/python/apache_beam/runners/portability/stager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,6 @@ def create_job_resources(
376376
pickled_session_file = os.path.join(
377377
temp_dir, names.PICKLED_MAIN_SESSION_FILE)
378378
pickler.dump_session(pickled_session_file)
379-
# for pickle_library: cloudpickle, dump_session is no op
380379
if os.path.exists(pickled_session_file):
381380
resources.append(
382381
Stager._create_file_stage_to_artifact(

sdks/python/apache_beam/runners/portability/stager_test.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def test_with_main_session(self):
200200
# (https://github.com/apache/beam/issues/21457): Remove the decorator once
201201
# cloudpickle is default pickle library
202202
@pytest.mark.no_xdist
203-
def test_main_session_not_staged_when_using_cloudpickle(self):
203+
def test_main_session_staged_when_using_cloudpickle(self):
204204
staging_dir = self.make_temp_dir()
205205
options = PipelineOptions()
206206

@@ -209,7 +209,10 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
209209
# session is saved when pickle_library==cloudpickle.
210210
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
211211
self.update_options(options)
212-
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
212+
self.assertEqual([
213+
names.PICKLED_MAIN_SESSION_FILE,
214+
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
215+
],
213216
self.stager.create_and_stage_job_resources(
214217
options, staging_location=staging_dir)[1])
215218

sdks/python/apache_beam/runners/worker/sdk_worker_main.py

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -113,26 +113,25 @@ def create_harness(environment, dry_run=False):
113113
_LOGGER.info('semi_persistent_directory: %s', semi_persistent_directory)
114114
_worker_id = environment.get('WORKER_ID', None)
115115

116-
if pickle_library != pickler.USE_CLOUDPICKLE:
117-
try:
118-
_load_main_session(semi_persistent_directory)
119-
except LoadMainSessionException:
120-
exception_details = traceback.format_exc()
121-
_LOGGER.error(
122-
'Could not load main session: %s', exception_details, exc_info=True)
123-
raise
124-
except Exception: # pylint: disable=broad-except
125-
summary = (
126-
"Could not load main session. Inspect which external dependencies "
127-
"are used in the main module of your pipeline. Verify that "
128-
"corresponding packages are installed in the pipeline runtime "
129-
"environment and their installed versions match the versions used in "
130-
"pipeline submission environment. For more information, see: https://"
131-
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
132-
_LOGGER.error(summary, exc_info=True)
133-
exception_details = traceback.format_exc()
134-
deferred_exception = LoadMainSessionException(
135-
f"{summary} {exception_details}")
116+
try:
117+
_load_main_session(semi_persistent_directory)
118+
except LoadMainSessionException:
119+
exception_details = traceback.format_exc()
120+
_LOGGER.error(
121+
'Could not load main session: %s', exception_details, exc_info=True)
122+
raise
123+
except Exception: # pylint: disable=broad-except
124+
summary = (
125+
"Could not load main session. Inspect which external dependencies "
126+
"are used in the main module of your pipeline. Verify that "
127+
"corresponding packages are installed in the pipeline runtime "
128+
"environment and their installed versions match the versions used in "
129+
"pipeline submission environment. For more information, see: https://"
130+
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
131+
_LOGGER.error(summary, exc_info=True)
132+
exception_details = traceback.format_exc()
133+
deferred_exception = LoadMainSessionException(
134+
f"{summary} {exception_details}")
136135

137136
_LOGGER.info(
138137
'Pipeline_options: %s',
@@ -356,6 +355,14 @@ class LoadMainSessionException(Exception):
356355

357356
def _load_main_session(semi_persistent_directory):
358357
"""Loads a pickled main session from the path specified."""
358+
if pickler.is_currently_dill():
359+
warn_msg = ' Functions defined in __main__ (interactive session) may fail.'
360+
err_msg = ' Functions defined in __main__ (interactive session) will ' \
361+
'almost certainly fail.'
362+
elif pickler.is_currently_cloudpickle():
363+
warn_msg = ' User registered objects (e.g. schema, logical type) through' \
364+
'registeries may not be effective'
365+
err_msg = ''
359366
if semi_persistent_directory:
360367
session_file = os.path.join(
361368
semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
@@ -365,21 +372,18 @@ def _load_main_session(semi_persistent_directory):
365372
# This can happen if the worker fails to download the main session.
366373
# Raise a fatal error and crash this worker, forcing a restart.
367374
if os.path.getsize(session_file) == 0:
368-
# Potenitally transient error, unclear if still happening.
369-
raise LoadMainSessionException(
370-
'Session file found, but empty: %s. Functions defined in __main__ '
371-
'(interactive session) will almost certainly fail.' %
372-
(session_file, ))
373-
pickler.load_session(session_file)
375+
if pickler.is_currently_dill():
376+
# Potenitally transient error, unclear if still happening.
377+
raise LoadMainSessionException(
378+
'Session file found, but empty: %s.%s' % (session_file, err_msg))
379+
else:
380+
_LOGGER.warning('Empty session file: %s.%s', warn_msg, session_file)
381+
else:
382+
pickler.load_session(session_file)
374383
else:
375-
_LOGGER.warning(
376-
'No session file found: %s. Functions defined in __main__ '
377-
'(interactive session) may fail.',
378-
session_file)
384+
_LOGGER.warning('No session file found: %s.%s', warn_msg, session_file)
379385
else:
380-
_LOGGER.warning(
381-
'No semi_persistent_directory found: Functions defined in __main__ '
382-
'(interactive session) may fail.')
386+
_LOGGER.warning('No semi_persistent_directory found: %s', warn_msg)
383387

384388

385389
if __name__ == '__main__':

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,11 @@ def copy(self):
706706
copy.by_language_type.update(self.by_language_type)
707707
return copy
708708

709+
def load(self, another):
710+
self.by_urn.update(another.by_urn)
711+
self.by_logical_type.update(another.by_logical_type)
712+
self.by_language_type.update(another.by_language_type)
713+
709714

710715
LanguageT = TypeVar('LanguageT')
711716
RepresentationT = TypeVar('RepresentationT')

0 commit comments

Comments
 (0)