Skip to content

Commit 5bfae4e

Browse files
committed
Save Logical Type Registry and Schema Registry on cloudpickle save main session
1 parent da6f7b4 commit 5bfae4e

File tree

6 files changed

+74
-42
lines changed

6 files changed

+74
-42
lines changed

CHANGES.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,9 @@
9494
* PulsarIO has now changed support status from incomplete to experimental. Both read and writes should now minimally
9595
function (un-partitioned topics, without schema support, timestamp ordered messages for read) (Java)
9696
([#36141](https://github.com/apache/beam/issues/36141)).
97+
* (Python) Logical type and schema registries are now saved by cloudpickle when the `save_main_session` pipeline option is
98+
enabled. This fixes a side effect of switching to cloudpickle as default pickler in Beam 2.65.0. Previously
99+
`--save_main_session` was not honored for cloudpickle ([#35738](https://github.com/apache/beam/issues/35738)).
97100

98101
## Known Issues
99102

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,12 +196,28 @@ def _lock_reducer(obj):
196196

197197

198198
def dump_session(file_path):
  """Writes the known Beam registries to a session file.

  Since references are saved (https://s.apache.org/beam-picklers), the main
  session itself is not dumped; only known Beam registries are. The file
  holds two consecutive pickles, in this order: the logical type registry,
  then the schema registry. ``load_session`` reads them back in that order.

  Args:
    file_path: path of the session file to create or overwrite.
  """
  from apache_beam.typehints import schema_registry
  from apache_beam.typehints.schemas import LogicalType

  with _pickle_lock, open(file_path, 'wb') as file:
    registries = (
        LogicalType._known_logical_types,  # Logical Type registry
        schema_registry.SCHEMA_REGISTRY,  # Schema registry
    )
    for registry in registries:
      # Use a fresh pickler per registry. A reused pickler keeps its memo
      # between dump() calls, so the second pickle could refer back to
      # objects memoized by the first; those references cannot be resolved
      # when each pickle is loaded by an independent load() call.
      cloudpickle.CloudPickler(file).dump(registry)
202210

203211

204212
def load_session(file_path):
  """Restores the Beam registries stored in a session file.

  Since references are saved (https://s.apache.org/beam-picklers), there is
  no main session to restore; the file only holds two consecutive pickles,
  the logical type registry followed by the schema registry.

  Args:
    file_path: path of the session file to read.
  """
  from apache_beam.typehints import schema_registry
  from apache_beam.typehints.schemas import LogicalType

  with _pickle_lock, open(file_path, 'rb') as file:
    known_logical_types = cloudpickle.load(file)
    # Do not name this local `schema_registry`: that would shadow the imported
    # module and the assignment below would set an attribute on the loaded
    # object instead of replacing the module-level registry.
    loaded_schema_registry = cloudpickle.load(file)

    LogicalType._known_logical_types = known_logical_types
    schema_registry.SCHEMA_REGISTRY = loaded_schema_registry

sdks/python/apache_beam/internal/pickler.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,14 @@ def load_session(file_path):
7676
return desired_pickle_lib.load_session(file_path)
7777

7878

79+
def is_currently_dill():
  """Reports whether ``desired_pickle_lib`` is set to ``dill_pickler``."""
  selected_lib = desired_pickle_lib
  return selected_lib == dill_pickler
81+
82+
83+
def is_currently_cloudpickle():
  """Reports whether ``desired_pickle_lib`` is set to ``cloudpickle_pickler``."""
  selected_lib = desired_pickle_lib
  return selected_lib == cloudpickle_pickler
85+
86+
7987
def set_library(selected_library=DEFAULT_PICKLE_LIB):
8088
""" Sets pickle library that will be used. """
8189
global desired_pickle_lib
@@ -93,12 +101,11 @@ def set_library(selected_library=DEFAULT_PICKLE_LIB):
93101
"Pipeline option pickle_library=dill_unsafe is set, but dill is not "
94102
"installed. Install dill in job submission and runtime environments.")
95103

96-
is_currently_dill = (desired_pickle_lib == dill_pickler)
97104
dill_is_requested = (
98105
selected_library == USE_DILL or selected_library == USE_DILL_UNSAFE)
99106

100107
# If switching to or from dill, update the pickler hook overrides.
101-
if is_currently_dill != dill_is_requested:
108+
if is_currently_dill() != dill_is_requested:
102109
dill_pickler.override_pickler_hooks(selected_library == USE_DILL)
103110

104111
if dill_is_requested:

sdks/python/apache_beam/runners/portability/stager.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -376,7 +376,6 @@ def create_job_resources(
376376
pickled_session_file = os.path.join(
377377
temp_dir, names.PICKLED_MAIN_SESSION_FILE)
378378
pickler.dump_session(pickled_session_file)
379-
# for pickle_library: cloudpickle, dump_session is no op
380379
if os.path.exists(pickled_session_file):
381380
resources.append(
382381
Stager._create_file_stage_to_artifact(

sdks/python/apache_beam/runners/portability/stager_test.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ def test_with_main_session(self):
200200
# (https://github.com/apache/beam/issues/21457): Remove the decorator once
201201
# cloudpickle is default pickle library
202202
@pytest.mark.no_xdist
203-
def test_main_session_not_staged_when_using_cloudpickle(self):
203+
def test_main_session_staged_when_using_cloudpickle(self):
204204
staging_dir = self.make_temp_dir()
205205
options = PipelineOptions()
206206

@@ -209,7 +209,10 @@ def test_main_session_not_staged_when_using_cloudpickle(self):
209209
# session is saved when pickle_library==cloudpickle.
210210
options.view_as(SetupOptions).pickle_library = pickler.USE_CLOUDPICKLE
211211
self.update_options(options)
212-
self.assertEqual([stager.SUBMISSION_ENV_DEPENDENCIES_FILE],
212+
self.assertEqual([
213+
names.PICKLED_MAIN_SESSION_FILE,
214+
stager.SUBMISSION_ENV_DEPENDENCIES_FILE
215+
],
213216
self.stager.create_and_stage_job_resources(
214217
options, staging_location=staging_dir)[1])
215218

sdks/python/apache_beam/runners/worker/sdk_worker_main.py

Lines changed: 37 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -113,26 +113,25 @@ def create_harness(environment, dry_run=False):
113113
_LOGGER.info('semi_persistent_directory: %s', semi_persistent_directory)
114114
_worker_id = environment.get('WORKER_ID', None)
115115

116-
if pickle_library != pickler.USE_CLOUDPICKLE:
117-
try:
118-
_load_main_session(semi_persistent_directory)
119-
except LoadMainSessionException:
120-
exception_details = traceback.format_exc()
121-
_LOGGER.error(
122-
'Could not load main session: %s', exception_details, exc_info=True)
123-
raise
124-
except Exception: # pylint: disable=broad-except
125-
summary = (
126-
"Could not load main session. Inspect which external dependencies "
127-
"are used in the main module of your pipeline. Verify that "
128-
"corresponding packages are installed in the pipeline runtime "
129-
"environment and their installed versions match the versions used in "
130-
"pipeline submission environment. For more information, see: https://"
131-
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
132-
_LOGGER.error(summary, exc_info=True)
133-
exception_details = traceback.format_exc()
134-
deferred_exception = LoadMainSessionException(
135-
f"{summary} {exception_details}")
116+
try:
117+
_load_main_session(semi_persistent_directory)
118+
except LoadMainSessionException:
119+
exception_details = traceback.format_exc()
120+
_LOGGER.error(
121+
'Could not load main session: %s', exception_details, exc_info=True)
122+
raise
123+
except Exception: # pylint: disable=broad-except
124+
summary = (
125+
"Could not load main session. Inspect which external dependencies "
126+
"are used in the main module of your pipeline. Verify that "
127+
"corresponding packages are installed in the pipeline runtime "
128+
"environment and their installed versions match the versions used in "
129+
"pipeline submission environment. For more information, see: https://"
130+
"beam.apache.org/documentation/sdks/python-pipeline-dependencies/")
131+
_LOGGER.error(summary, exc_info=True)
132+
exception_details = traceback.format_exc()
133+
deferred_exception = LoadMainSessionException(
134+
f"{summary} {exception_details}")
136135

137136
_LOGGER.info(
138137
'Pipeline_options: %s',
@@ -352,6 +351,14 @@ class LoadMainSessionException(Exception):
352351

353352
def _load_main_session(semi_persistent_directory):
354353
"""Loads a pickled main session from the path specified."""
354+
if pickler.is_currently_dill():
355+
warn_msg = ' Functions defined in __main__ (interactive session) may fail.'
356+
err_msg = ' Functions defined in __main__ (interactive session) will ' \
357+
'almost certainly fail.'
358+
elif pickler.is_currently_cloudpickle():
359+
warn_msg = ' User registered objects (e.g. schema, logical type) through' \
360+
' registries may not be effective'
361+
err_msg = ''
355362
if semi_persistent_directory:
356363
session_file = os.path.join(
357364
semi_persistent_directory, 'staged', names.PICKLED_MAIN_SESSION_FILE)
@@ -361,21 +368,18 @@ def _load_main_session(semi_persistent_directory):
361368
# This can happen if the worker fails to download the main session.
362369
# Raise a fatal error and crash this worker, forcing a restart.
363370
if os.path.getsize(session_file) == 0:
364-
# Potenitally transient error, unclear if still happening.
365-
raise LoadMainSessionException(
366-
'Session file found, but empty: %s. Functions defined in __main__ '
367-
'(interactive session) will almost certainly fail.' %
368-
(session_file, ))
369-
pickler.load_session(session_file)
371+
if pickler.is_currently_dill():
372+
# Potentially transient error, unclear if still happening.
373+
raise LoadMainSessionException(
374+
'Session file found, but empty: %s.%s' % (session_file, err_msg))
375+
else:
376+
_LOGGER.warning('Empty session file: %s.%s', session_file, warn_msg)
377+
else:
378+
pickler.load_session(session_file)
370379
else:
371-
_LOGGER.warning(
372-
'No session file found: %s. Functions defined in __main__ '
373-
'(interactive session) may fail.',
374-
session_file)
380+
_LOGGER.warning('No session file found: %s.%s', session_file, warn_msg)
375381
else:
376-
_LOGGER.warning(
377-
'No semi_persistent_directory found: Functions defined in __main__ '
378-
'(interactive session) may fail.')
382+
_LOGGER.warning('No semi_persistent_directory found: %s', warn_msg)
379383

380384

381385
if __name__ == '__main__':

0 commit comments

Comments
 (0)