Open
Description
The file path generated in the `open_writer` method does not follow the conventions of the target filesystem, because it is built with `os.path.join` instead of `FileSystems.join`.
Extract from `apache_beam\io\filebasedsink.py`:

```python
def _create_temp_dir(self, file_path_prefix):
  base_path, last_component = FileSystems.split(file_path_prefix)
  if not last_component:
    # Trying to re-split the base_path to check if it's a root.
    new_base_path, _ = FileSystems.split(base_path)
    if base_path == new_base_path:
      raise ValueError('Cannot create a temporary directory for root path '
                       'prefix %s. Please specify a file path prefix with '
                       'at least two components.' % file_path_prefix)
  path_components = [base_path,
                     'beam-temp-' + last_component + '-' + uuid.uuid1().hex]
  return FileSystems.join(*path_components)

@check_accessible(['file_path_prefix', 'file_name_suffix'])
def open_writer(self, init_result, uid):
  # A proper suffix is needed for AUTO compression detection.
  # We also ensure there will be no collisions with uid and a
  # (possibly unsharded) file_path_prefix and a (possibly empty)
  # file_name_suffix.
  file_path_prefix = self.file_path_prefix.get()
  file_name_suffix = self.file_name_suffix.get()
  suffix = (
      '.' + os.path.basename(file_path_prefix) + file_name_suffix)
  # os.path.join uses the local OS separator ('\' on Windows), not the
  # separator of the filesystem that init_result points to.
  return FileBasedSinkWriter(self, os.path.join(init_result, uid) + suffix)
```
This creates incompatibilities between, for example, Windows and GCS: on Windows, `os.path.join` inserts a backslash, which GCS does not treat as a path separator.
Expected: `gs://bucket/beam-temp-result-uuid/uid.result`
Actual (on Windows): `gs://bucket/beam-temp-result-uuid\uid.result`
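The separator difference can be reproduced without Beam by calling the two platform-specific implementations of `os.path` directly: `ntpath` is what `os.path` resolves to on Windows, and `posixpath` is what it resolves to on Linux/macOS. The bucket path, `uid`, and prefix below are illustrative values chosen to match the example paths, not output from a real pipeline run.

```python
import ntpath
import posixpath

# Illustrative values: the temp dir comes from FileSystems.join, so it already uses '/'.
init_result = 'gs://bucket/beam-temp-result-uuid'
uid = 'uid'
suffix = '.' + posixpath.basename('gs://bucket/result')  # '.result'

# What os.path.join(init_result, uid) + suffix yields on each platform.
windows_path = ntpath.join(init_result, uid) + suffix
posix_path = posixpath.join(init_result, uid) + suffix

print(windows_path)  # gs://bucket/beam-temp-result-uuid\uid.result
print(posix_path)    # gs://bucket/beam-temp-result-uuid/uid.result
```

Only the POSIX variant produces a valid object path under the GCS temp directory; the Windows variant creates an object whose name contains a literal backslash.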
Replacing `os.path.join` with `FileSystems.join` in `open_writer` fixes the issue.
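`FileSystems.join` avoids the problem because it looks up the filesystem from the base path's scheme (e.g. `gs://`) and delegates the join to that filesystem, rather than applying the local OS convention. The sketch below imitates that idea with a hypothetical `scheme_aware_join` helper so it runs without Beam installed; it is not Beam's implementation. In Beam itself, the fix would be to call `FileSystems.join(init_result, uid)` in `open_writer`.

```python
import os
import posixpath
import re

# Matches URL-style paths such as gs://..., s3://..., hdfs://...
_SCHEME_RE = re.compile(r'^[a-zA-Z][a-zA-Z0-9+.-]*://')


def scheme_aware_join(base, *parts):
    """Hypothetical helper (NOT part of Beam) mimicking the idea behind
    FileSystems.join: pick the join rule from the base path's scheme."""
    if _SCHEME_RE.match(base):
        # Remote filesystems such as GCS always use '/', whatever the client OS.
        # Leading slashes are stripped so posixpath.join does not drop the base.
        return posixpath.join(base, *(p.lstrip('/') for p in parts))
    # Plain local paths keep the local OS convention.
    return os.path.join(base, *parts)


print(scheme_aware_join('gs://bucket/beam-temp-result-uuid', 'uid'))
# gs://bucket/beam-temp-result-uuid/uid
```

The design choice is the same one Beam makes: the separator is a property of the target filesystem, so it is decided by the path being written to, not by the machine running the writer.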
Imported from Jira BEAM-6821. Original Jira may contain additional context.
Reported by: gkovelman.