
Commit af7a63a

Databricks Volumes v2 source connector (#288)
1 parent 622e1cb commit af7a63a

10 files changed, +268 −29 lines
Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+---
+title: Databricks Volumes
+---
+
+import NewDocument from '/snippets/general-shared-text/new-document.mdx';
+
+<NewDocument />
+
+import SharedContentDatabricksVolumes from '/snippets/sc-shared-text/databricks-volumes-cli-api.mdx';
+import SharedAPIKeyURL from '/snippets/general-shared-text/api-key-url.mdx';
+
+<SharedContentDatabricksVolumes/>
+<SharedAPIKeyURL/>
+
+Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector:
+
+import DatabricksVolumesAPISh from '/snippets/source_connectors/databricks-volumes.sh.mdx';
+import DatabricksVolumesAPIPyV2 from '/snippets/source_connectors/databricks-volumes.v2.py.mdx';
+
+<CodeGroup>
+<DatabricksVolumesAPISh />
+<DatabricksVolumesAPIPyV2 />
+</CodeGroup>

mint.json

Lines changed: 2 additions & 0 deletions

@@ -150,6 +150,7 @@
 "open-source/ingest/source-connectors/box",
 "open-source/ingest/source-connectors/confluence",
 "open-source/ingest/source-connectors/couchbase",
+"open-source/ingest/source-connectors/databricks-volumes",
 "open-source/ingest/source-connectors/delta-table",
 "open-source/ingest/source-connectors/discord",
 "open-source/ingest/source-connectors/dropbox",
@@ -300,6 +301,7 @@
 "api-reference/ingest/source-connectors/box",
 "api-reference/ingest/source-connectors/confluence",
 "api-reference/ingest/source-connectors/couchbase",
+"api-reference/ingest/source-connectors/databricks-volumes",
 "api-reference/ingest/source-connectors/delta-table",
 "api-reference/ingest/source-connectors/discord",
 "api-reference/ingest/source-connectors/dropbox",
Lines changed: 27 additions & 0 deletions

@@ -0,0 +1,27 @@
+---
+title: Databricks Volumes
+---
+
+import NewDocument from '/snippets/general-shared-text/new-document.mdx';
+
+<NewDocument />
+
+import SharedContentDatabricksVolumes from '/snippets/sc-shared-text/databricks-volumes-cli-api.mdx';
+
+<SharedContentDatabricksVolumes/>
+
+Now call the Unstructured Ingest CLI or the Unstructured Ingest Python library. The destination connector can be any of the ones supported. This example uses the local destination connector.
+
+This example sends data to Unstructured API services for processing by default. To process data locally instead, see the instructions at the end of this page.
+
+import DatabricksVolumesSh from '/snippets/source_connectors/databricks-volumes.sh.mdx';
+import DatabricksVolumesPyV2 from '/snippets/source_connectors/databricks-volumes.v2.py.mdx';
+
+<CodeGroup>
+<DatabricksVolumesSh />
+<DatabricksVolumesPyV2 />
+</CodeGroup>
+
+import SharedPartitionByAPIOSS from '/snippets/ingest-configuration-shared/partition-by-api-oss.mdx';
+
+<SharedPartitionByAPIOSS/>
snippets/destination_connectors/databricks_volumes.sh.mdx

Lines changed: 1 addition & 6 deletions

@@ -11,16 +11,11 @@ unstructured-ingest \
 --partition-endpoint $UNSTRUCTURED_API_URL \
 --strategy hi_res \
 --additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}" \
---chunk-by-api \
 --chunking-strategy by_title \
---chunk-api-key $UNSTRUCTURED_API_KEY \
---chunking-endpoint $UNSTRUCTURED_API_URL \
 --embedding-provider huggingface \
---embedding-model-name sentence-transformers/all-mpnet-base-v2 \
 databricks-volumes \
+--profile $DATABRICKS_PROFILE \
 --host $DATABRICKS_HOST \
---token $DATABRICKS_TOKEN \
---cluster-id $DATABRICKS_CLUSTER_ID \
 --catalog $DATABRICKS_CATALOG \
 --schema $DATABRICKS_SCHEMA \
 --volume $DATABRICKS_VOLUME \
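
The destination CLI snippet above now authenticates through a named Databricks configuration profile (`--profile $DATABRICKS_PROFILE`) instead of a token and cluster ID. As a minimal, hypothetical sketch, such a profile typically lives in `~/.databrickscfg`; the host and token values below are placeholders:

```bash
# Hypothetical sketch: define a Databricks configuration profile that
# --profile $DATABRICKS_PROFILE (CLI) or profile= (Python) can reference.
# The host and token values are placeholders, not real credentials.
cat > ~/.databrickscfg <<'EOF'
[DEFAULT]
host  = https://<your-workspace-url>
token = <your-personal-access-token>
EOF

export DATABRICKS_PROFILE="DEFAULT"
```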

snippets/destination_connectors/databricks_volumes.v1.py.mdx

Lines changed: 2 additions & 4 deletions

@@ -26,8 +26,7 @@ def get_writer() -> Writer:
         connector_config=SimpleDatabricksVolumesConfig(
             host=os.getenv("DATABRICKS_HOST"),
             access_config=DatabricksVolumesAccessConfig(
-                token=os.getenv("DATABRICKS_TOKEN"),
-                cluster_id=os.getenv("DATABRICKS_CLUSTER_ID")
+                token=os.getenv("DATABRICKS_TOKEN")
             ),
         ),
         write_config=DatabricksVolumesWriteConfig(
@@ -63,8 +62,7 @@ if __name__ == "__main__":
             chunking_strategy="by_title",
         ),
         embedding_config=EmbeddingConfig(
-            provider="huggingface",
-            model_name="sentence-transformers/all-mpnet-base-v2",
+            provider="huggingface"
         ),
         writer=writer,
         writer_kwargs={},
snippets/destination_connectors/databricks_volumes.v2.py.mdx

Lines changed: 76 additions & 18 deletions

@@ -4,11 +4,34 @@ import os
 from unstructured_ingest.v2.pipeline.pipeline import Pipeline
 from unstructured_ingest.v2.interfaces import ProcessorConfig
 
+# For all supported Databricks authentication types, you can import this:
 from unstructured_ingest.v2.processes.connectors.databricks_volumes import (
     DatabricksVolumesConnectionConfig,
     DatabricksVolumesAccessConfig,
     DatabricksVolumesUploaderConfig
 )
+
+# Alternatively, for supported Databricks on AWS authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_aws import (
+#     DatabricksAWSVolumesConnectionConfig,
+#     DatabricksAWSVolumesAccessConfig,
+#     DatabricksAWSVolumesUploaderConfig
+# )
+
+# Alternatively, for supported Azure Databricks authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_azure import (
+#     DatabricksAzureVolumesConnectionConfig,
+#     DatabricksAzureVolumesAccessConfig,
+#     DatabricksAzureVolumesUploaderConfig
+# )
+
+# Alternatively, for supported Databricks on Google Cloud authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_gcp import (
+#     DatabricksGoogleVolumesConnectionConfig,
+#     DatabricksGoogleVolumesAccessConfig,
+#     DatabricksGoogleVolumesUploaderConfig
+# )
+
 from unstructured_ingest.v2.processes.connectors.local import (
     LocalIndexerConfig,
     LocalDownloaderConfig,
@@ -37,28 +60,63 @@ if __name__ == "__main__":
                 "split_pdf_concurrency_level": 15
             }
         ),
-        chunker_config=ChunkerConfig(
-            chunk_by_api=True,
-            chunk_api_key=os.getenv("UNSTRUCTURED_API_KEY"),
-            chunking_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
-            chunking_strategy="by_title"
-        ),
-        embedder_config=EmbedderConfig(
-            embedding_provider="huggingface",
-            embedding_model_name="sentence-transformers/all-mpnet-base-v2"
-        ),
+        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
+        embedder_config=EmbedderConfig(embedding_provider="huggingface"),
+        # For specifying a Databricks configuration profile:
         destination_connection_config=DatabricksVolumesConnectionConfig(
-            access_config=DatabricksVolumesAccessConfig(
-                token=os.getenv("DATABRICKS_TOKEN"),
-                cluster_id=os.getenv("DATABRICKS_CLUSTER_ID")
-            ),
-            host=os.getenv("DATABRICKS_HOST")
-        ),
-        uploader_config=DatabricksVolumesUploaderConfig(
+            access_config=DatabricksVolumesAccessConfig(profile=os.getenv("DATABRICKS_PROFILE")),
+            host=os.getenv("DATABRICKS_HOST"),
             catalog=os.getenv("DATABRICKS_CATALOG"),
             schema=os.getenv("DATABRICKS_SCHEMA"),
             volume=os.getenv("DATABRICKS_VOLUME"),
             volume_path=os.getenv("DATABRICKS_VOLUME_PATH")
-        )
+        ),
+        uploader_config=DatabricksVolumesUploaderConfig(overwrite=True)
+        # Other examples:
+        #
+        # For Databricks on AWS, with Databricks personal access token authentication:
+        # destination_connection_config=DatabricksAWSVolumesConnectionConfig(
+        #     access_config=DatabricksAWSVolumesAccessConfig(token=os.getenv("DATABRICKS_TOKEN")),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        # uploader_config=DatabricksAWSVolumesUploaderConfig(
+        #     catalog=os.getenv("DATABRICKS_CATALOG"),
+        #     schema=os.getenv("DATABRICKS_SCHEMA"),
+        #     volume=os.getenv("DATABRICKS_VOLUME"),
+        #     volume_path=os.getenv("DATABRICKS_VOLUME_PATH"),
+        #     overwrite=True
+        # )
+        #
+        # For Azure Databricks, with Microsoft Entra ID service principal authentication:
+        # destination_connection_config=DatabricksAzureVolumesConnectionConfig(
+        #     access_config=DatabricksAzureVolumesAccessConfig(
+        #         azure_client_id=os.getenv("ARM_CLIENT_ID"),
+        #         azure_client_secret=os.getenv("ARM_CLIENT_SECRET"),
+        #         azure_tenant_id=os.getenv("ARM_TENANT_ID")
+        #     ),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        # uploader_config=DatabricksAzureVolumesUploaderConfig(
+        #     catalog=os.getenv("DATABRICKS_CATALOG"),
+        #     schema=os.getenv("DATABRICKS_SCHEMA"),
+        #     volume=os.getenv("DATABRICKS_VOLUME"),
+        #     volume_path=os.getenv("DATABRICKS_VOLUME_PATH"),
+        #     overwrite=True
+        # )
+        #
+        # For Databricks on Google Cloud, with Google Cloud Platform credentials authentication:
+        # destination_connection_config=DatabricksGoogleVolumesConnectionConfig(
+        #     access_config=DatabricksGoogleVolumesAccessConfig(
+        #         google_service_account=os.getenv("GOOGLE_CREDENTIALS")
+        #     ),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        # uploader_config=DatabricksGoogleVolumesUploaderConfig(
+        #     catalog=os.getenv("DATABRICKS_CATALOG"),
+        #     schema=os.getenv("DATABRICKS_SCHEMA"),
+        #     volume=os.getenv("DATABRICKS_VOLUME"),
+        #     volume_path=os.getenv("DATABRICKS_VOLUME_PATH"),
+        #     overwrite=True
+        # )
     ).run()
 ```

snippets/general-shared-text/databricks-volumes-cli-api.mdx

Lines changed: 0 additions & 1 deletion

@@ -11,7 +11,6 @@ import AdditionalIngestDependencies from '/snippets/general-shared-text/ingest-d
 The following environment variables:
 
 - `DATABRICKS_HOST` - The Databricks host URL, represented by `--host` (CLI) or `host` (Python).
-- `DATABRICKS_CLUSTER_ID` - The Databricks compute resource ID, represented by `--cluster-id` (CLI) or `cluster_id` (Python).
 - `DATABRICKS_CATALOG` - The Databricks catalog name for the Volume, represented by `--catalog` (CLI) or `catalog` (Python).
 - `DATABRICKS_SCHEMA` - The Databricks schema name for the Volume, represented by `--schema` (CLI) or `schema` (Python). If not specified, `default` is used.
 - `DATABRICKS_VOLUME` - The Databricks Volume name, represented by `--volume` (CLI) or `volume` (Python).
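
The variables above map directly onto the CLI flags and Python settings shown elsewhere in this commit. A minimal, hypothetical shell setup (all values are placeholders) might look like:

```bash
# Hypothetical placeholder values; substitute your own workspace details.
export DATABRICKS_PROFILE="DEFAULT"
export DATABRICKS_HOST="https://<your-workspace-url>"
export DATABRICKS_CATALOG="<catalog-name>"
export DATABRICKS_SCHEMA="default"
export DATABRICKS_VOLUME="<volume-name>"
export DATABRICKS_VOLUME_PATH="<path/within/volume>"
```
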
Lines changed: 9 additions & 0 deletions

@@ -0,0 +1,9 @@
+Connect Databricks Volumes to your preprocessing pipeline, and use the Unstructured Ingest CLI or the Unstructured Ingest Python library to batch process all your documents and store structured outputs locally on your filesystem.
+
+You will need:
+
+import SharedDatabricksVolumes from '/snippets/general-shared-text/databricks-volumes.mdx';
+import SharedDatabricksVolumesCLIAPI from '/snippets/general-shared-text/databricks-volumes-cli-api.mdx';
+
+<SharedDatabricksVolumes />
+<SharedDatabricksVolumesCLIAPI />
Lines changed: 23 additions & 0 deletions

@@ -0,0 +1,23 @@
+```bash CLI
+#!/usr/bin/env bash
+
+# Chunking and embedding are optional.
+
+unstructured-ingest \
+databricks-volumes \
+--profile $DATABRICKS_PROFILE \
+--host $DATABRICKS_HOST \
+--catalog $DATABRICKS_CATALOG \
+--schema $DATABRICKS_SCHEMA \
+--volume $DATABRICKS_VOLUME \
+--volume-path $DATABRICKS_VOLUME_PATH \
+--partition-by-api \
+--api-key $UNSTRUCTURED_API_KEY \
+--partition-endpoint $UNSTRUCTURED_API_URL \
+--strategy hi_res \
+--additional-partition-args="{\"split_pdf_page\":\"true\", \"split_pdf_allow_failed\":\"true\", \"split_pdf_concurrency_level\": 15}" \
+--chunking-strategy by_title \
+--embedding-provider huggingface \
+local \
+--output-dir $LOCAL_FILE_OUTPUT_DIR
+```
Lines changed: 105 additions & 0 deletions

@@ -0,0 +1,105 @@
+```python Python Ingest v2
+import os
+
+from unstructured_ingest.v2.pipeline.pipeline import Pipeline
+from unstructured_ingest.v2.interfaces import ProcessorConfig
+
+# For all supported Databricks authentication types, you can import this:
+from unstructured_ingest.v2.processes.connectors.databricks_volumes import (
+    DatabricksVolumesIndexerConfig,
+    DatabricksVolumesConnectionConfig,
+    DatabricksVolumesAccessConfig,
+    DatabricksVolumesDownloaderConfig
+)
+
+# Alternatively, for supported Databricks on AWS authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_aws import (
+#     DatabricksAWSVolumesIndexerConfig,
+#     DatabricksAWSVolumesConnectionConfig,
+#     DatabricksAWSVolumesAccessConfig,
+#     DatabricksAWSVolumesDownloaderConfig
+# )
+
+# Alternatively, for supported Azure Databricks authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_azure import (
+#     DatabricksAzureVolumesIndexerConfig,
+#     DatabricksAzureVolumesConnectionConfig,
+#     DatabricksAzureVolumesAccessConfig,
+#     DatabricksAzureVolumesDownloaderConfig
+# )
+
+# Alternatively, for supported Databricks on Google Cloud authentication types only, you can import this:
+# from unstructured_ingest.v2.processes.connectors.databricks.volumes_gcp import (
+#     DatabricksGoogleVolumesIndexerConfig,
+#     DatabricksGoogleVolumesConnectionConfig,
+#     DatabricksGoogleVolumesAccessConfig,
+#     DatabricksGoogleVolumesDownloaderConfig
+# )
+
+from unstructured_ingest.v2.processes.connectors.local import (
+    LocalConnectionConfig,
+    LocalUploaderConfig
+)
+from unstructured_ingest.v2.processes.partitioner import PartitionerConfig
+from unstructured_ingest.v2.processes.chunker import ChunkerConfig
+from unstructured_ingest.v2.processes.embedder import EmbedderConfig
+
+# Chunking and embedding are optional.
+
+if __name__ == "__main__":
+    Pipeline.from_configs(
+        context=ProcessorConfig(reprocess=True),
+        indexer_config=DatabricksVolumesIndexerConfig(recursive=True),
+        # For specifying a Databricks configuration profile:
+        downloader_config=DatabricksVolumesDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
+        source_connection_config=DatabricksVolumesConnectionConfig(
+            access_config=DatabricksVolumesAccessConfig(profile=os.getenv("DATABRICKS_PROFILE")),
+            host=os.getenv("DATABRICKS_HOST"),
+            catalog=os.getenv("DATABRICKS_CATALOG"),
+            schema=os.getenv("DATABRICKS_SCHEMA"),
+            volume=os.getenv("DATABRICKS_VOLUME"),
+            volume_path=os.getenv("DATABRICKS_VOLUME_PATH")
+        ),
+        # Other examples:
+        #
+        # For Databricks on AWS, with Databricks personal access token authentication:
+        # downloader_config=DatabricksAWSVolumesDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
+        # source_connection_config=DatabricksAWSVolumesConnectionConfig(
+        #     access_config=DatabricksAWSVolumesAccessConfig(token=os.getenv("DATABRICKS_TOKEN")),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        #
+        # For Azure Databricks, with Microsoft Entra ID service principal authentication:
+        # downloader_config=DatabricksAzureVolumesDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
+        # source_connection_config=DatabricksAzureVolumesConnectionConfig(
+        #     access_config=DatabricksAzureVolumesAccessConfig(
+        #         azure_client_id=os.getenv("ARM_CLIENT_ID"),
+        #         azure_client_secret=os.getenv("ARM_CLIENT_SECRET"),
+        #         azure_tenant_id=os.getenv("ARM_TENANT_ID")
+        #     ),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        #
+        # For Databricks on Google Cloud, with Google Cloud Platform credentials authentication:
+        # downloader_config=DatabricksGoogleVolumesDownloaderConfig(download_dir=os.getenv("LOCAL_FILE_DOWNLOAD_DIR")),
+        # source_connection_config=DatabricksGoogleVolumesConnectionConfig(
+        #     access_config=DatabricksGoogleVolumesAccessConfig(
+        #         google_service_account=os.getenv("GOOGLE_CREDENTIALS")
+        #     ),
+        #     host=os.getenv("DATABRICKS_HOST")
+        # ),
+        partitioner_config=PartitionerConfig(
+            partition_by_api=True,
+            api_key=os.getenv("UNSTRUCTURED_API_KEY"),
+            partition_endpoint=os.getenv("UNSTRUCTURED_API_URL"),
+            additional_partition_args={
+                "split_pdf_page": True,
+                "split_pdf_allow_failed": True,
+                "split_pdf_concurrency_level": 15
+            }
+        ),
+        chunker_config=ChunkerConfig(chunking_strategy="by_title"),
+        embedder_config=EmbedderConfig(embedding_provider="huggingface"),
+        uploader_config=LocalUploaderConfig(output_dir=os.getenv("LOCAL_FILE_OUTPUT_DIR"))
+    ).run()
+```
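
The v2 Python source snippet above reads its download and output directories from environment variables. A minimal, hypothetical way to set them and run the script (the filename is an assumption used only for illustration) is:

```bash
# Hypothetical: save the Python snippet above as ingest_databricks_volumes.py,
# point the pipeline at local working directories, then run it.
export LOCAL_FILE_DOWNLOAD_DIR="$HOME/tmp/databricks-downloads"
export LOCAL_FILE_OUTPUT_DIR="$HOME/tmp/databricks-output"
python ingest_databricks_volumes.py
```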
