Skip to content

Commit 0091b24

Browse files
committed
Save Logical Type Registry and Coder Registry on cloudpickle save main session
1 parent da6f7b4 commit 0091b24

File tree

8 files changed

+107
-51
lines changed

8 files changed

+107
-51
lines changed

CHANGES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,10 @@
9494
* PulsarIO has now changed support status from incomplete to experimental. Both reads and writes should now minimally
9595
function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java)
9696
([#36141](https://github.com/apache/beam/issues/36141)).
97+
* (Python) The logical type and coder registries are now saved for pipelines with the `save_main_session` pipeline option enabled in
98+
the case of the default pickler. This fixes a side effect of switching to cloudpickle as the default pickler in Beam 2.65.0.
99+
Previously, the `--save_main_session` pipeline option was not honored when using cloudpickle
100+
([#35738](https://github.com/apache/beam/issues/35738)).
97101

98102
## Known Issues
99103

sdks/python/apache_beam/coders/typecoders.py

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,14 @@ def _register_coder_internal(
114114
typehint_coder_class: Type[coders.Coder]) -> None:
115115
self._coders[typehint_type] = typehint_coder_class
116116

@staticmethod
def _normalize_typehint_type(typehint_type):
  """Return the key used to store ``typehint_type`` in the coder registry.

  Types whose ``__module__`` is ``'__main__'`` are keyed by their
  ``__name__`` (falling back to ``str(typehint_type)`` when they have no
  ``__name__``); every other type is returned unchanged.
  """
  if typehint_type.__module__ != '__main__':
    return typehint_type
  # See https://github.com/apache/beam/issues/21541
  # TODO(robertwb): Remove once all runners are portable.
  try:
    return typehint_type.__name__
  except AttributeError:
    return str(typehint_type)
124+
117125
def register_coder(
118126
self, typehint_type: Any,
119127
typehint_coder_class: Type[coders.Coder]) -> None:
@@ -123,11 +131,8 @@ def register_coder(
123131
'Received %r instead.' % typehint_coder_class)
124132
if typehint_type not in self.custom_types:
125133
self.custom_types.append(typehint_type)
126-
if typehint_type.__module__ == '__main__':
127-
# See https://github.com/apache/beam/issues/21541
128-
# TODO(robertwb): Remove once all runners are portable.
129-
typehint_type = getattr(typehint_type, '__name__', str(typehint_type))
130-
self._register_coder_internal(typehint_type, typehint_coder_class)
134+
self._register_coder_internal(
135+
self._normalize_typehint_type(typehint_type), typehint_coder_class)
131136

132137
def get_coder(self, typehint: Any) -> coders.Coder:
133138
if typehint and typehint.__module__ == '__main__':
@@ -170,9 +175,15 @@ def get_coder(self, typehint: Any) -> coders.Coder:
170175
coder = self._fallback_coder
171176
return coder.from_type_hint(typehint, self)
172177

def get_custom_type_coder_tuples(self, types=None):
  """Returns type/coder tuples for registered custom types.

  Args:
    types: optional container of types used as a filter. When None, every
      type in ``self.custom_types`` is returned; otherwise only the custom
      types that are also contained in ``types``.

  Returns:
    A list of ``(type, coder_class)`` tuples, in ``self.custom_types`` order.
  """
  selected = []
  for custom_type in self.custom_types:
    if types is not None and custom_type not in types:
      continue
    # Coders are stored under the normalized key, not the raw type.
    coder_class = self._coders[self._normalize_typehint_type(custom_type)]
    selected.append((custom_type, coder_class))
  return selected
182+
def load_custom_type_coder_tuples(self, type_coder):
  """Register every ``(type, coder_class)`` pair from ``type_coder``.

  Args:
    type_coder: iterable of two-item ``(type, coder_class)`` entries; each
      one is passed to ``self.register_coder``.
  """
  for typehint_type, coder_class in type_coder:
    self.register_coder(typehint_type, coder_class)
176187

177188
def verify_deterministic(self, key_coder, op_name, silent=True):
178189
if not key_coder.is_deterministic():

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,35 @@ def _lock_reducer(obj):
196196

197197

def dump_session(file_path):
  """Write the supported Beam registries to ``file_path`` with cloudpickle.

  Since references are saved (https://s.apache.org/beam-picklers), only the
  registries are dumped: the coder registry's custom type/coder tuples and a
  copy of the known logical types, stored as a dict under the keys
  ``"coder"`` and ``"logicaltype"``.

  Args:
    file_path: path of the session file to (over)write.
  """
  from apache_beam.typehints import schemas
  from apache_beam.coders import typecoders

  with _pickle_lock, open(file_path, 'wb') as session_file:
    session_state = {
        "coder": typecoders.registry.get_custom_type_coder_tuples(),
        "logicaltype": schemas.LogicalType._known_logical_types.copy(),
    }
    # TODO(https://github.com/apache/beam/issues/18500) add file system registry
    # once implemented
    cloudpickle.CloudPickler(session_file).dump(session_state)
202212

203213

def load_session(file_path):
  """Restore the Beam registries stored in a session file.

  Reads a cloudpickle-serialized dict from ``file_path``. Its ``"coder"``
  entry is loaded into the coder registry and its ``"logicaltype"`` entry is
  merged into the known logical types. A missing entry is logged as a warning
  and skipped.

  Args:
    file_path: path of the session file to read.

  Raises:
    ValueError: if the unpickled payload is not a dict.
  """
  from apache_beam.typehints import schemas
  from apache_beam.coders import typecoders

  with _pickle_lock, open(file_path, 'rb') as file:
    # NOTE: unpickling can execute arbitrary code; the session file must come
    # from a trusted source.
    registries = cloudpickle.load(file)
    if not isinstance(registries, dict):
      # Format the message explicitly: passing the type as a second argument
      # to ValueError would leave the "{}" placeholder unfilled.
      raise ValueError(
          "Failed loading session: expected dict, got {}".format(
              type(registries)))
    if "coder" in registries:
      typecoders.registry.load_custom_type_coder_tuples(registries["coder"])
    else:
      _LOGGER.warning('No coder registry found in saved session')
    if "logicaltype" in registries:
      schemas.LogicalType._known_logical_types.load(registries["logicaltype"])
    else:
      _LOGGER.warning('No logical type registry found in saved session')

sdks/python/apache_beam/internal/pickler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ def load_session(file_path):
7676
return desired_pickle_lib.load_session(file_path)
7777

7878

def is_currently_dill():
  """Return True when the currently selected pickle library is dill."""
  using_dill = desired_pickle_lib == dill_pickler
  return using_dill
81+
82+
def is_currently_cloudpickle():
  """Return True when the currently selected pickle library is cloudpickle."""
  using_cloudpickle = desired_pickle_lib == cloudpickle_pickler
  return using_cloudpickle
85+
86+
7987
def set_library(selected_library=DEFAULT_PICKLE_LIB):
8088
""" Sets pickle library that will be used. """
8189
global desired_pickle_lib
@@ -93,12 +101,11 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB):
93101
"Pipeline option pickle_library=dill_unsafe is set, but dill is not "
94102
"installed. Install dill in job submission and runtime environments.")
95103

96-
is_currently_dill = (desired_pickle_lib == dill_pickler)
97104
dill_is_requested = (
98105
selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)
99106

100107
# If switching to or from dill, update the pickler hook overrides.
101-
if is_currently_dill != dill_is_requested:
108+
if is_currently_dill() != dill_is_requested:
102109
dill_pickler.override_pickler_hooks(selected_library == USE_DILL)
103110

104111
if dill_is_requested:

sdks/python/apache_beam/runners/portability/stager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,6 @@ def create_job_resources(
376376
pickled_session_file = os.path.join(
377377
temp_dir, names.PICKLED_MAIN_SESSION_FILE)
378378
pickler.dump_session(pickled_session_file)
379-
# for pickle_library: cloudpickle, dump_session is no op
380379
if os.path.exists(pickled_session_file):
381380
resources.append(
382381
Stager._create_file_stage_to_artifact(

sdks/python/apache_beam/runners/portability/stager_test.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def test_with_main_session(self):
200200
# (https://github.com/apache/beam/issues/21457): Remove the decorator once
201201
# cloudpickle is default pickle library
202202
@pytest.mark.no_xdist
203-
def test_main_session_not_staged_when_using_cloudpickle(self):
203+
def test_main_session_staged_when_using_cloudpickle(self):
204204
staging_dir = self.make_temp_dir()
205205
options = PipelineOptions()
206206

@@ -209,7 +209,10 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
209209
# session is saved when pickle_library==cloudpickle.
210210
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
211211
self.update_options(options)
212-
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
212+
self.assertEqual([
213+
names.PICKLED_MAIN_SESSION_FILE,
214+
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
215+
],
213216
self.stager.create_and_stage_job_resources(
214217
options, staging_location=staging_dir)[1])
215218

sdks/python/apache_beam/runners/worker/sdk_worker_main.py

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -113,26 +113,25 @@ def create_harness(environment, dry_run=False):
113113
_LOGGER.info('semi_persistent_directory: %s', semi_persistent_directory)
114114
_worker_id = environment.get('WORKER_ID', None)
115115

116-
if pickle_library != pickler.USE_CLOUDPICKLE:
117-
try:
118-
_load_main_session(semi_persistent_directory)
119-
except LoadMainSessionException:
120-
exception_details = traceback.format_exc()
121-
_LOGGER.error(
122-
'Could not load main session: %s', exception_details, exc_info=True)
123-
raise
124-
except Exception: # pylint: disable=broad-except
125-
summary = (
126-
"Could not load main session. Inspect which external dependencies "
127-
"are used in the main module of your pipeline. Verify that "
128-
"corresponding packages are installed in the pipeline runtime "
129-
"environment and their installed versions match the versions used in "
130-
"pipeline submission environment. For more information, see: https://"
131-
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
132-
_LOGGER.error(summary, exc_info=True)
133-
exception_details = traceback.format_exc()
134-
deferred_exception = LoadMainSessionException(
135-
f"{summary} {exception_details}")
116+
try:
117+
_load_main_session(semi_persistent_directory)
118+
except LoadMainSessionException:
119+
exception_details = traceback.format_exc()
120+
_LOGGER.error(
121+
'Could not load main session: %s', exception_details, exc_info=True)
122+
raise
123+
except Exception: # pylint: disable=broad-except
124+
summary = (
125+
"Could not load main session. Inspect which external dependencies "
126+
"are used in the main module of your pipeline. Verify that "
127+
"corresponding packages are installed in the pipeline runtime "
128+
"environment and their installed versions match the versions used in "
129+
"pipeline submission environment. For more information, see: https://"
130+
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
131+
_LOGGER.error(summary, exc_info=True)
132+
exception_details = traceback.format_exc()
133+
deferred_exception = LoadMainSessionException(
134+
f"{summary} {exception_details}")
136135

137136
_LOGGER.info(
138137
'Pipeline_options: %s',
@@ -352,6 +351,14 @@ class LoadMainSessionException(Exception):
352351

353352
def _load_main_session(semi_persistent_directory):
354353
"""Loads a pickled main session from the path specified."""
354+
if pickler.is_currently_dill():
355+
warn_msg = ' Functions defined in __main__ (interactive session) may fail.'
356+
err_msg = ' Functions defined in __main__ (interactive session) will ' \
357+
'almost certainly fail.'
358+
elif pickler.is_currently_cloudpickle():
359+
warn_msg = ' User registered objects (e.g. schema, logical type) through' \
360+
'registeries may not be effective'
361+
err_msg = ''
355362
if semi_persistent_directory:
356363
session_file = os.path.join(
357364
semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
@@ -361,21 +368,18 @@ def _load_main_session(semi_persistent_directory):
361368
# This can happen if the worker fails to download the main session.
362369
# Raise a fatal error and crash this worker, forcing a restart.
363370
if os.path.getsize(session_file) == 0:
364-
# Potenitally transient error, unclear if still happening.
365-
raise LoadMainSessionException(
366-
'Session file found, but empty: %s. Functions defined in __main__ '
367-
'(interactive session) will almost certainly fail.' %
368-
(session_file, ))
369-
pickler.load_session(session_file)
371+
if pickler.is_currently_dill():
372+
# Potentially transient error, unclear if still happening.
373+
raise LoadMainSessionException(
374+
'Session file found, but empty: %s.%s' % (session_file, err_msg))
375+
else:
376+
_LOGGER.warning('Empty session file: %s.%s', warn_msg, session_file)
377+
else:
378+
pickler.load_session(session_file)
370379
else:
371-
_LOGGER.warning(
372-
'No session file found: %s. Functions defined in __main__ '
373-
'(interactive session) may fail.',
374-
session_file)
380+
_LOGGER.warning('No session file found: %s.%s', warn_msg, session_file)
375381
else:
376-
_LOGGER.warning(
377-
'No semi_persistent_directory found: Functions defined in __main__ '
378-
'(interactive session) may fail.')
382+
_LOGGER.warning('No semi_persistent_directory found: %s', warn_msg)
379383

380384

381385
if __name__ == '__main__':

sdks/python/apache_beam/typehints/schemas.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -706,6 +706,11 @@ def copy(self):
706706
copy.by_language_type.update(self.by_language_type)
707707
return copy
708708

def load(self, another):
  """Merge the entries of ``another`` registry into this one in place.

  Each of the ``by_urn``, ``by_logical_type`` and ``by_language_type``
  mappings is updated from the same-named mapping on ``another``; entries in
  ``another`` overwrite entries with the same key here.
  """
  for mapping_name in ('by_urn', 'by_logical_type', 'by_language_type'):
    getattr(self, mapping_name).update(getattr(another, mapping_name))
713+
709714

710715
LanguageT = TypeVar('LanguageT')
711716
RepresentationT = TypeVar('RepresentationT')

0 commit comments

Comments
 (0)