
Commit 484e863

bugfix/correctly set output blob path based on input path (#494)
1 parent df52ae5 commit 484e863

8 files changed (+148, -39 lines)

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
@@ -1,3 +1,7 @@
+## 1.0.29-dev1
+
+* **Fixed issue in the blob storage destination connector where files with the same name were overwriting each other**
+
 ## 1.0.29
 
 ### Fixes
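
Below is a minimal, standalone sketch (not library code) of the overwrite this entry describes: when the destination key is derived from the filename alone, two inputs that share a name but live in different folders collapse onto the same blob path, whereas deriving the key from the full input path keeps them distinct. All paths here are hypothetical.

    import os

    output_dir = "my-container/output"  # hypothetical destination prefix
    inputs = ["folder1/report.pdf", "folder2/report.pdf"]

    # Before the fix: keys built from the bare filename, so the second upload overwrites the first
    by_filename = {os.path.join(output_dir, f"{os.path.basename(p)}.json") for p in inputs}
    assert len(by_filename) == 1

    # After the fix: keys built from the full input path stay distinct
    by_fullpath = {os.path.join(output_dir, f"{p}.json") for p in inputs}
    assert len(by_fullpath) == 2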

test/integration/connectors/databricks/test_volumes_native.py

Lines changed: 79 additions & 25 deletions
@@ -1,7 +1,7 @@
 import json
 import os
 import uuid
-from contextlib import contextmanager
+from contextlib import contextmanager, suppress
 from dataclasses import dataclass
 from pathlib import Path
 from unittest import mock
@@ -205,33 +205,39 @@ def databricks_destination_context(
         yield client
     finally:
         # Cleanup
-        try:
-            for file in client.files.list_directory_contents(
-                directory_path=_get_volume_path(env_data.catalog, volume, volume_path)
-            ):
-                client.files.delete(file.path)
-            client.files.delete_directory(_get_volume_path(env_data.catalog, volume, volume_path))
-        except NotFound:
-            # Directory was never created, don't need to delete
-            pass
-
-
-def validate_upload(client: WorkspaceClient, catalog: str, volume: str, volume_path: str):
-    files = list(
-        client.files.list_directory_contents(
-            directory_path=_get_volume_path(catalog, volume, volume_path)
-        )
-    )
+        with suppress(NotFound):
+            client.workspace.delete(
+                path=_get_volume_path(env_data.catalog, volume, volume_path), recursive=True
+            )
+
+
+def list_files_recursively(client: WorkspaceClient, path: str):
+    files = []
+    objects = client.files.list_directory_contents(path)
+    for obj in objects:
+        full_path = obj.path
+        if obj.is_directory:
+            files.extend(list_files_recursively(client, full_path))
+        else:
+            files.append(full_path)
+    return files
+
+
+def validate_upload(
+    client: WorkspaceClient, catalog: str, volume: str, volume_path: str, num_files: int
+):
+    files = list_files_recursively(client, _get_volume_path(catalog, volume, volume_path))
 
-    assert len(files) == 1
+    assert len(files) == num_files
 
-    resp = client.files.download(files[0].path)
-    data = json.loads(resp.contents.read())
+    for i in range(num_files):
+        resp = client.files.download(files[i])
+        data = json.loads(resp.contents.read())
 
-    assert len(data) == 22
-    element_types = {v["type"] for v in data}
-    assert len(element_types) == 1
-    assert "CompositeElement" in element_types
+        assert len(data) == 22
+        element_types = {v["type"] for v in data}
+        assert len(element_types) == 1
+        assert "CompositeElement" in element_types
 
 
 @pytest.mark.asyncio
@@ -267,4 +273,52 @@ async def test_volumes_native_destination(upload_file: Path):
             catalog=env_data.catalog,
             volume="test-platform",
             volume_path=volume_path,
+            num_files=1,
+        )
+
+
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
+@requires_env(
+    "DATABRICKS_HOST", "DATABRICKS_CLIENT_ID", "DATABRICKS_CLIENT_SECRET", "DATABRICKS_CATALOG"
+)
+async def test_volumes_native_destination_same_filenames_different_folder(upload_file: Path):
+    env_data = get_basic_auth_env_data()
+    volume_path = f"databricks-volumes-test-output-{uuid.uuid4()}"
+    file_data_1 = FileData(
+        source_identifiers=SourceIdentifiers(
+            fullpath=f"folder1/{upload_file.name}", filename=upload_file.name
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    file_data_2 = FileData(
+        source_identifiers=SourceIdentifiers(
+            fullpath=f"folder2/{upload_file.name}", filename=upload_file.name
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    with databricks_destination_context(
+        volume="test-platform", volume_path=volume_path, env_data=env_data
+    ) as workspace_client:
+        connection_config = env_data.get_connection_config()
+        uploader = DatabricksNativeVolumesUploader(
+            connection_config=connection_config,
+            upload_config=DatabricksNativeVolumesUploaderConfig(
+                volume="test-platform",
+                volume_path=volume_path,
+                catalog=env_data.catalog,
+            ),
+        )
+        uploader.precheck()
+        uploader.run(path=upload_file, file_data=file_data_1)
+        uploader.run(path=upload_file, file_data=file_data_2)
+
+        validate_upload(
+            client=workspace_client,
+            catalog=env_data.catalog,
+            volume="test-platform",
+            volume_path=volume_path,
+            num_files=2,
         )

test/integration/connectors/test_onedrive.py

Lines changed: 2 additions & 2 deletions
@@ -113,7 +113,7 @@ async def test_onedrive_source(temp_dir):
 
 @pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
 @requires_env("MS_CLIENT_CRED", "MS_CLIENT_ID", "MS_TENANT_ID", "MS_USER_PNAME")
-def xtest_onedrive_destination(upload_file: Path, onedrive_test_folder: str):
+def test_onedrive_destination(upload_file: Path, onedrive_test_folder: str):
     """
     Integration test for the OneDrive destination connector.
@@ -137,7 +137,7 @@ def xtest_onedrive_destination(upload_file: Path, onedrive_test_folder: str):
 
     file_data = FileData(
         source_identifiers=SourceIdentifiers(
-            fullpath=destination_fullpath,
+            fullpath=upload_file.name,
            filename=upload_file.name,
         ),
         connector_type=CONNECTOR_TYPE,

test/integration/connectors/test_s3.py

Lines changed: 47 additions & 0 deletions
@@ -182,3 +182,50 @@ async def test_s3_destination(upload_file: Path):
         assert len(uploaded_files) == 1
     finally:
         s3fs.rm(path=destination_path, recursive=True)
+
+
+@pytest.mark.asyncio
+@pytest.mark.tags(CONNECTOR_TYPE, DESTINATION_TAG, BLOB_STORAGE_TAG)
+@requires_env("S3_INGEST_TEST_ACCESS_KEY", "S3_INGEST_TEST_SECRET_KEY")
+async def test_s3_destination_same_filename_different_folders(upload_file: Path):
+    aws_credentials = get_aws_credentials()
+    s3_bucket = "s3://utic-ingest-test-fixtures"
+    destination_path = f"{s3_bucket}/destination/{uuid.uuid4()}"
+    connection_config = S3ConnectionConfig(
+        access_config=S3AccessConfig(
+            key=aws_credentials["aws_access_key_id"],
+            secret=aws_credentials["aws_secret_access_key"],
+        ),
+    )
+    upload_config = S3UploaderConfig(remote_url=destination_path)
+    uploader = S3Uploader(connection_config=connection_config, upload_config=upload_config)
+    s3fs = uploader.fs
+    file_data_1 = FileData(
+        source_identifiers=SourceIdentifiers(
+            fullpath="folder1/" + upload_file.name, filename=upload_file.name
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+    file_data_2 = FileData(
+        source_identifiers=SourceIdentifiers(
+            fullpath="folder2/" + upload_file.name, filename=upload_file.name
+        ),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
+
+    try:
+        uploader.precheck()
+        if uploader.is_async():
+            await uploader.run_async(path=upload_file, file_data=file_data_1)
+            await uploader.run_async(path=upload_file, file_data=file_data_2)
+        else:
+            uploader.run(path=upload_file, file_data=file_data_1)
+            uploader.run(path=upload_file, file_data=file_data_2)
+        uploaded_files = [
+            Path(file) for file in s3fs.ls(path=destination_path) if Path(file).name != "_empty"
+        ]
+        assert len(uploaded_files) == 2
+    finally:
+        s3fs.rm(path=destination_path, recursive=True)

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "1.0.29" # pragma: no cover
+__version__ = "1.0.29-dev1" # pragma: no cover

unstructured_ingest/processes/connectors/databricks/volumes.py

Lines changed: 8 additions & 3 deletions
@@ -196,9 +196,14 @@ class DatabricksVolumesUploader(Uploader, ABC):
     connection_config: DatabricksVolumesConnectionConfig
 
     def get_output_path(self, file_data: FileData) -> str:
-        return os.path.join(
-            self.upload_config.path, f"{file_data.source_identifiers.filename}.json"
-        )
+        if file_data.source_identifiers.fullpath:
+            return os.path.join(
+                self.upload_config.path, f"{file_data.source_identifiers.fullpath}.json"
+            )
+        else:
+            return os.path.join(
+                self.upload_config.path, f"{file_data.source_identifiers.filename}.json"
+            )
 
     def precheck(self) -> None:
         try:
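
For reference, a standalone sketch of the output paths the updated get_output_path produces (the volume root and file names below are hypothetical, not taken from the diff):

    import os

    volume_root = "/Volumes/catalog/schema/test-platform/run-1"  # stands in for upload_config.path

    # With source_identifiers.fullpath set, the input folder structure is preserved:
    print(os.path.join(volume_root, "folder1/report.pdf" + ".json"))
    # -> /Volumes/catalog/schema/test-platform/run-1/folder1/report.pdf.json

    # Without a fullpath, the connector falls back to the bare filename, as before:
    print(os.path.join(volume_root, "report.pdf" + ".json"))
    # -> /Volumes/catalog/schema/test-platform/run-1/report.pdf.json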

unstructured_ingest/processes/connectors/fsspec/fsspec.py

Lines changed: 2 additions & 3 deletions
@@ -344,8 +344,7 @@ def precheck(self) -> None:
 
     def get_upload_path(self, file_data: FileData) -> Path:
         upload_path = (
-            Path(self.upload_config.path_without_protocol)
-            / file_data.source_identifiers.relative_path
+            Path(self.upload_config.path_without_protocol) / file_data.source_identifiers.fullpath
         )
         updated_upload_path = upload_path.parent / f"{upload_path.name}.json"
         return updated_upload_path
@@ -358,8 +357,8 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
         client.upload(lpath=path_str, rpath=upload_path.as_posix())
 
     async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        upload_path = self.get_upload_path(file_data=file_data)
         path_str = str(path.resolve())
+        upload_path = self.get_upload_path(file_data=file_data)
         # Odd that fsspec doesn't run exists() as async even when client support async
         logger.debug(f"writing local file {path_str} to {upload_path}")
         with self.connection_config.get_client(protocol=self.upload_config.protocol) as client:
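
A quick standalone check of the new get_upload_path logic (the remote prefix and file name are hypothetical): the folders carried in source_identifiers.fullpath are kept, and the .json suffix is appended to the leaf name only.

    from pathlib import Path

    remote_root = Path("utic-ingest-test-fixtures/destination/run-1")  # stands in for path_without_protocol
    upload_path = remote_root / "folder2/report.pdf"                   # stands in for fullpath

    print(upload_path.parent / f"{upload_path.name}.json")
    # -> utic-ingest-test-fixtures/destination/run-1/folder2/report.pdf.json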

unstructured_ingest/processes/connectors/onedrive.py

Lines changed: 5 additions & 5 deletions
@@ -370,14 +370,14 @@ def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
         # Use the remote_url from upload_config as the base destination folder
         base_destination_folder = self.upload_config.url
 
-        # Use the file's relative path to maintain directory structure, if needed
-        if file_data.source_identifiers and file_data.source_identifiers.rel_path:
-            # Combine the base destination folder with the file's relative path
+        # Use the file's full path to maintain directory structure, if needed
+        if file_data.source_identifiers and file_data.source_identifiers.fullpath:
+            # Combine the base destination folder with the file's full path
             destination_path = Path(base_destination_folder) / Path(
-                f"{file_data.source_identifiers.rel_path}.json"
+                f"{file_data.source_identifiers.fullpath}.json"
             )
         else:
-            # If no relative path is provided, upload directly to the base destination folder
+            # If no full path is provided, upload directly to the base destination folder
             destination_path = Path(base_destination_folder) / f"{path.name}.json"
 
         destination_folder = destination_path.parent
