Skip to content

Commit d7cdbaa

Browse files
authored
bugfix/close SSHClient in sftp connector (#309)
* Add context manager for all fsspec clients, manually close SSHClient in sftp implementation
* fix s3 connector
* fix async upload
* Add get client for all child connector configs to also check requirements
* fix get client in child connector configs
1 parent 7a8c8fc commit d7cdbaa

File tree

9 files changed

+122
-247
lines changed

9 files changed

+122
-247
lines changed

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
1-
## 0.3.10-dev0
1+
## 0.3.10-dev1
22

33
### Enhancements
44

55
* **Support more concrete FileData content for batch support**
66

7+
### Fixes
8+
9+
* **Fix closing SSHClient in sftp connector**
10+
711
## 0.3.9
812

913
### Enhancements

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = "0.3.10-dev0" # pragma: no cover
1+
__version__ = "0.3.10-dev1" # pragma: no cover

unstructured_ingest/v2/processes/connectors/fsspec/azure.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from __future__ import annotations
22

3+
from contextlib import contextmanager
34
from dataclasses import dataclass, field
4-
from pathlib import Path
55
from time import time
6-
from typing import Any, Generator, Optional
6+
from typing import TYPE_CHECKING, Any, Generator, Optional
77

88
from pydantic import Field, Secret
99

1010
from unstructured_ingest.utils.dep_check import requires_dependencies
11-
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
11+
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
1212
from unstructured_ingest.v2.processes.connector_registry import (
1313
DestinationRegistryEntry,
1414
SourceRegistryEntry,
@@ -25,6 +25,9 @@
2525
)
2626
from unstructured_ingest.v2.processes.connectors.fsspec.utils import json_serial, sterilize_dict
2727

28+
if TYPE_CHECKING:
29+
from adlfs import AzureBlobFileSystem
30+
2831
CONNECTOR_TYPE = "azure"
2932

3033

@@ -89,24 +92,22 @@ def get_access_config(self) -> dict[str, Any]:
8992
}
9093
return access_configs
9194

95+
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
96+
@contextmanager
97+
def get_client(self, protocol: str) -> Generator["AzureBlobFileSystem", None, None]:
98+
with super().get_client(protocol=protocol) as client:
99+
yield client
100+
92101

93102
@dataclass
94103
class AzureIndexer(FsspecIndexer):
95104
connection_config: AzureConnectionConfig
96105
index_config: AzureIndexerConfig
97106
connector_type: str = CONNECTOR_TYPE
98107

99-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
100-
def precheck(self) -> None:
101-
super().precheck()
102-
103108
def sterilize_info(self, file_data: dict) -> dict:
104109
return sterilize_dict(data=file_data, default=azure_json_serial)
105110

106-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
107-
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
108-
return super().run(**kwargs)
109-
110111
def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
111112
path = file_data["name"]
112113
date_created = (
@@ -149,14 +150,6 @@ class AzureDownloader(FsspecDownloader):
149150
connector_type: str = CONNECTOR_TYPE
150151
download_config: Optional[AzureDownloaderConfig] = field(default_factory=AzureDownloaderConfig)
151152

152-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
153-
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
154-
return super().run(file_data=file_data, **kwargs)
155-
156-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
157-
async def run_async(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
158-
return await super().run_async(file_data=file_data, **kwargs)
159-
160153

161154
class AzureUploaderConfig(FsspecUploaderConfig):
162155
pass
@@ -168,22 +161,6 @@ class AzureUploader(FsspecUploader):
168161
connection_config: AzureConnectionConfig
169162
upload_config: AzureUploaderConfig = field(default=None)
170163

171-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
172-
def __post_init__(self):
173-
super().__post_init__()
174-
175-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
176-
def precheck(self) -> None:
177-
super().precheck()
178-
179-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
180-
def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
181-
return super().run(path=path, file_data=file_data, **kwargs)
182-
183-
@requires_dependencies(["adlfs", "fsspec"], extras="azure")
184-
async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
185-
return await super().run_async(path=path, file_data=file_data, **kwargs)
186-
187164

188165
azure_source_entry = SourceRegistryEntry(
189166
indexer=AzureIndexer,

unstructured_ingest/v2/processes/connectors/fsspec/box.py

Lines changed: 12 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,16 @@
11
from __future__ import annotations
22

3+
from contextlib import contextmanager
34
from dataclasses import dataclass, field
4-
from pathlib import Path
55
from time import time
6-
from typing import Annotated, Any, Generator, Optional
6+
from typing import TYPE_CHECKING, Annotated, Any, Generator, Optional
77

88
from dateutil import parser
99
from pydantic import Field, Secret
1010
from pydantic.functional_validators import BeforeValidator
1111

1212
from unstructured_ingest.utils.dep_check import requires_dependencies
13-
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
13+
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
1414
from unstructured_ingest.v2.processes.connector_registry import (
1515
DestinationRegistryEntry,
1616
SourceRegistryEntry,
@@ -28,6 +28,9 @@
2828
)
2929
from unstructured_ingest.v2.processes.connectors.utils import conform_string_to_dict
3030

31+
if TYPE_CHECKING:
32+
from boxfs import BoxFileSystem
33+
3134
CONNECTOR_TYPE = "box"
3235

3336

@@ -72,21 +75,19 @@ def get_access_config(self) -> dict[str, Any]:
7275

7376
return access_kwargs_with_oauth
7477

78+
@requires_dependencies(["boxfs"], extras="box")
79+
@contextmanager
80+
def get_client(self, protocol: str) -> Generator["BoxFileSystem", None, None]:
81+
with super().get_client(protocol=protocol) as client:
82+
yield client
83+
7584

7685
@dataclass
7786
class BoxIndexer(FsspecIndexer):
7887
connection_config: BoxConnectionConfig
7988
index_config: BoxIndexerConfig
8089
connector_type: str = CONNECTOR_TYPE
8190

82-
@requires_dependencies(["boxfs"], extras="box")
83-
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
84-
return super().run(**kwargs)
85-
86-
@requires_dependencies(["boxfs"], extras="box")
87-
def precheck(self) -> None:
88-
super().precheck()
89-
9091
def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
9192
path = file_data["name"]
9293
date_created = None
@@ -126,14 +127,6 @@ class BoxDownloader(FsspecDownloader):
126127
connector_type: str = CONNECTOR_TYPE
127128
download_config: Optional[BoxDownloaderConfig] = field(default_factory=BoxDownloaderConfig)
128129

129-
@requires_dependencies(["boxfs"], extras="box")
130-
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
131-
return super().run(file_data=file_data, **kwargs)
132-
133-
@requires_dependencies(["boxfs"], extras="box")
134-
async def run_async(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
135-
return await super().run_async(file_data=file_data, **kwargs)
136-
137130

138131
class BoxUploaderConfig(FsspecUploaderConfig):
139132
pass
@@ -145,22 +138,6 @@ class BoxUploader(FsspecUploader):
145138
connection_config: BoxConnectionConfig
146139
upload_config: BoxUploaderConfig = field(default=None)
147140

148-
@requires_dependencies(["boxfs"], extras="box")
149-
def __post_init__(self):
150-
super().__post_init__()
151-
152-
@requires_dependencies(["boxfs"], extras="box")
153-
def precheck(self) -> None:
154-
super().precheck()
155-
156-
@requires_dependencies(["boxfs"], extras="box")
157-
def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
158-
return super().run(path=path, file_data=file_data, **kwargs)
159-
160-
@requires_dependencies(["boxfs"], extras="box")
161-
async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
162-
return await super().run_async(path=path, file_data=file_data, **kwargs)
163-
164141

165142
box_source_entry = SourceRegistryEntry(
166143
indexer=BoxIndexer,

unstructured_ingest/v2/processes/connectors/fsspec/dropbox.py

Lines changed: 15 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
from __future__ import annotations
22

3+
from contextlib import contextmanager
34
from dataclasses import dataclass, field
4-
from pathlib import Path
55
from time import time
6-
from typing import Any, Generator, Optional
6+
from typing import TYPE_CHECKING, Generator, Optional
77

88
from pydantic import Field, Secret
99

1010
from unstructured_ingest.utils.dep_check import requires_dependencies
11-
from unstructured_ingest.v2.interfaces import DownloadResponse, FileData, FileDataSourceMetadata
11+
from unstructured_ingest.v2.interfaces import FileDataSourceMetadata
1212
from unstructured_ingest.v2.processes.connector_registry import (
1313
DestinationRegistryEntry,
1414
SourceRegistryEntry,
@@ -24,11 +24,16 @@
2424
FsspecUploaderConfig,
2525
)
2626

27+
if TYPE_CHECKING:
28+
from dropboxdrivefs import DropboxDriveFileSystem
29+
2730
CONNECTOR_TYPE = "dropbox"
2831

2932

3033
class DropboxIndexerConfig(FsspecIndexerConfig):
31-
pass
34+
def model_post_init(self, __context):
35+
if not self.path_without_protocol.startswith("/"):
36+
self.path_without_protocol = "/" + self.path_without_protocol
3237

3338

3439
class DropboxAccessConfig(FsspecAccessConfig):
@@ -42,6 +47,12 @@ class DropboxConnectionConfig(FsspecConnectionConfig):
4247
)
4348
connector_type: str = Field(default=CONNECTOR_TYPE, init=False)
4449

50+
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
51+
@contextmanager
52+
def get_client(self, protocol: str) -> Generator["DropboxDriveFileSystem", None, None]:
53+
with super().get_client(protocol=protocol) as client:
54+
yield client
55+
4556

4657
@dataclass
4758
class DropboxIndexer(FsspecIndexer):
@@ -83,20 +94,6 @@ def get_metadata(self, file_data: dict) -> FileDataSourceMetadata:
8394
filesize_bytes=file_size,
8495
)
8596

86-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
87-
def __post_init__(self):
88-
# dropbox expects the path to start with a /
89-
if not self.index_config.path_without_protocol.startswith("/"):
90-
self.index_config.path_without_protocol = "/" + self.index_config.path_without_protocol
91-
92-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
93-
def precheck(self) -> None:
94-
super().precheck()
95-
96-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
97-
def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
98-
return super().run(**kwargs)
99-
10097

10198
class DropboxDownloaderConfig(FsspecDownloaderConfig):
10299
pass
@@ -111,14 +108,6 @@ class DropboxDownloader(FsspecDownloader):
111108
default_factory=DropboxDownloaderConfig
112109
)
113110

114-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
115-
def run(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
116-
return super().run(file_data=file_data, **kwargs)
117-
118-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
119-
async def run_async(self, file_data: FileData, **kwargs: Any) -> DownloadResponse:
120-
return await super().run_async(file_data=file_data, **kwargs)
121-
122111

123112
class DropboxUploaderConfig(FsspecUploaderConfig):
124113
pass
@@ -130,22 +119,6 @@ class DropboxUploader(FsspecUploader):
130119
connection_config: DropboxConnectionConfig
131120
upload_config: DropboxUploaderConfig = field(default=None)
132121

133-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
134-
def __post_init__(self):
135-
super().__post_init__()
136-
137-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
138-
def precheck(self) -> None:
139-
super().precheck()
140-
141-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
142-
def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
143-
return super().run(path=path, file_data=file_data, **kwargs)
144-
145-
@requires_dependencies(["dropboxdrivefs", "fsspec"], extras="dropbox")
146-
async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
147-
return await super().run_async(path=path, file_data=file_data, **kwargs)
148-
149122

150123
dropbox_source_entry = SourceRegistryEntry(
151124
indexer=DropboxIndexer,

0 commit comments

Comments
 (0)