Skip to content

Commit c3e00ba

Browse files
committed
Set save_main_session default to true for cloudpickle and introduce overwrite flag
1 parent 992ceb0 commit c3e00ba

File tree

2 files changed

+24
-3
lines changed

2 files changed

+24
-3
lines changed

sdks/python/apache_beam/internal/cloudpickle_pickler.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -254,8 +254,8 @@ def _lock_reducer(obj):
254254
def dump_session(file_path):
255255
# Since References are saved (https://s.apache.org/beam-picklers), we only
256256
# dump supported Beam Registries (currently only logical type registry)
257-
from apache_beam.typehints import schemas
258257
from apache_beam.coders import typecoders
258+
from apache_beam.typehints import schemas
259259

260260
with _pickle_lock, open(file_path, 'wb') as file:
261261
coder_reg = typecoders.registry.get_custom_type_coder_tuples()
@@ -268,8 +268,8 @@ def dump_session(file_path):
268268

269269

270270
def load_session(file_path):
271-
from apache_beam.typehints import schemas
272271
from apache_beam.coders import typecoders
272+
from apache_beam.typehints import schemas
273273

274274
with _pickle_lock, open(file_path, 'rb') as file:
275275
registries = cloudpickle.load(file)

sdks/python/apache_beam/options/pipeline_options.py

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1672,14 +1672,22 @@ def _add_argparse_args(cls, parser):
16721672
choices=['cloudpickle', 'default', 'dill', 'dill_unsafe'])
16731673
parser.add_argument(
16741674
'--save_main_session',
1675-
default=False,
1675+
default=None,
16761676
action='store_true',
16771677
help=(
16781678
'Save the main session state so that pickled functions and classes '
16791679
'defined in __main__ (e.g. interactive session) can be unpickled. '
16801680
'Some workflows do not need the session state if for instance all '
16811681
'their functions/classes are defined in proper modules '
16821682
'(not __main__) and the modules are importable in the worker. '))
1683+
parser.add_argument(
1684+
'--disable_save_main_session',
1685+
action='store_false',
1686+
dest='save_main_session',
1687+
help=(
1688+
'Disable saving the main session state. It is enabled/disabled by'
1689+
'default for cloudpickle/dill pickler. See "save_main_session".'))
1690+
16831691
parser.add_argument(
16841692
'--sdk_location',
16851693
default='default',
@@ -1780,10 +1788,23 @@ def _add_argparse_args(cls, parser):
17801788
'If not specified, the default Maven Central repository will be '
17811789
'used.'))
17821790

1791+
def _handle_load_main_session(self, validator):
1792+
save_main_session = getattr(self, 'save_main_session')
1793+
if save_main_session is None:
1794+
# save_main_session default to False for dill, while default to true
1795+
# for cloudpickle
1796+
pickle_library = getattr(self, 'pickle_library')
1797+
if pickle_library in ['default', 'cloudpickle']:
1798+
setattr(self, 'save_main_session', True)
1799+
else:
1800+
setattr(self, 'save_main_session', False)
1801+
return []
1802+
17831803
def validate(self, validator):
17841804
errors = []
17851805
errors.extend(validator.validate_container_prebuilding_options(self))
17861806
errors.extend(validator.validate_pickle_library(self))
1807+
errors.extend(self._handle_load_main_session(validator))
17871808
return errors
17881809

17891810

0 commit comments

Comments
 (0)