Skip to content

Commit 0510b4f

Browse files
authored
feat: add precheck to all connectors (#4)
* Support check connection via precheck method * Add back in precheck for all connectors * Add entry to changelog * Fix sql precheck
1 parent c91398b commit 0510b4f

File tree

20 files changed

+176
-41
lines changed

20 files changed

+176
-41
lines changed

.github/workflows/release.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,6 @@ on:
44
release:
55
types:
66
- published
7-
- created
8-
- released
97

108
env:
119
PYTHON_VERSION: "3.10"

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
1-
## 0.0.2-dev0
1+
## 0.0.2-dev1
22

33
### Enhancements
44

55
* **Use uuid for s3 identifiers** Update unique id to use uuid derived from file path rather than the filepath itself.
6+
* **V2 connectors precheck support** All steps in the v2 pipeline support an optional precheck call, which encompasses the previous check connection functionality.
67

78
## 0.0.1
89

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.0.2-dev0" # pragma: no cover
1+
__version__ = "0.0.2-dev1" # pragma: no cover

unstructured_ingest/v2/examples/example_sql.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,8 @@
1313
)
1414
from unstructured_ingest.v2.processes.connectors.sql import (
1515
DatabaseType,
16-
SimpleSqlConfig,
1716
SQLAccessConfig,
17+
SQLConnectionConfig,
1818
SQLUploaderConfig,
1919
SQLUploadStagerConfig,
2020
)
@@ -67,7 +67,7 @@
6767

6868
# sqlite test first
6969
Pipeline.from_configs(
70-
destination_connection_config=SimpleSqlConfig(
70+
destination_connection_config=SQLConnectionConfig(
7171
db_type=DatabaseType.SQLITE,
7272
database=SQLITE_DB,
7373
access_config=SQLAccessConfig(),
@@ -77,7 +77,7 @@
7777

7878
# now, pg with pgvector
7979
Pipeline.from_configs(
80-
destination_connection_config=SimpleSqlConfig(
80+
destination_connection_config=SQLConnectionConfig(
8181
db_type=DatabaseType.POSTGRESQL,
8282
database="elements",
8383
host="localhost",

unstructured_ingest/v2/interfaces/process.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@ class BaseProcess(ABC):
88
def is_async(self) -> bool:
99
return False
1010

11+
def precheck(self) -> None:
12+
pass
13+
1114
@abstractmethod
1215
def run(self, **kwargs: Any) -> Any:
1316
pass

unstructured_ingest/v2/pipeline/pipeline.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ def log_statuses(self):
109109
def run(self):
110110
try:
111111
start_time = time()
112+
self._run_prechecks()
112113
self._run()
113114
logger.info(f"Finished ingest process in {time() - start_time}s")
114115
finally:
@@ -130,6 +131,27 @@ def clean_results(self, results: Optional[list[Union[Any, list[Any]]]]) -> Optio
130131
final = [f for f in flat if f]
131132
return final or None
132133

134+
def _run_prechecks(self):
135+
steps = [self.indexer_step, self.downloader_step, self.partitioner_step, self.uploader_step]
136+
if self.chunker_step:
137+
steps.append(self.chunker_step)
138+
if self.embedder_step:
139+
steps.append(self.embedder_step)
140+
if self.uncompress_step:
141+
steps.append(self.uncompress_step)
142+
if self.stager_step:
143+
steps.append(self.stager_step)
144+
failures = {}
145+
for step in steps:
146+
try:
147+
step.process.precheck()
148+
except Exception as e:
149+
failures[step.process.__class__.__name__] = f"[{type(e).__name__}] {e}"
150+
if failures:
151+
for k, v in failures.items():
152+
logger.error(f"Step precheck failure: {k}: {v}")
153+
raise PipelineError("Precheck failed")
154+
133155
def _run(self):
134156
logger.info(
135157
f"Running local pipline: {self} with configs: "

unstructured_ingest/v2/processes/connectors/astra.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from unstructured.__version__ import __version__ as integration_version
88

99
from unstructured_ingest.enhanced_dataclass import enhanced_field
10+
from unstructured_ingest.error import DestinationConnectionError
1011
from unstructured_ingest.utils.data_prep import batch_generator
1112
from unstructured_ingest.utils.dep_check import requires_dependencies
1213
from unstructured_ingest.v2.interfaces import (
@@ -94,6 +95,13 @@ class AstraUploader(Uploader):
9495
upload_config: AstraUploaderConfig
9596
connector_type: str = CONNECTOR_TYPE
9697

98+
def precheck(self) -> None:
99+
try:
100+
self.get_collection()
101+
except Exception as e:
102+
logger.error(f"Failed to validate connection {e}", exc_info=True)
103+
raise DestinationConnectionError(f"failed to validate connection: {e}")
104+
97105
@requires_dependencies(["astrapy"], extras="astra")
98106
def get_collection(self) -> "AstraDBCollection":
99107
from astrapy.db import AstraDB

unstructured_ingest/v2/processes/connectors/azure_cognitive_search.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,6 +175,14 @@ def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs)
175175
),
176176
)
177177

178+
def precheck(self) -> None:
179+
try:
180+
client = self.connection_config.generate_client()
181+
client.get_document_count()
182+
except Exception as e:
183+
logger.error(f"failed to validate connection: {e}", exc_info=True)
184+
raise DestinationConnectionError(f"failed to validate connection: {e}")
185+
178186
def write_dict_wrapper(self, elements_dict):
179187
return self.write_dict(elements_dict=elements_dict)
180188

unstructured_ingest/v2/processes/connectors/chroma.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -111,10 +111,13 @@ class ChromaUploader(Uploader):
111111
connector_type: str = CONNECTOR_TYPE
112112
upload_config: ChromaUploaderConfig
113113
connection_config: ChromaConnectionConfig
114-
client: Optional["Client"] = field(init=False)
115114

116-
def __post_init__(self):
117-
self.client = self.create_client()
115+
def precheck(self) -> None:
116+
try:
117+
self.create_client()
118+
except Exception as e:
119+
logger.error(f"failed to validate connection: {e}", exc_info=True)
120+
raise DestinationConnectionError(f"failed to validate connection: {e}")
118121

119122
@requires_dependencies(["chromadb"], extras="chroma")
120123
def create_client(self) -> "Client":
@@ -187,10 +190,9 @@ def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
187190
f"collection {self.connection_config.collection_name} "
188191
f"at {self.connection_config.host}",
189192
)
193+
client = self.create_client()
190194

191-
collection = self.client.get_or_create_collection(
192-
name=self.connection_config.collection_name
193-
)
195+
collection = client.get_or_create_collection(name=self.connection_config.collection_name)
194196
for chunk in batch_generator(elements_dict, self.upload_config.batch_size):
195197
self.upsert_batch(collection, self.prepare_chroma_list(chunk))
196198

unstructured_ingest/v2/processes/connectors/databricks_volumes.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import TYPE_CHECKING, Any, Optional
44

55
from unstructured_ingest.enhanced_dataclass import enhanced_field
6+
from unstructured_ingest.error import DestinationConnectionError
67
from unstructured_ingest.utils.dep_check import requires_dependencies
78
from unstructured_ingest.v2.interfaces import (
89
AccessConfig,
@@ -11,6 +12,7 @@
1112
Uploader,
1213
UploaderConfig,
1314
)
15+
from unstructured_ingest.v2.logger import logger
1416
from unstructured_ingest.v2.processes.connector_registry import DestinationRegistryEntry
1517

1618
if TYPE_CHECKING:
@@ -78,6 +80,13 @@ def __post_init__(self) -> "WorkspaceClient":
7880
host=self.connection_config.host, **self.connection_config.access_config.to_dict()
7981
)
8082

83+
def precheck(self) -> None:
84+
try:
85+
assert self.client.current_user.me().active
86+
except Exception as e:
87+
logger.error(f"failed to validate connection: {e}", exc_info=True)
88+
raise DestinationConnectionError(f"failed to validate connection: {e}")
89+
8190
def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
8291
for content in contents:
8392
with open(content.path, "rb") as elements_file:

0 commit comments

Comments
 (0)