@@ -113,26 +113,25 @@ def create_harness(environment, dry_run=False):
113113 _LOGGER .info ('semi_persistent_directory: %s' , semi_persistent_directory )
114114 _worker_id = environment .get ('WORKER_ID' , None )
115115
116- if pickle_library != pickler .USE_CLOUDPICKLE :
117- try :
118- _load_main_session (semi_persistent_directory )
119- except LoadMainSessionException :
120- exception_details = traceback .format_exc ()
121- _LOGGER .error (
122- 'Could not load main session: %s' , exception_details , exc_info = True )
123- raise
124- except Exception : # pylint: disable=broad-except
125- summary = (
126- "Could not load main session. Inspect which external dependencies "
127- "are used in the main module of your pipeline. Verify that "
128- "corresponding packages are installed in the pipeline runtime "
129- "environment and their installed versions match the versions used in "
130- "pipeline submission environment. For more information, see: https://"
131- "beam.apache.org/documentation/sdks/python-pipeline-dependencies/" )
132- _LOGGER .error (summary , exc_info = True )
133- exception_details = traceback .format_exc ()
134- deferred_exception = LoadMainSessionException (
135- f"{ summary } { exception_details } " )
116+ try :
117+ _load_main_session (semi_persistent_directory )
118+ except LoadMainSessionException :
119+ exception_details = traceback .format_exc ()
120+ _LOGGER .error (
121+ 'Could not load main session: %s' , exception_details , exc_info = True )
122+ raise
123+ except Exception : # pylint: disable=broad-except
124+ summary = (
125+ "Could not load main session. Inspect which external dependencies "
126+ "are used in the main module of your pipeline. Verify that "
127+ "corresponding packages are installed in the pipeline runtime "
128+ "environment and their installed versions match the versions used in "
129+ "pipeline submission environment. For more information, see: https://"
130+ "beam.apache.org/documentation/sdks/python-pipeline-dependencies/" )
131+ _LOGGER .error (summary , exc_info = True )
132+ exception_details = traceback .format_exc ()
133+ deferred_exception = LoadMainSessionException (
134+ f"{ summary } { exception_details } " )
136135
137136 _LOGGER .info (
138137 'Pipeline_options: %s' ,
@@ -352,6 +351,14 @@ class LoadMainSessionException(Exception):
352351
353352def _load_main_session (semi_persistent_directory ):
354353 """Loads a pickled main session from the path specified."""
354+ if pickler .is_currently_dill ():
355+ warn_msg = ' Functions defined in __main__ (interactive session) may fail.'
356+ err_msg = ' Functions defined in __main__ (interactive session) will ' \
357+ 'almost certainly fail.'
358+ elif pickler .is_currently_cloudpickle ():
359+ warn_msg = ' User registered objects (e.g. schema, logical type) through' \
360+ 'registeries may not be effective'
361+ err_msg = ''
355362 if semi_persistent_directory :
356363 session_file = os .path .join (
357364 semi_persistent_directory , 'staged' , names .PICKLED_MAIN_SESSION_FILE )
@@ -361,21 +368,18 @@ def _load_main_session(semi_persistent_directory):
361368 # This can happen if the worker fails to download the main session.
362369 # Raise a fatal error and crash this worker, forcing a restart.
363370 if os .path .getsize (session_file ) == 0 :
364- # Potenitally transient error, unclear if still happening.
365- raise LoadMainSessionException (
366- 'Session file found, but empty: %s. Functions defined in __main__ '
367- '(interactive session) will almost certainly fail.' %
368- (session_file , ))
369- pickler .load_session (session_file )
371+ if pickler .is_currently_dill ():
372+ # Potenitally transient error, unclear if still happening.
373+ raise LoadMainSessionException (
374+ 'Session file found, but empty: %s.%s' % (session_file , err_msg ))
375+ else :
376+ _LOGGER .warning ('Empty session file: %s.%s' , warn_msg , session_file )
377+ else :
378+ pickler .load_session (session_file )
370379 else :
371- _LOGGER .warning (
372- 'No session file found: %s. Functions defined in __main__ '
373- '(interactive session) may fail.' ,
374- session_file )
380+ _LOGGER .warning ('No session file found: %s.%s' , warn_msg , session_file )
375381 else :
376- _LOGGER .warning (
377- 'No semi_persistent_directory found: Functions defined in __main__ '
378- '(interactive session) may fail.' )
382+ _LOGGER .warning ('No semi_persistent_directory found: %s' , warn_msg )
379383
380384
381385if __name__ == '__main__' :
0 commit comments