Skip to content

Commit 1692239

Browse files
Update IngestionPipeline async document store insertion (#19868)
1 parent 24f9420 commit 1692239

File tree

2 files changed

+197
-7
lines changed

2 files changed

+197
-7
lines changed

llama-index-core/llama_index/core/ingestion/pipeline.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -572,6 +572,23 @@ def run(
572572
return nodes
573573

574574
# ------ async methods ------
575+
async def _aupdate_docstore(
    self, nodes: Sequence[BaseNode], store_doc_text: bool = True
) -> None:
    """Persist the given nodes to the attached document store (async).

    Args:
        nodes: Nodes to insert into the docstore.
        store_doc_text: Whether to persist the node text alongside metadata.

    Raises:
        ValueError: If ``self.docstore_strategy`` is not a recognized strategy.
    """
    assert self.docstore is not None

    strategy = self.docstore_strategy
    if strategy == DocstoreStrategy.DUPLICATES_ONLY:
        # Duplicates-only: just store the documents, no hash bookkeeping.
        await self.docstore.async_add_documents(nodes, store_text=store_doc_text)
    elif strategy in (
        DocstoreStrategy.UPSERTS,
        DocstoreStrategy.UPSERTS_AND_DELETE,
    ):
        # Upsert strategies record content hashes so later runs can detect
        # changed documents, then store the documents themselves.
        hashes = {node.id_: node.hash for node in nodes}
        await self.docstore.aset_document_hashes(hashes)
        await self.docstore.async_add_documents(nodes, store_text=store_doc_text)
    else:
        raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
591+
575592
async def _ahandle_duplicates(
576593
self,
577594
nodes: Sequence[BaseNode],
@@ -589,8 +606,6 @@ async def _ahandle_duplicates(
589606
nodes_to_run.append(node)
590607
current_hashes.append(node.hash)
591608

592-
await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)
593-
594609
return nodes_to_run
595610

596611
async def _ahandle_upserts(
@@ -632,11 +647,7 @@ async def _ahandle_upserts(
632647
if self.vector_store is not None:
633648
await self.vector_store.adelete(ref_doc_id)
634649

635-
nodes_to_run = list(deduped_nodes_to_run.values())
636-
await self.docstore.async_add_documents(nodes_to_run, store_text=store_doc_text)
637-
await self.docstore.aset_document_hashes({n.id_: n.hash for n in nodes_to_run})
638-
639-
return nodes_to_run
650+
return list(deduped_nodes_to_run.values())
640651

641652
@dispatcher.span
642653
async def arun(
@@ -757,4 +768,7 @@ async def arun(
757768
if nodes_with_embeddings:
758769
await self.vector_store.async_add(nodes_with_embeddings)
759770

771+
if self.docstore is not None:
772+
await self._aupdate_docstore(nodes_to_run, store_doc_text=store_doc_text)
773+
760774
return nodes

llama-index-core/tests/ingestion/test_pipeline.py

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,3 +286,179 @@ def __call__(
286286
pipeline.run(documents=[document1])
287287

288288
assert pipeline.docstore.get_node("1", raise_error=False) is None
289+
290+
291+
@pytest.mark.asyncio
async def test_arun_pipeline() -> None:
    """The async pipeline runs readers, documents and transforms end to end."""
    reader_config = ReaderConfig(
        reader=StringIterableReader(),
        reader_kwargs={"texts": ["This is a test."]},
    )
    pipeline = IngestionPipeline(
        readers=[reader_config],
        documents=[Document.example()],
        transformations=[
            SentenceSplitter(),
            KeywordExtractor(llm=MockLLM()),
        ],
    )

    nodes = await pipeline.arun()

    # One node per input document, each enriched with keyword metadata.
    assert len(nodes) == 2
    assert len(nodes[0].metadata) > 0
311+
312+
313+
@pytest.mark.asyncio
async def test_arun_pipeline_with_ref_doc_id() -> None:
    """Nodes produced by the async pipeline keep a reference to their source doc.

    Fix: added the ``-> None`` return annotation for consistency with every
    other test in this module.
    """
    documents = [
        Document(text="one", doc_id="1"),
    ]
    pipeline = IngestionPipeline(
        documents=documents,
        transformations=[
            MarkdownElementNodeParser(),
            SentenceSplitter(),
            MockEmbedding(embed_dim=8),
        ],
    )

    nodes = await pipeline.arun()

    assert len(nodes) == 1
    # The produced node must point back at the ingested document's id.
    assert nodes[0].ref_doc_id == "1"
331+
332+
333+
@pytest.mark.asyncio
async def test_async_pipeline_update_text_content() -> None:
    """Re-ingesting a document with changed text replaces its docstore entry."""
    doc = Document.example()
    doc.id_ = "1"

    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=25, chunk_overlap=0),
        ],
        docstore=SimpleDocumentStore(),
    )

    nodes = await pipeline.arun(documents=[doc])
    assert len(nodes) == 19
    assert pipeline.docstore is not None
    assert len(pipeline.docstore.docs) == 1

    # Same doc id, new content.
    updated = Document(text="test", doc_id="1")

    # Second (sync) run: the pipeline must upsert, not duplicate.
    nodes = pipeline.run(documents=[updated])

    assert len(nodes) == 1
    assert pipeline.docstore is not None
    assert len(pipeline.docstore.docs) == 1
    assert next(iter(pipeline.docstore.docs.values())).text == "test"  # type: ignore
360+
361+
362+
@pytest.mark.asyncio
async def test_async_pipeline_update_metadata() -> None:
    """Test that IngestionPipeline updates document metadata, if it changed."""
    old_metadata = {"filename": "README.md", "category": "codebase"}
    doc = Document.example()
    doc.metadata = old_metadata
    doc.id_ = "1"

    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=25, chunk_overlap=0),
        ],
        docstore=SimpleDocumentStore(),
    )

    nodes = await pipeline.arun(documents=[doc])
    assert len(nodes) >= 1
    assert pipeline.docstore is not None
    assert len(pipeline.docstore.docs) == 1
    assert all(node.metadata == old_metadata for node in nodes)

    # Change only the metadata; the doc id stays the same.
    new_metadata = {"filename": "README.md", "category": "documentation"}
    doc.metadata = new_metadata

    # Second (sync) run: the upsert must refresh the stored metadata.
    nodes_new = pipeline.run(documents=[doc])

    assert len(nodes_new) == len(nodes)
    assert pipeline.docstore is not None
    assert len(pipeline.docstore.docs) == 1
    assert next(iter(pipeline.docstore.docs.values())).metadata == new_metadata  # type: ignore
    assert all(node.metadata == new_metadata for node in nodes_new)
397+
398+
399+
@pytest.mark.asyncio
async def test_async_pipeline_dedup_duplicates_only() -> None:
    """A second async run over identical documents yields no new nodes."""
    documents = [
        Document(text=text, doc_id=doc_id)
        for text, doc_id in (("one", "1"), ("two", "2"), ("three", "3"))
    ]

    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=25, chunk_overlap=0),
        ],
        docstore=SimpleDocumentStore(),
    )

    # First pass ingests everything.
    first = await pipeline.arun(documents=documents)
    assert len(first) == 3

    # Second pass: every input is a known duplicate, so nothing is produced.
    second = await pipeline.arun(documents=documents)
    assert len(second) == 0
419+
420+
421+
@pytest.mark.asyncio
async def test_async_pipeline_parallel() -> None:
    """Async run with ``num_workers > 1`` still ingests every document.

    Fix: this coroutine test was missing the ``@pytest.mark.asyncio``
    decorator that every sibling async test carries — under pytest-asyncio's
    strict mode the test would never be executed (collected as a plain
    function returning an un-awaited coroutine).
    """
    document1 = Document.example()
    document1.id_ = "1"
    document2 = Document(text="One\n\n\nTwo\n\n\nThree.", doc_id="2")

    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=25, chunk_overlap=0),
        ],
        docstore=SimpleDocumentStore(),
    )

    # Cap the worker count so the test is cheap on single-core runners.
    num_workers = min(2, cpu_count())
    nodes = await pipeline.arun(
        documents=[document1, document2], num_workers=num_workers
    )
    assert len(nodes) == 20
    assert pipeline.docstore is not None
    assert len(pipeline.docstore.docs) == 2
440+
441+
442+
@pytest.mark.asyncio
async def test_async_pipeline_with_transform_error() -> None:
    """A failing transform propagates and leaves the docstore untouched."""

    class RaisingTransform(TransformComponent):
        # Always blows up, simulating a broken transformation stage.
        def __call__(
            self, nodes: Sequence[BaseNode], **kwargs: Any
        ) -> Sequence[BaseNode]:
            raise RuntimeError

    document1 = Document.example()
    document1.id_ = "1"

    pipeline = IngestionPipeline(
        transformations=[
            SentenceSplitter(chunk_size=25, chunk_overlap=0),
            RaisingTransform(),
        ],
        docstore=SimpleDocumentStore(),
    )

    with pytest.raises(RuntimeError):
        await pipeline.arun(documents=[document1])

    # The failed run must not have persisted the document.
    assert pipeline.docstore.get_node("1", raise_error=False) is None

0 commit comments

Comments
 (0)