Commit a8d6364

fix/Databricks example. (#193)
Parent: 6891b87

7 files changed: +83 −274 lines


CHANGELOG.md

Lines changed: 4 additions & 2 deletions
@@ -1,4 +1,4 @@
-## 0.1.1-dev4
+## 0.1.1-dev5
 
 ### Enhancements
 
@@ -11,7 +11,9 @@
 * **Add Delta Table destination to v2**
 * **Migrate Slack Source Connector to V2**
 
->>>>>>> 9214214 (bump changelog)
+### Fixes
+
+* **Fix Databricks Volumes destination** Fix for filenames to not be hashes.
 
 ## 0.1.0
 
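The fix this changelog entry describes is the one-line uploader change at the end of this commit: destination filenames now come from the original source file rather than the hashed working-directory name. A minimal sketch of the before/after behavior, using illustrative helper names that are not part of the commit:

import os
from pathlib import Path

# Hypothetical helpers contrasting the old and new naming behavior.
def old_output_path(upload_dir: str, local_path: Path) -> str:
    # Before: the name of the local temp file, which the v2 pipeline hashes,
    # e.g. "3a7bd3e2360a3d29eea436fcfb7e44c8.json".
    return os.path.join(upload_dir, local_path.name)

def new_output_path(upload_dir: str, source_filename: str) -> str:
    # After: the original filename carried on file_data.source_identifiers.
    return os.path.join(upload_dir, source_filename)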

test/integration/connectors/databricks_tests/test_volumes_native.py

Lines changed: 8 additions & 4 deletions
@@ -16,7 +16,7 @@
     source_connector_validation,
 )
 from test.integration.utils import requires_env
-from unstructured_ingest.v2.interfaces import FileData
+from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers
 from unstructured_ingest.v2.processes.connectors.databricks.volumes_native import (
     CONNECTOR_TYPE,
     DatabricksNativeVolumesAccessConfig,
@@ -139,7 +139,11 @@ def validate_upload(client: WorkspaceClient, catalog: str, volume: str, volume_p
 async def test_volumes_native_destination(upload_file: Path):
     env_data = get_env_data()
     volume_path = f"databricks-volumes-test-output-{uuid.uuid4()}"
-    mock_file_data = FileData(identifier="mock file data", connector_type=CONNECTOR_TYPE)
+    file_data = FileData(
+        source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
+        connector_type=CONNECTOR_TYPE,
+        identifier="mock file data",
+    )
     with databricks_destination_context(
         volume="test-platform", volume_path=volume_path, env_data=env_data
     ) as workspace_client:
@@ -153,9 +157,9 @@ async def test_volumes_native_destination(upload_file: Path):
            ),
        )
        if uploader.is_async():
-            await uploader.run_async(path=upload_file, file_data=mock_file_data)
+            await uploader.run_async(path=upload_file, file_data=file_data)
        else:
-            uploader.run(path=upload_file, file_data=mock_file_data)
+            uploader.run(path=upload_file, file_data=file_data)
 
        validate_upload(
            client=workspace_client,
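The updated test now builds FileData the way a real source connector would, which is what lets validate_upload check for the original filename instead of a hash. A standalone sketch of that construction, with an illustrative input path:

from pathlib import Path

from unstructured_ingest.v2.interfaces import FileData, SourceIdentifiers
from unstructured_ingest.v2.processes.connectors.databricks.volumes_native import CONNECTOR_TYPE

upload_file = Path("example-docs/fake-text.txt")  # illustrative input file
file_data = FileData(
    # the uploader now reads source_identifiers.filename when naming the upload
    source_identifiers=SourceIdentifiers(fullpath=upload_file.name, filename=upload_file.name),
    connector_type=CONNECTOR_TYPE,
    identifier="mock file data",
)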

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.1.1-dev4" # pragma: no cover
+__version__ = "0.1.1-dev5" # pragma: no cover
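To confirm which build is installed after the bump, a trivial check (a sketch; the module path matches the file above):

from unstructured_ingest.__version__ import __version__

print(__version__)  # expected to print "0.1.1-dev5" for this commit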

unstructured_ingest/v2/examples/databricks_volumes.py renamed to unstructured_ingest/v2/examples/databricks_volumes_dest.py

Lines changed: 16 additions & 16 deletions
@@ -5,18 +5,17 @@
 from unstructured_ingest.v2.logger import logger
 from unstructured_ingest.v2.pipeline.pipeline import Pipeline
 from unstructured_ingest.v2.processes.chunker import ChunkerConfig
-from unstructured_ingest.v2.processes.connectors.databricks_volumes import (
+from unstructured_ingest.v2.processes.connectors.databricks.volumes_native import (
     CONNECTOR_TYPE,
-    DatabricksVolumesAccessConfig,
-    DatabricksVolumesConnectionConfig,
-    DatabricksVolumesUploaderConfig,
+    DatabricksNativeVolumesAccessConfig,
+    DatabricksNativeVolumesConnectionConfig,
+    DatabricksNativeVolumesUploaderConfig,
 )
 from unstructured_ingest.v2.processes.connectors.local import (
     LocalConnectionConfig,
     LocalDownloaderConfig,
     LocalIndexerConfig,
 )
-from unstructured_ingest.v2.processes.embedder import EmbedderConfig
 from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
 
 base_path = Path(__file__).parent.parent.parent.parent
@@ -29,25 +28,26 @@
     logger.info(f"writing all content in: {work_dir.resolve()}")
     Pipeline.from_configs(
         context=ProcessorConfig(work_dir=str(work_dir.resolve())),
-        indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/multisimple/"),
+        indexer_config=LocalIndexerConfig(input_path=str(docs_path.resolve()) + "/fake-text.txt"),
         downloader_config=LocalDownloaderConfig(download_dir=download_path),
         source_connection_config=LocalConnectionConfig(),
         partitioner_config=PartitionerConfig(strategy="fast"),
         chunker_config=ChunkerConfig(
-            chunking_strategy="by_title",
-            chunk_include_orig_elements=False,
-            chunk_max_characters=1500,
-            chunk_multipage_sections=True,
+            chunking_strategy="basic",
         ),
-        embedder_config=EmbedderConfig(embedding_provider="huggingface"),
-        destination_connection_config=DatabricksVolumesConnectionConfig(
-            access_config=DatabricksVolumesAccessConfig(
-                username=os.environ["DATABRICKS_USERNAME"],
-                password=os.environ["DATABRICKS_PASSWORD"],
+        embedder_config=None,
+        destination_connection_config=DatabricksNativeVolumesConnectionConfig(
+            access_config=DatabricksNativeVolumesAccessConfig(
+                client_id=os.environ["DATABRICKS_CLIENT_ID"],
+                client_secret=os.environ["DATABRICKS_CLIENT_SECRET"],
             ),
             host=os.environ["DATABRICKS_HOST"],
+            catalog=os.environ["DATABRICKS_CATALOG"],
+            volume=os.environ["DATABRICKS_VOLUME"],
+            volume_path=os.environ["DATABRICKS_VOLUME_PATH"],
         ),
-        uploader_config=DatabricksVolumesUploaderConfig(
+        uploader_config=DatabricksNativeVolumesUploaderConfig(
+            overwrite=True,
             catalog=os.environ["DATABRICKS_CATALOG"],
             volume=os.environ["DATABRICKS_VOLUME"],
             volume_path=os.environ["DATABRICKS_VOLUME_PATH"],
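The reworked destination example authenticates with a service principal and reads all of its coordinates from the environment. A small preflight sketch, assuming the same variable names the example uses, that fails fast when any are unset (not part of the commit):

import os

# Environment variables the example reads; fail with a clear message if unset.
required = [
    "DATABRICKS_HOST",
    "DATABRICKS_CLIENT_ID",
    "DATABRICKS_CLIENT_SECRET",
    "DATABRICKS_CATALOG",
    "DATABRICKS_VOLUME",
    "DATABRICKS_VOLUME_PATH",
]
missing = [name for name in required if not os.environ.get(name)]
if missing:
    raise SystemExit(f"missing required environment variables: {missing}")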
unstructured_ingest/v2/examples/databricks_volumes_source.py

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+import os
+from pathlib import Path
+
+from unstructured_ingest.v2.interfaces import ProcessorConfig
+from unstructured_ingest.v2.logger import logger
+from unstructured_ingest.v2.pipeline.pipeline import Pipeline
+from unstructured_ingest.v2.processes.chunker import ChunkerConfig
+from unstructured_ingest.v2.processes.connectors.databricks.volumes_native import (
+    CONNECTOR_TYPE,
+    DatabricksNativeVolumesAccessConfig,
+    DatabricksNativeVolumesConnectionConfig,
+    DatabricksNativeVolumesDownloaderConfig,
+    DatabricksNativeVolumesIndexerConfig,
+)
+from unstructured_ingest.v2.processes.connectors.local import (
+    LocalUploaderConfig,
+)
+from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
+
+base_path = Path(__file__).parent.parent.parent.parent
+docs_path = base_path / "example-docs"
+work_dir = base_path / "tmp_ingest" / CONNECTOR_TYPE
+output_path = work_dir / "output"
+download_path = work_dir / "download"
+
+if __name__ == "__main__":
+    logger.info(f"writing all content in: {work_dir.resolve()}")
+    Pipeline.from_configs(
+        context=ProcessorConfig(work_dir=str(work_dir.resolve())),
+        indexer_config=DatabricksNativeVolumesIndexerConfig(
+            host=os.environ["DATABRICKS_HOST"],
+            catalog=os.environ["DATABRICKS_CATALOG"],
+            volume=os.environ["DATABRICKS_VOLUME"],
+            volume_path=os.environ["DATABRICKS_VOLUME_PATH"],
+        ),
+        downloader_config=DatabricksNativeVolumesDownloaderConfig(download_dir=download_path),
+        source_connection_config=DatabricksNativeVolumesConnectionConfig(
+            access_config=DatabricksNativeVolumesAccessConfig(
+                client_id=os.environ["DATABRICKS_CLIENT_ID"],
+                client_secret=os.environ["DATABRICKS_CLIENT_SECRET"],
+            ),
+            host=os.environ["DATABRICKS_HOST"],
+            catalog=os.environ["DATABRICKS_CATALOG"],
+            volume=os.environ["DATABRICKS_VOLUME"],
+            volume_path=os.environ["DATABRICKS_VOLUME_PATH"],
+        ),
+        partitioner_config=PartitionerConfig(strategy="fast"),
+        chunker_config=ChunkerConfig(
+            chunking_strategy="basic",
+        ),
+        embedder_config=None,
+        uploader_config=LocalUploaderConfig(output_dir=str(output_path.resolve())),
+    ).run()
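After a run of the new source example, each processed document should land as a JSON file of elements under output_path. A hedged sketch for inspecting the results, assuming that output layout:

import json
from pathlib import Path

# Assumed output location, mirroring work_dir / "output" in the example above;
# the "databricks_volumes" segment assumes the value of CONNECTOR_TYPE.
output_path = Path("tmp_ingest") / "databricks_volumes" / "output"
for result in sorted(output_path.glob("*.json")):
    elements = json.loads(result.read_text())
    print(f"{result.name}: {len(elements)} elements")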

unstructured_ingest/v2/processes/connectors/databricks/volumes.py

Lines changed: 1 addition & 1 deletion
@@ -166,7 +166,7 @@ def precheck(self) -> None:
            raise DestinationConnectionError(f"failed to validate connection: {e}")
 
    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
-        output_path = os.path.join(self.upload_config.path, path.name)
+        output_path = os.path.join(self.upload_config.path, file_data.source_identifiers.filename)
        with open(path, "rb") as elements_file:
            self.connection_config.get_client().files.upload(
                file_path=output_path,
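The one-line change above is the whole fix: path points at the pipeline's hashed working file, while the original filename travels on FileData.source_identifiers. If a caller could ever pass FileData without source identifiers, a defensive variant might fall back to the local name; a sketch, not what the commit ships:

from pathlib import Path

from unstructured_ingest.v2.interfaces import FileData

def resolve_output_name(path: Path, file_data: FileData) -> str:
    # Prefer the original source filename; fall back to the local (hashed)
    # filename when no source identifiers are present.
    identifiers = file_data.source_identifiers
    if identifiers is not None and identifiers.filename:
        return identifiers.filename
    return path.name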
