
Commit a6d3239

feat/support parent multiprocessing fan out (#36)

* Support parent multiprocessing fan-out
* Remove mention of num_processes from the Pinecone connector
* Create new implementation of BatchPipelineStep
* Fix method signatures on fsspec connectors
* Remove --num-processes from the Pinecone destination ingest test
* Fix tidy
* Fix import error

1 parent 8d33642 · commit a6d3239

28 files changed: +145 −206 lines

CHANGELOG.md

Lines changed: 5 additions & 1 deletion
@@ -1,4 +1,8 @@
-## 0.0.7-dev0
+## 0.0.7-dev1
+
+### Enhancements
+
+* **support sharing parent multiprocessing for uploaders** If an uploader needs to fan out its processing using multiprocessing, support that via the parent pipeline approach rather than handling it explicitly in the connector logic.
 
 ## 0.0.6
 
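The core idea, as a minimal standard-library sketch (the names here are illustrative, not the library's API): the parent pipeline owns the process pool and maps per-file work across it, so a connector only defines its single-file unit of work and never spawns processes itself.

```python
import multiprocessing as mp


def upload_one(path: str) -> str:
    # Stand-in for a connector's per-file upload work.
    return f"uploaded {path}"


if __name__ == "__main__":
    files = ["a.json", "b.json", "c.json"]
    # The fan-out lives in the parent process, not in the connector.
    with mp.Pool(processes=2) as pool:
        print(pool.map(upload_one, files))
```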

test_e2e/dest/pinecone.sh

Lines changed: 1 addition & 3 deletions
@@ -9,7 +9,6 @@ OUTPUT_FOLDER_NAME=s3-pinecone-dest
 OUTPUT_DIR=$SCRIPT_DIR/structured-output/$OUTPUT_FOLDER_NAME
 WORK_DIR=$SCRIPT_DIR/workdir/$OUTPUT_FOLDER_NAME
 max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}
-writer_processes=$(((max_processes - 1) > 1 ? (max_processes - 1) : 2))
 
 if [ -z "$PINECONE_API_KEY" ]; then
   echo "Skipping Pinecone ingest test because PINECONE_API_KEY env var is not set."
@@ -101,8 +100,7 @@ PYTHONPATH=. ./unstructured_ingest/main.py \
   pinecone \
   --api-key "$PINECONE_API_KEY" \
   --index-name "$PINECONE_INDEX" \
-  --batch-size 80 \
-  --num-processes "$writer_processes"
+  --batch-size 80
 
 # It can take some time for the index to catch up with the content that was written; this checks between 10s sleeps
 # to give it time to process the writes. It will time out after checking for a minute.
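The check those comments refer to lives later in the shell script and isn't part of this diff. For clarity, the poll-sleep-timeout pattern it describes, sketched in Python with hypothetical names (`get_count` stands in for querying the index):

```python
import time


def wait_for_writes(expected: int, get_count, interval: float = 10.0, timeout: float = 60.0) -> None:
    """Poll the destination until it reports the expected record count, or time out."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if get_count() >= expected:
            return
        time.sleep(interval)  # give the index time to absorb the writes
    raise TimeoutError(f"index never reached {expected} records")
```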

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.0.7-dev0"  # pragma: no cover
+__version__ = "0.0.7-dev1"  # pragma: no cover

unstructured_ingest/v2/interfaces/uploader.py

Lines changed: 9 additions & 4 deletions
@@ -1,4 +1,4 @@
-from abc import ABC, abstractmethod
+from abc import ABC
 from dataclasses import dataclass
 from pathlib import Path
 from typing import Any, TypeVar
@@ -31,9 +31,14 @@ class Uploader(BaseProcess, BaseConnector, ABC):
     def is_async(self) -> bool:
         return False
 
-    @abstractmethod
-    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
-        pass
+    def is_batch(self) -> bool:
+        return False
+
+    def run_batch(self, contents: list[UploadContent], **kwargs: Any) -> None:
+        raise NotImplementedError()
+
+    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
+        raise NotImplementedError()
 
     async def run_async(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
         return self.run(path=path, file_data=file_data, **kwargs)
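Under the revised interface, per-file work goes in run() and the parent pipeline handles any fan-out; a connector that must see the whole batch at once opts in by returning True from is_batch() and overriding run_batch(). A minimal sketch (the PrintUploader class is hypothetical, and a real uploader also carries its connection and upload configs):

```python
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any

from unstructured_ingest.v2.interfaces import FileData, Uploader


@dataclass
class PrintUploader(Uploader):  # hypothetical example, not part of this commit
    def is_batch(self) -> bool:
        # Return True (and override run_batch) if the connector needs the
        # whole batch in a single call instead of per-file fan-out.
        return False

    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
        # Per-file unit of work; the parent pipeline may invoke this from
        # multiple worker processes.
        with path.open("r") as f:
            elements = json.load(f)
        print(f"would upload {len(elements)} elements from {path}")
```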

unstructured_ingest/v2/pipeline/interfaces.py

Lines changed: 26 additions & 2 deletions
@@ -1,7 +1,7 @@
 import asyncio
 import logging
 import multiprocessing as mp
-from abc import ABC
+from abc import ABC, abstractmethod
 from concurrent.futures import ThreadPoolExecutor
 from dataclasses import dataclass
 from functools import wraps
@@ -12,7 +12,7 @@
 from tqdm import tqdm
 from tqdm.asyncio import tqdm as tqdm_asyncio
 
-from unstructured_ingest.v2.interfaces import BaseProcess, ProcessorConfig
+from unstructured_ingest.v2.interfaces import BaseProcess, ProcessorConfig, Uploader
 from unstructured_ingest.v2.logger import logger, make_default_logger
 
 BaseProcessT = TypeVar("BaseProcessT", bound=BaseProcess)
@@ -167,3 +167,27 @@ async def run_async(self, _fn: Optional[Callable] = None, **kwargs: Any) -> Opti
     @property
     def cache_dir(self) -> Path:
         return Path(self.context.work_dir) / self.identifier
+
+
+@dataclass
+class BatchPipelineStep(PipelineStep, ABC):
+    process: Uploader
+
+    @timed
+    def __call__(self, iterable: Optional[iterable_input] = None) -> Any:
+        if self.context.mp_supported and self.process.is_batch():
+            return self.run_batch(contents=iterable)
+        return super().__call__(iterable=iterable)
+
+    @abstractmethod
+    def _run_batch(self, contents: iterable_input, **kwargs) -> Any:
+        pass
+
+    def run_batch(self, contents: iterable_input, **kwargs) -> Any:
+        try:
+            return self._run_batch(contents=contents, **kwargs)
+        except Exception as e:
+            self.context.status[self.identifier] = {"step_error": str(e)}
+            if self.context.raise_on_error:
+                raise e
+            return None
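run_batch() gives batch uploads the same error contract as the other steps: the failure is recorded on the shared context and only propagates when the pipeline is configured to raise. A toy, self-contained mimic of that contract (these are stand-in classes, not the real ones):

```python
from typing import Any


class ToyContext:  # stand-in for the pipeline's shared context
    def __init__(self, raise_on_error: bool = False):
        self.raise_on_error = raise_on_error
        self.status: dict[str, Any] = {}


def run_batch(context: ToyContext, identifier: str, contents: list[str]) -> Any:
    try:
        raise RuntimeError("upload failed")  # stand-in for a failing _run_batch()
    except Exception as e:
        context.status[identifier] = {"step_error": str(e)}
        if context.raise_on_error:
            raise
        return None


ctx = ToyContext(raise_on_error=False)
run_batch(ctx, "upload", ["a.json"])
print(ctx.status)  # {'upload': {'step_error': 'upload failed'}}
```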

unstructured_ingest/v2/pipeline/pipeline.py

Lines changed: 2 additions & 2 deletions
@@ -4,7 +4,7 @@
 from time import time
 from typing import Any, Optional, Union
 
-from unstructured_ingest.v2.interfaces import ProcessorConfig
+from unstructured_ingest.v2.interfaces import ProcessorConfig, Uploader
 from unstructured_ingest.v2.logger import logger, make_default_logger
 from unstructured_ingest.v2.pipeline.steps.chunk import Chunker, ChunkStep
 from unstructured_ingest.v2.pipeline.steps.download import DownloaderT, DownloadStep
@@ -14,7 +14,7 @@
 from unstructured_ingest.v2.pipeline.steps.partition import Partitioner, PartitionStep
 from unstructured_ingest.v2.pipeline.steps.stage import UploadStager, UploadStageStep
 from unstructured_ingest.v2.pipeline.steps.uncompress import Uncompressor, UncompressStep
-from unstructured_ingest.v2.pipeline.steps.upload import Uploader, UploadStep
+from unstructured_ingest.v2.pipeline.steps.upload import UploadStep
 from unstructured_ingest.v2.processes.chunker import ChunkerConfig
 from unstructured_ingest.v2.processes.connector_registry import (
     ConnectionConfig,

unstructured_ingest/v2/pipeline/steps/upload.py

Lines changed: 5 additions & 19 deletions
@@ -4,9 +4,9 @@
 from typing import Callable, Optional, TypedDict
 
 from unstructured_ingest.v2.interfaces import FileData
-from unstructured_ingest.v2.interfaces.uploader import UploadContent, Uploader
+from unstructured_ingest.v2.interfaces.uploader import UploadContent
 from unstructured_ingest.v2.logger import logger
-from unstructured_ingest.v2.pipeline.interfaces import PipelineStep, iterable_input, timed
+from unstructured_ingest.v2.pipeline.interfaces import BatchPipelineStep
 
 STEP_ID = "upload"
 
@@ -17,8 +17,7 @@ class UploadStepContent(TypedDict):
 
 
 @dataclass
-class UploadStep(PipelineStep):
-    process: Uploader
+class UploadStep(BatchPipelineStep):
     identifier: str = STEP_ID
 
     def __str__(self):
@@ -34,25 +33,12 @@ def __post_init__(self):
             f"connection configs: {connection_config}"
         )
 
-    def process_whole(self, iterable: iterable_input):
-        self.run(contents=iterable)
-
-    @timed
-    def __call__(self, iterable: iterable_input):
-        logger.info(
-            f"Calling {self.__class__.__name__} " f"with {len(iterable)} docs",  # type: ignore
-        )
-        if self.process.is_async():
-            self.process_async(iterable=iterable)
-        else:
-            self.process_whole(iterable=iterable)
-
-    def _run(self, fn: Callable, contents: list[UploadStepContent]):
+    def _run_batch(self, contents: list[UploadStepContent]) -> None:
         upload_contents = [
             UploadContent(path=Path(c["path"]), file_data=FileData.from_file(c["file_data_path"]))
             for c in contents
         ]
-        fn(contents=upload_contents)
+        self.process.run_batch(contents=upload_contents)
 
     async def _run_async(self, path: str, file_data_path: str, fn: Optional[Callable] = None):
         fn = fn or self.process.run_async

unstructured_ingest/v2/processes/connectors/astradb.py

Lines changed: 3 additions & 8 deletions
@@ -14,7 +14,6 @@
     AccessConfig,
     ConnectionConfig,
     FileData,
-    UploadContent,
     Uploader,
     UploaderConfig,
     UploadStager,
@@ -139,13 +138,9 @@ def get_collection(self) -> "AstraDBCollection":
         )
         return astra_db_collection
 
-    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
-        elements_dict = []
-        for content in contents:
-            with open(content.path) as elements_file:
-                elements = json.load(elements_file)
-            elements_dict.extend(elements)
-
+    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
+        with path.open("r") as file:
+            elements_dict = json.load(file)
         logger.info(
             f"writing {len(elements_dict)} objects to destination "
             f"collection {self.upload_config.collection_name}"

unstructured_ingest/v2/processes/connectors/azure_cognitive_search.py

Lines changed: 4 additions & 9 deletions
@@ -12,7 +12,7 @@
 from unstructured_ingest.v2.interfaces import (
     AccessConfig,
     ConnectionConfig,
-    UploadContent,
+    FileData,
     Uploader,
     UploaderConfig,
     UploadStager,
@@ -192,14 +192,9 @@ def precheck(self) -> None:
     def write_dict_wrapper(self, elements_dict):
         return self.write_dict(elements_dict=elements_dict)
 
-    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
-
-        elements_dict = []
-        for content in contents:
-            with open(content.path) as elements_file:
-                elements = json.load(elements_file)
-            elements_dict.extend(elements)
-
+    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
+        with path.open("r") as file:
+            elements_dict = json.load(file)
         logger.info(
             f"writing document batches to destination"
             f" endpoint at {str(self.connection_config.endpoint)}"

unstructured_ingest/v2/processes/connectors/chroma.py

Lines changed: 3 additions & 8 deletions
@@ -15,7 +15,6 @@
     AccessConfig,
     ConnectionConfig,
     FileData,
-    UploadContent,
     Uploader,
     UploaderConfig,
     UploadStager,
@@ -186,13 +185,9 @@ def prepare_chroma_list(chunk: tuple[dict[str, Any]]) -> dict[str, list[Any]]:
         )
         return chroma_dict
 
-    def run(self, contents: list[UploadContent], **kwargs: Any) -> None:
-
-        elements_dict = []
-        for content in contents:
-            with open(content.path) as elements_file:
-                elements = json.load(elements_file)
-            elements_dict.extend(elements)
+    def run(self, path: Path, file_data: FileData, **kwargs: Any) -> None:
+        with path.open("r") as file:
+            elements_dict = json.load(file)
 
         logger.info(
             f"writing {len(elements_dict)} objects to destination "
