
Commit b2a3e77

fix output path in fsspec connector (#528)
The output path in the destination connector should use the relative path instead of the full path; if the relative path is not available, the filename is used instead.
1 parent b19de53 commit b2a3e77
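In short, every connector touched below now resolves its destination path with the same rule. The following is a minimal sketch of that rule, not the connectors' actual code; `SourceIdentifiers` mirrors the field names visible in the diffs, while `resolve_output_path` is a hypothetical helper for illustration:

```python
from dataclasses import dataclass
from pathlib import Path
from typing import Optional


@dataclass
class SourceIdentifiers:
    filename: str
    fullpath: str
    relative_path: Optional[str] = None


def resolve_output_path(base: str, identifiers: SourceIdentifiers) -> Path:
    # Prefer the path relative to the indexed root; fall back to the bare filename.
    if identifiers.relative_path:
        return Path(base) / f"{identifiers.relative_path.lstrip('/')}.json"
    return Path(base) / f"{identifiers.filename}.json"


identifiers = SourceIdentifiers(
    filename="doc.pdf",
    fullpath="folder1/folder2/doc.pdf",  # includes the indexed root, "folder1"
    relative_path="folder2/doc.pdf",     # path below the indexed root
)
# The indexed root no longer leaks into the destination path:
assert resolve_output_path("output", identifiers) == Path("output/folder2/doc.pdf.json")
```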

7 files changed: +110 −13 lines changed


CHANGELOG.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -1,3 +1,7 @@
+## 1.0.35
+
+* **Fix output path in blob storage destination connector**
+
 ## 1.0.34
 
 * **Improve Confluence Indexer's precheck** - validate access to each space
```

test/integration/connectors/databricks/test_volumes_native.py

Lines changed: 53 additions & 1 deletion
```diff
@@ -224,11 +224,18 @@ def list_files_recursively(client: WorkspaceClient, path: str):
 
 
 def validate_upload(
-    client: WorkspaceClient, catalog: str, volume: str, volume_path: str, num_files: int
+    client: WorkspaceClient,
+    catalog: str,
+    volume: str,
+    volume_path: str,
+    num_files: int,
+    filepath_in_destination: list[str] = None,
 ):
     files = list_files_recursively(client, _get_volume_path(catalog, volume, volume_path))
 
     assert len(files) == num_files
+    if filepath_in_destination:
+        assert files == filepath_in_destination
 
     for i in range(num_files):
         resp = client.files.download(files[i])
@@ -322,3 +329,48 @@ async def test_volumes_native_destination_same_filenames_different_folder(upload
         volume_path=volume_path,
         num_files=2,
     )
+
+
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
+@requires_env(
+    "DATABRICKS_HOST", "DATABRICKS_CLIENT_ID", "DATABRICKS_CLIENT_SECRET", "DATABRICKS_CATALOG"
+)
+async def test_volumes_native_destination_different_fullpath_relative_path(upload_file: Path):
+    env_data = get_basic_auth_env_data()
+    volume_path = f"databricks-volumes-test-output-{uuid.uuid4()}"
+    file_data = FileData(
+        source_identifiers=SourceIdentifiers(
+            relative_path=f"folder2/{upload_file.name}",
+            fullpath=f"folder1/folder2/{upload_file.name}",
+            filename=upload_file.name,
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    with databricks_destination_context(
+        volume="test-platform", volume_path=volume_path, env_data=env_data
+    ) as workspace_client:
+        connection_config = env_data.get_connection_config()
+        uploader = DatabricksNativeVolumesUploader(
+            connection_config=connection_config,
+            upload_config=DatabricksNativeVolumesUploaderConfig(
+                volume="test-platform",
+                volume_path=volume_path,
+                catalog=env_data.catalog,
+            ),
+        )
+        uploader.precheck()
+        uploader.run(path=upload_file, file_data=file_data)
+
+        filepath_in_destination = [
+            f"/Volumes/utic-dev-tech-fixtures/default/test-platform/{volume_path}/folder2/DA-1p-with-duplicate-pages.pdf.json.json"
+        ]
+        validate_upload(
+            client=workspace_client,
+            catalog=env_data.catalog,
+            volume="test-platform",
+            volume_path=volume_path,
+            num_files=1,
+            filepath_in_destination=filepath_in_destination,
+        )
```
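The expected destination path decomposes as follows. A worked example, assuming the `upload_file` fixture is the partitioned output `DA-1p-with-duplicate-pages.pdf.json` and that `default` is the schema in Databricks' `/Volumes/<catalog>/<schema>/<volume>` layout:

```python
volume_root = "/Volumes/utic-dev-tech-fixtures/default/test-platform"
volume_path = "databricks-volumes-test-output-<uuid>"  # stand-in for the random suffix
relative_path = "folder2/DA-1p-with-duplicate-pages.pdf.json"  # no "folder1" root prefix
# The uploader appends ".json" once more, producing the double extension:
expected = f"{volume_root}/{volume_path}/{relative_path}.json"
assert expected.endswith("/folder2/DA-1p-with-duplicate-pages.pdf.json.json")
```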

test/integration/connectors/test_s3.py

Lines changed: 40 additions & 0 deletions
```diff
@@ -229,3 +229,43 @@ async def test_s3_destination_same_filename_different_folders(upload_file: Path)
         assert len(uploaded_files) == 2
     finally:
         s3fs.rm(path=destination_path, recursive=True)
+
+
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
+@requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
+async def test_s3_destination_different_relative_path_and_full_path(upload_file: Path):
+    aws_credentials = get_aws_credentials()
+    s3_bucket = "s3://utic-ingest-test-fixtures"
+    destination_path = f"{s3_bucket}/destination/{uuid.uuid4()}"
+    connection_config = S3ConnectionConfig(
+        access_config=S3AccessConfig(
+            key=aws_credentials["aws_access_key_id"],
+            secret=aws_credentials["aws_secret_access_key"],
+        ),
+    )
+    upload_config = S3UploaderConfig(remote_url=destination_path)
+    uploader = S3Uploader(connection_config=connection_config, upload_config=upload_config)
+    s3fs = uploader.fs
+    file_data = FileData(
+        source_identifiers=SourceIdentifiers(
+            relative_path=f"folder2/{upload_file.name}",
+            fullpath=f"folder1/folder2/{upload_file.name}",
+            filename=upload_file.name,
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    try:
+        uploader.precheck()
+        if uploader.is_async():
+            await uploader.run_async(path=upload_file, file_data=file_data)
+        else:
+            uploader.run(path=upload_file, file_data=file_data)
+        uploaded_files = [
+            Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
+        ]
+        assert len(uploaded_files) == 1
+        assert uploaded_files[0].as_posix() == f"{destination_path.lstrip('s3://')}/folder2"
+    finally:
+        s3fs.rm(path=destination_path, recursive=True)
```
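One caveat in the final assertion: `str.lstrip` removes a leading run of characters drawn from a set, not a string prefix, so `destination_path.lstrip('s3://')` only works because the bucket name starts with none of `s`, `3`, `:`, `/`. On Python 3.9+, `str.removeprefix` expresses the intent directly:

```python
# Equivalent here, because "u" is not in the character set {"s", "3", ":", "/"}:
assert "s3://utic-ingest-test-fixtures".lstrip("s3://") == "utic-ingest-test-fixtures"
# ...but lstrip silently over-strips when the bucket name starts with one of them:
assert "s3://s3-data".lstrip("s3://") == "-data"
# Prefix removal without the foot-gun (Python 3.9+):
assert "s3://s3-data".removeprefix("s3://") == "s3-data"
```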

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
```diff
@@ -1 +1 @@
-__version__ = "1.0.34" # pragma: no cover
+__version__ = "1.0.35" # pragma: no cover
```

unstructured_ingest/processes/connectors/databricks/volumes.py

Lines changed: 3 additions & 2 deletions
```diff
@@ -196,9 +196,10 @@ class DatabricksVolumesUploader(Uploader, ABC):
     connection_config: DatabricksVolumesConnectionConfig
 
     def get_output_path(self, file_data: FileData) -> str:
-        if file_data.source_identifiers.fullpath:
+        if file_data.source_identifiers.relative_path:
             return os.path.join(
-                self.upload_config.path, f"{file_data.source_identifiers.fullpath}.json"
+                self.upload_config.path,
+                f"{file_data.source_identifiers.relative_path.lstrip('/')}.json",
             )
         else:
             return os.path.join(
```
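The added `lstrip('/')` is load-bearing with `os.path.join`: an absolute second argument makes `join` discard the configured base path. A quick illustration with made-up values:

```python
import os

upload_path = "/Volumes/catalog/schema/volume/run-1"  # illustrative upload_config.path
# os.path.join restarts from an absolute component, silently losing the base...
assert os.path.join(upload_path, "/folder2/doc.pdf.json") == "/folder2/doc.pdf.json"
# ...so the leading slash is stripped before joining:
relative_path = "/folder2/doc.pdf"
assert (
    os.path.join(upload_path, f"{relative_path.lstrip('/')}.json")
    == "/Volumes/catalog/schema/volume/run-1/folder2/doc.pdf.json"
)
```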

unstructured_ingest/processes/connectors/fsspec/fsspec.py

Lines changed: 4 additions & 3 deletions
```diff
@@ -343,9 +343,10 @@ def precheck(self) -> None:
             raise self.wrap_error(e=e)
 
     def get_upload_path(self, file_data: FileData) -> Path:
-        upload_path = Path(
-            self.upload_config.path_without_protocol
-        ) / file_data.source_identifiers.fullpath.lstrip("/")
+        upload_path = (
+            Path(self.upload_config.path_without_protocol)
+            / file_data.source_identifiers.relative_path.lstrip("/")
+        )
         updated_upload_path = upload_path.parent / f"{upload_path.name}.json"
         return updated_upload_path
```
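`pathlib` joins behave the same way, which is why the relative path is stripped of its leading slash here as well: an absolute right-hand operand replaces the base outright. A small sketch with illustrative values:

```python
from pathlib import PurePosixPath

base = PurePosixPath("bucket/destination/run-1")  # illustrative path_without_protocol
# An absolute operand on the right replaces the base entirely...
assert base / "/folder2/doc.pdf" == PurePosixPath("/folder2/doc.pdf")
# ...while the stripped form nests under it, as get_upload_path intends:
upload_path = base / "/folder2/doc.pdf".lstrip("/")
assert upload_path.parent / f"{upload_path.name}.json" == PurePosixPath(
    "bucket/destination/run-1/folder2/doc.pdf.json"
)
```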

unstructured_ingest/processes/connectors/onedrive.py

Lines changed: 5 additions & 6 deletions
```diff
@@ -369,15 +369,14 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
 
         # Use the remote_url from upload_config as the base destination folder
         base_destination_folder = self.upload_config.url
-
-        # Use the file's full path to maintain directory structure, if needed
-        if file_data.source_identifiers and file_data.source_identifiers.fullpath:
-            # Combine the base destination folder with the file's full path
+        # Use the file's relative path to maintain directory structure, if needed
+        if file_data.source_identifiers and file_data.source_identifiers.relative_path:
+            # Combine the base destination folder with the file's relative path
             destination_path = Path(base_destination_folder) / Path(
-                f"{file_data.source_identifiers.fullpath}.json"
+                f"{file_data.source_identifiers.relative_path}.json"
             )
         else:
-            # If no full path is provided, upload directly to the base destination folder
+            # If no relative path is provided, upload directly to the base destination folder
             destination_path = Path(base_destination_folder) / f"{path.name}.json"
 
         destination_folder = destination_path.parent
```
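The practical effect for a destination tree, sketched with made-up folder names (`shared-root` standing in for the indexed source root):

```python
from pathlib import Path

base = Path("Documents/ingest-output")            # illustrative upload_config.url
fullpath = "shared-root/reports/q3.pdf"           # source path including the indexed root
relative_path = "reports/q3.pdf"                  # source path below the indexed root

old_destination = base / f"{fullpath}.json"       # previously mirrored the whole source tree
new_destination = base / f"{relative_path}.json"  # now mirrors only the relative structure
assert old_destination == Path("Documents/ingest-output/shared-root/reports/q3.pdf.json")
assert new_destination == Path("Documents/ingest-output/reports/q3.pdf.json")
```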
