Commit f846083

Florian-BACHO and Florian Bacho authored
Move IngestionPipeline docstore document insertion after transformations (#19849)
* Move IngestionPipeline docstore document insertion after transformations

* Fix type check

---------

Co-authored-by: Florian Bacho <[email protected]>
1 parent de8ceeb commit f846083
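
In practice, the change means the docstore is only written once the pipeline's transformations (and the vector store insertion, when one is configured) have completed, so a transformation that raises no longer leaves documents persisted. A minimal sketch of the behavior this commit targets, assuming the same public APIs that the updated test below exercises (the failing transform and the document id "1" are illustrative):

from typing import Any, Sequence

from llama_index.core.ingestion.pipeline import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import BaseNode, Document, TransformComponent
from llama_index.core.storage.docstore import SimpleDocumentStore


class RaisingTransform(TransformComponent):
    """Stand-in for any transformation step that fails mid-pipeline."""

    def __call__(self, nodes: Sequence[BaseNode], **kwargs: Any) -> Sequence[BaseNode]:
        raise RuntimeError


doc = Document.example()
doc.id_ = "1"

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0), RaisingTransform()],
    docstore=SimpleDocumentStore(),
)

try:
    pipeline.run(documents=[doc])
except RuntimeError:
    pass

# After this commit the failed run leaves no trace in the docstore.
assert pipeline.docstore.get_node("1", raise_error=False) is None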

File tree

2 files changed: +51 −20 lines

llama-index-core/llama_index/core/ingestion/pipeline.py

Lines changed: 24 additions & 19 deletions

@@ -380,7 +380,6 @@ def _prepare_inputs(
     def _handle_duplicates(
         self,
         nodes: Sequence[BaseNode],
-        store_doc_text: bool = True,
     ) -> Sequence[BaseNode]:
         """Handle docstore duplicates by checking all hashes."""
         assert self.docstore is not None
@@ -394,14 +393,11 @@ def _handle_duplicates(
                 nodes_to_run.append(node)
                 current_hashes.append(node.hash)

-        self.docstore.add_documents(nodes_to_run, store_text=store_doc_text)
-
         return nodes_to_run

     def _handle_upserts(
         self,
         nodes: Sequence[BaseNode],
-        store_doc_text: bool = True,
     ) -> Sequence[BaseNode]:
         """Handle docstore upserts by checking hashes and ids."""
         assert self.docstore is not None
@@ -437,11 +433,7 @@ def _handle_upserts(
             if self.vector_store is not None:
                 self.vector_store.delete(ref_doc_id)

-        nodes_to_run = list(deduped_nodes_to_run.values())
-        self.docstore.set_document_hashes({n.id_: n.hash for n in nodes_to_run})
-        self.docstore.add_documents(nodes_to_run, store_text=store_doc_text)
-
-        return nodes_to_run
+        return list(deduped_nodes_to_run.values())

     @staticmethod
     def _node_batcher(
@@ -452,6 +444,23 @@ def _node_batcher(
         for i in range(0, len(nodes), batch_size):
             yield nodes[i : i + batch_size]

+    def _update_docstore(
+        self, nodes: Sequence[BaseNode], store_doc_text: bool = True
+    ) -> None:
+        """Update the document store with the given nodes."""
+        assert self.docstore is not None
+
+        if self.docstore_strategy in (
+            DocstoreStrategy.UPSERTS,
+            DocstoreStrategy.UPSERTS_AND_DELETE,
+        ):
+            self.docstore.set_document_hashes({n.id_: n.hash for n in nodes})
+            self.docstore.add_documents(nodes, store_text=store_doc_text)
+        elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
+            self.docstore.add_documents(nodes, store_text=store_doc_text)
+        else:
+            raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
+
     @dispatcher.span
     def run(
         self,
@@ -493,13 +502,9 @@ def run(
                 DocstoreStrategy.UPSERTS,
                 DocstoreStrategy.UPSERTS_AND_DELETE,
             ):
-                nodes_to_run = self._handle_upserts(
-                    input_nodes, store_doc_text=store_doc_text
-                )
+                nodes_to_run = self._handle_upserts(input_nodes)
             elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
-                nodes_to_run = self._handle_duplicates(
-                    input_nodes, store_doc_text=store_doc_text
-                )
+                nodes_to_run = self._handle_duplicates(input_nodes)
             else:
                 raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
         elif self.docstore is not None and self.vector_store is None:
@@ -515,10 +520,7 @@ def run(
                     "Switching to duplicates_only strategy."
                 )
                 self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
-            nodes_to_run = self._handle_duplicates(
-                input_nodes, store_doc_text=store_doc_text
-            )
-
+            nodes_to_run = self._handle_duplicates(input_nodes)
         else:
             nodes_to_run = input_nodes

@@ -564,6 +566,9 @@ def run(
             if nodes_with_embeddings:
                 self.vector_store.add(nodes_with_embeddings)

+        if self.docstore is not None:
+            self._update_docstore(nodes_to_run, store_doc_text=store_doc_text)
+
         return nodes

     # ------ async methods ------
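
On the success path the observable behavior should be unchanged: documents are still recorded in the docstore, just after the transformations run instead of before them. A small sketch of that path, again assuming the public APIs used in the tests (the single example document yields one docstore entry):

from llama_index.core.ingestion.pipeline import IngestionPipeline
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document
from llama_index.core.storage.docstore import SimpleDocumentStore

pipeline = IngestionPipeline(
    transformations=[SentenceSplitter(chunk_size=25, chunk_overlap=0)],
    docstore=SimpleDocumentStore(),
)

nodes = pipeline.run(documents=[Document.example()])

# The input document is persisted once the run completes successfully.
assert len(pipeline.docstore.docs) == 1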

llama-index-core/tests/ingestion/test_pipeline.py

Lines changed: 27 additions & 1 deletion

@@ -1,12 +1,14 @@
 from multiprocessing import cpu_count
+from typing import Sequence, Any

+import pytest
 from llama_index.core.embeddings.mock_embed_model import MockEmbedding
 from llama_index.core.extractors import KeywordExtractor
 from llama_index.core.ingestion.pipeline import IngestionPipeline
 from llama_index.core.llms.mock import MockLLM
 from llama_index.core.node_parser import SentenceSplitter, MarkdownElementNodeParser
 from llama_index.core.readers import ReaderConfig, StringIterableReader
-from llama_index.core.schema import Document
+from llama_index.core.schema import Document, TransformComponent, BaseNode
 from llama_index.core.storage.docstore import SimpleDocumentStore


@@ -260,3 +262,27 @@ def test_pipeline_parallel() -> None:
     assert len(nodes) == 20
     assert pipeline.docstore is not None
     assert len(pipeline.docstore.docs) == 2
+
+
+def test_pipeline_with_transform_error() -> None:
+    class RaisingTransform(TransformComponent):
+        def __call__(
+            self, nodes: Sequence[BaseNode], **kwargs: Any
+        ) -> Sequence[BaseNode]:
+            raise RuntimeError
+
+    document1 = Document.example()
+    document1.id_ = "1"
+
+    pipeline = IngestionPipeline(
+        transformations=[
+            SentenceSplitter(chunk_size=25, chunk_overlap=0),
+            RaisingTransform(),
+        ],
+        docstore=SimpleDocumentStore(),
+    )
+
+    with pytest.raises(RuntimeError):
+        pipeline.run(documents=[document1])
+
+    assert pipeline.docstore.get_node("1", raise_error=False) is None
