
Commit 6891b87

bugfix/support async indexing (#192)
* Support async indexing in pipeline
* bump changelog
* fix asyncgenerator typing

Parent: 43020b6

5 files changed: +37 −6

CHANGELOG.md

Lines changed: 4 additions & 1 deletion
@@ -1,4 +1,4 @@
-## 0.1.1-dev3
+## 0.1.1-dev4
 
 ### Enhancements
 
@@ -9,6 +9,9 @@
 * **Migrate Slack Source Connector to V2**
 * **Migrate Slack Source Connector to V2**
 * **Add Delta Table destination to v2**
+* **Migrate Slack Source Connector to V2**
+
+>>>>>>> 9214214 (bump changelog)
 
 ## 0.1.0

unstructured_ingest/__version__.py

Lines changed: 1 addition & 1 deletion
@@ -1 +1 @@
-__version__ = "0.1.1-dev3"  # pragma: no cover
+__version__ = "0.1.1-dev4"  # pragma: no cover

unstructured_ingest/v2/interfaces/indexer.py

Lines changed: 4 additions & 1 deletion
@@ -1,5 +1,5 @@
 from abc import ABC, abstractmethod
-from typing import Any, Generator, Optional, TypeVar
+from typing import Any, AsyncGenerator, Generator, Optional, TypeVar
 
 from pydantic import BaseModel
 
@@ -25,3 +25,6 @@ def is_async(self) -> bool:
     @abstractmethod
     def run(self, **kwargs: Any) -> Generator[FileData, None, None]:
         pass
+
+    async def run_async(self, **kwargs: Any) -> AsyncGenerator[FileData, None]:
+        raise NotImplementedError()
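
To illustrate the new hook, here is a minimal standalone sketch of how a source connector could implement `run_async`; `ExampleAsyncIndexer`, the stand-in `FileData` dataclass, and the fake listing loop are illustrative assumptions, not code from this commit:

```python
import asyncio
from dataclasses import dataclass
from typing import Any, AsyncGenerator


@dataclass
class FileData:
    # Stand-in for the library's FileData model; only the one field
    # used below is sketched here.
    identifier: str


class ExampleAsyncIndexer:
    def is_async(self) -> bool:
        # Tells the pipeline to dispatch to run_async() instead of run().
        return True

    async def run_async(self, **kwargs: Any) -> AsyncGenerator[FileData, None]:
        # Stand-in for paging through a remote listing API without
        # blocking the event loop.
        for i in range(3):
            await asyncio.sleep(0)
            yield FileData(identifier=f"remote-file-{i}")


async def main() -> None:
    indexer = ExampleAsyncIndexer()
    async for file_data in indexer.run_async():
        print(file_data.identifier)


if __name__ == "__main__":
    asyncio.run(main())
```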

unstructured_ingest/v2/pipeline/pipeline.py

Lines changed: 10 additions & 2 deletions
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import asyncio
 import logging
 import multiprocessing as mp
 import shutil
@@ -186,6 +187,14 @@ def apply_filter(self, records: list[dict]) -> list[dict]:
         filtered_records = [r for r in records if r["file_data_path"] in filtered_file_data_paths]
         return filtered_records
 
+    def get_indices(self) -> list[dict]:
+        if self.indexer_step.process.is_async():
+            indices = asyncio.run(self.indexer_step.run_async())
+        else:
+            indices = self.indexer_step.run()
+        indices_inputs = [{"file_data_path": i} for i in indices]
+        return indices_inputs
+
     def _run(self):
         logger.info(
             f"running local pipeline: {self} with configs: " f"{self.context.model_dump_json()}"
@@ -197,8 +206,7 @@ def _run(self):
         self.context.status = {}
 
         # Index into data source
-        indices = self.indexer_step.run()
-        indices_inputs = [{"file_data_path": i} for i in indices]
+        indices_inputs = self.get_indices()
         if not indices_inputs:
             logger.info("No files to process after indexer, exiting")
             return
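
For context on the dispatch in `get_indices`, below is a standalone sketch of the sync-to-async bridge pattern: `asyncio.run` takes a coroutine, so an async generator is typically drained through a small collector coroutine. All names here (`fake_index`, `collect`) are illustrative, not repo code:

```python
import asyncio
from typing import AsyncGenerator


async def fake_index() -> AsyncGenerator[str, None]:
    # Stand-in for IndexStep.run_async(): yields cached file-data paths.
    for i in range(3):
        await asyncio.sleep(0)
        yield f"/tmp/index_cache/{i}.json"


async def collect(agen: AsyncGenerator[str, None]) -> list[str]:
    # asyncio.run() expects a coroutine, so the async generator is
    # drained inside one and handed back as a plain list.
    return [item async for item in agen]


def get_indices() -> list[dict]:
    # Mirrors the shape of the pipeline's get_indices(): run the async
    # indexer to completion, then wrap each path for downstream steps.
    indices = asyncio.run(collect(fake_index()))
    return [{"file_data_path": path} for path in indices]


if __name__ == "__main__":
    print(get_indices())
```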

unstructured_ingest/v2/pipeline/steps/index.py

Lines changed: 18 additions & 1 deletion
@@ -1,7 +1,7 @@
 import hashlib
 import json
 from dataclasses import dataclass
-from typing import Generator, Optional, TypeVar
+from typing import AsyncGenerator, Generator, Optional, TypeVar
 
 from unstructured_ingest.v2.interfaces.indexer import Indexer
 from unstructured_ingest.v2.logger import logger
@@ -52,6 +52,23 @@ def run(self) -> Generator[str, None, None]:
                     raise e
                 continue
 
+    async def run_async(self) -> AsyncGenerator[str, None]:
+        async for file_data in self.process.run_async():
+            logger.debug(f"generated file data: {file_data.to_dict()}")
+            try:
+                record_hash = self.get_hash(extras=[file_data.identifier])
+                filename = f"{record_hash}.json"
+                filepath = (self.cache_dir / filename).resolve()
+                filepath.parent.mkdir(parents=True, exist_ok=True)
+                with open(str(filepath), "w") as f:
+                    json.dump(file_data.to_dict(), f, indent=2)
+                yield str(filepath)
+            except Exception as e:
+                logger.error(f"failed to create index for file data: {file_data}", exc_info=True)
+                if self.context.raise_on_error:
+                    raise e
+                continue
+
     def get_hash(self, extras: Optional[list[str]]) -> str:
         index_config_dict = json.loads(
             serialize_base_model_json(model=self.process.index_config, sort_keys=True)
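
The new `run_async` mirrors the sync `run`: each `FileData` record is serialized to a hash-named JSON file under the step's cache directory, and the file path is yielded to downstream steps. A minimal standalone sketch of that caching pattern, with a hypothetical `CACHE_DIR` and plain dicts standing in for `FileData`:

```python
import asyncio
import hashlib
import json
import tempfile
from pathlib import Path
from typing import AsyncGenerator

# Assumed cache location for this sketch only.
CACHE_DIR = Path(tempfile.gettempdir()) / "index_cache"


async def write_index(records: list[dict]) -> AsyncGenerator[str, None]:
    for record in records:
        # Hash the record so reruns over the same input reuse one filename.
        record_hash = hashlib.sha256(
            json.dumps(record, sort_keys=True).encode()
        ).hexdigest()[:32]
        filepath = (CACHE_DIR / f"{record_hash}.json").resolve()
        filepath.parent.mkdir(parents=True, exist_ok=True)
        with open(filepath, "w") as f:
            json.dump(record, f, indent=2)
        yield str(filepath)


async def main() -> None:
    async for path in write_index([{"identifier": "a"}, {"identifier": "b"}]):
        print(path)


if __name__ == "__main__":
    asyncio.run(main())
```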
