Skip to content

Commit 17b68b8

Browse files
fix(core): preserve docstore_strategy across pipeline runs when no vector store is attached (#20824)
fix: don't mutate docstore_strategy when vector store is absent
1 parent: e50bf2d · commit: 17b68b8

File tree

2 files changed

+101
-42
lines changed

2 files changed

+101
-42
lines changed

llama-index-core/llama_index/core/ingestion/pipeline.py

Lines changed: 62 additions & 41 deletions
Original file line number | Diff line number | Diff line change
@@ -459,21 +459,24 @@ def _node_batcher(
459459
yield nodes[i : i + batch_size]
460460

461461
def _update_docstore(
462-
self, nodes: Sequence[BaseNode], store_doc_text: bool = True
462+
self,
463+
nodes: Sequence[BaseNode],
464+
effective_strategy: DocstoreStrategy,
465+
store_doc_text: bool = True,
463466
) -> None:
464467
"""Update the document store with the given nodes."""
465468
assert self.docstore is not None
466469

467-
if self.docstore_strategy in (
470+
if effective_strategy in (
468471
DocstoreStrategy.UPSERTS,
469472
DocstoreStrategy.UPSERTS_AND_DELETE,
470473
):
471474
self.docstore.set_document_hashes({n.id_: n.hash for n in nodes})
472475
self.docstore.add_documents(nodes, store_text=store_doc_text)
473-
elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
476+
elif effective_strategy == DocstoreStrategy.DUPLICATES_ONLY:
474477
self.docstore.add_documents(nodes, store_text=store_doc_text)
475478
else:
476-
raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
479+
raise ValueError(f"Invalid docstore strategy: {effective_strategy}")
477480

478481
@dispatcher.span
479482
def run(
@@ -511,30 +514,34 @@ def run(
511514
"""
512515
input_nodes = self._prepare_inputs(documents, nodes)
513516

517+
effective_strategy = self.docstore_strategy
518+
if (
519+
self.docstore is not None
520+
and self.vector_store is None
521+
and self.docstore_strategy
522+
in (DocstoreStrategy.UPSERTS, DocstoreStrategy.UPSERTS_AND_DELETE)
523+
):
524+
warnings.warn(
525+
f"docstore_strategy='{self.docstore_strategy.value}' requires a vector store "
526+
"to apply upsert/delete semantics; falling back to 'duplicates_only' for this run. "
527+
"pipeline.docstore_strategy is unchanged.",
528+
UserWarning,
529+
stacklevel=3,
530+
)
531+
effective_strategy = DocstoreStrategy.DUPLICATES_ONLY
532+
514533
# check if we need to dedup
515534
if self.docstore is not None and self.vector_store is not None:
516-
if self.docstore_strategy in (
535+
if effective_strategy in (
517536
DocstoreStrategy.UPSERTS,
518537
DocstoreStrategy.UPSERTS_AND_DELETE,
519538
):
520539
nodes_to_run = self._handle_upserts(input_nodes)
521-
elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
540+
elif effective_strategy == DocstoreStrategy.DUPLICATES_ONLY:
522541
nodes_to_run = self._handle_duplicates(input_nodes)
523542
else:
524-
raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
543+
raise ValueError(f"Invalid docstore strategy: {effective_strategy}")
525544
elif self.docstore is not None and self.vector_store is None:
526-
if self.docstore_strategy == DocstoreStrategy.UPSERTS:
527-
logger.info(
528-
"Docstore strategy set to upserts, but no vector store. "
529-
"Switching to duplicates_only strategy."
530-
)
531-
self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
532-
elif self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
533-
logger.info(
534-
"Docstore strategy set to upserts and delete, but no vector store. "
535-
"Switching to duplicates_only strategy."
536-
)
537-
self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
538545
nodes_to_run = self._handle_duplicates(input_nodes)
539546
else:
540547
nodes_to_run = input_nodes
@@ -582,27 +589,34 @@ def run(
582589
self.vector_store.add(nodes_with_embeddings)
583590

584591
if self.docstore is not None:
585-
self._update_docstore(nodes_to_run, store_doc_text=store_doc_text)
592+
self._update_docstore(
593+
nodes_to_run,
594+
effective_strategy=effective_strategy,
595+
store_doc_text=store_doc_text,
596+
)
586597

587598
return nodes
588599

589600
# ------ async methods ------
590601
async def _aupdate_docstore(
591-
self, nodes: Sequence[BaseNode], store_doc_text: bool = True
602+
self,
603+
nodes: Sequence[BaseNode],
604+
effective_strategy: DocstoreStrategy,
605+
store_doc_text: bool = True,
592606
) -> None:
593607
"""Update the document store with the given nodes."""
594608
assert self.docstore is not None
595609

596-
if self.docstore_strategy in (
610+
if effective_strategy in (
597611
DocstoreStrategy.UPSERTS,
598612
DocstoreStrategy.UPSERTS_AND_DELETE,
599613
):
600614
await self.docstore.aset_document_hashes({n.id_: n.hash for n in nodes})
601615
await self.docstore.async_add_documents(nodes, store_text=store_doc_text)
602-
elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
616+
elif effective_strategy == DocstoreStrategy.DUPLICATES_ONLY:
603617
await self.docstore.async_add_documents(nodes, store_text=store_doc_text)
604618
else:
605-
raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
619+
raise ValueError(f"Invalid docstore strategy: {effective_strategy}")
606620

607621
async def _ahandle_duplicates(
608622
self,
@@ -700,38 +714,41 @@ async def arun(
700714
"""
701715
input_nodes = self._prepare_inputs(documents, nodes)
702716

717+
effective_strategy = self.docstore_strategy
718+
if (
719+
self.docstore is not None
720+
and self.vector_store is None
721+
and self.docstore_strategy
722+
in (DocstoreStrategy.UPSERTS, DocstoreStrategy.UPSERTS_AND_DELETE)
723+
):
724+
warnings.warn(
725+
f"docstore_strategy='{self.docstore_strategy.value}' requires a vector store "
726+
"to apply upsert/delete semantics; falling back to 'duplicates_only' for this run. "
727+
"pipeline.docstore_strategy is unchanged.",
728+
UserWarning,
729+
stacklevel=3,
730+
)
731+
effective_strategy = DocstoreStrategy.DUPLICATES_ONLY
732+
703733
# check if we need to dedup
704734
if self.docstore is not None and self.vector_store is not None:
705-
if self.docstore_strategy in (
735+
if effective_strategy in (
706736
DocstoreStrategy.UPSERTS,
707737
DocstoreStrategy.UPSERTS_AND_DELETE,
708738
):
709739
nodes_to_run = await self._ahandle_upserts(
710740
input_nodes, store_doc_text=store_doc_text
711741
)
712-
elif self.docstore_strategy == DocstoreStrategy.DUPLICATES_ONLY:
742+
elif effective_strategy == DocstoreStrategy.DUPLICATES_ONLY:
713743
nodes_to_run = await self._ahandle_duplicates(
714744
input_nodes, store_doc_text=store_doc_text
715745
)
716746
else:
717-
raise ValueError(f"Invalid docstore strategy: {self.docstore_strategy}")
747+
raise ValueError(f"Invalid docstore strategy: {effective_strategy}")
718748
elif self.docstore is not None and self.vector_store is None:
719-
if self.docstore_strategy == DocstoreStrategy.UPSERTS:
720-
logger.info(
721-
"Docstore strategy set to upserts, but no vector store. "
722-
"Switching to duplicates_only strategy."
723-
)
724-
self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
725-
elif self.docstore_strategy == DocstoreStrategy.UPSERTS_AND_DELETE:
726-
logger.info(
727-
"Docstore strategy set to upserts and delete, but no vector store. "
728-
"Switching to duplicates_only strategy."
729-
)
730-
self.docstore_strategy = DocstoreStrategy.DUPLICATES_ONLY
731749
nodes_to_run = await self._ahandle_duplicates(
732750
input_nodes, store_doc_text=store_doc_text
733751
)
734-
735752
else:
736753
nodes_to_run = input_nodes
737754

@@ -785,6 +802,10 @@ async def arun(
785802
await self.vector_store.async_add(nodes_with_embeddings)
786803

787804
if self.docstore is not None:
788-
await self._aupdate_docstore(nodes_to_run, store_doc_text=store_doc_text)
805+
await self._aupdate_docstore(
806+
nodes_to_run,
807+
effective_strategy=effective_strategy,
808+
store_doc_text=store_doc_text,
809+
)
789810

790811
return nodes

llama-index-core/tests/ingestion/test_pipeline.py

Lines changed: 39 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@
55
import pytest
66
from llama_index.core.embeddings.mock_embed_model import MockEmbedding
77
from llama_index.core.extractors import KeywordExtractor
8-
from llama_index.core.ingestion.pipeline import IngestionPipeline
8+
from llama_index.core.ingestion.pipeline import IngestionPipeline, DocstoreStrategy
99
from llama_index.core.llms.mock import MockLLM
1010
from llama_index.core.node_parser import SentenceSplitter, MarkdownElementNodeParser
1111
from llama_index.core.readers import ReaderConfig, StringIterableReader
@@ -86,6 +86,7 @@ def test_save_load_pipeline(tmp_path: Path, monkeypatch: pytest.MonkeyPatch) ->
8686
SentenceSplitter(chunk_size=25, chunk_overlap=0),
8787
],
8888
docstore=SimpleDocumentStore(),
89+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
8990
)
9091

9192
nodes = pipeline.run(documents=documents)
@@ -169,6 +170,7 @@ def test_pipeline_update_text_content() -> None:
169170
SentenceSplitter(chunk_size=25, chunk_overlap=0),
170171
],
171172
docstore=SimpleDocumentStore(),
173+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
172174
)
173175

174176
nodes = pipeline.run(documents=[document1])
@@ -200,6 +202,7 @@ def test_pipeline_update_metadata() -> None:
200202
SentenceSplitter(chunk_size=25, chunk_overlap=0),
201203
],
202204
docstore=SimpleDocumentStore(),
205+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
203206
)
204207

205208
nodes = pipeline.run(documents=[document1])
@@ -236,6 +239,7 @@ def test_pipeline_dedup_duplicates_only() -> None:
236239
SentenceSplitter(chunk_size=25, chunk_overlap=0),
237240
],
238241
docstore=SimpleDocumentStore(),
242+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
239243
)
240244

241245
nodes = pipeline.run(documents=documents)
@@ -255,6 +259,7 @@ def test_pipeline_parallel() -> None:
255259
SentenceSplitter(chunk_size=25, chunk_overlap=0),
256260
],
257261
docstore=SimpleDocumentStore(),
262+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
258263
)
259264

260265
num_workers = min(2, cpu_count())
@@ -280,6 +285,7 @@ def __call__(
280285
RaisingTransform(),
281286
],
282287
docstore=SimpleDocumentStore(),
288+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
283289
)
284290

285291
with pytest.raises(RuntimeError):
@@ -340,6 +346,7 @@ async def test_async_pipeline_update_text_content() -> None:
340346
SentenceSplitter(chunk_size=25, chunk_overlap=0),
341347
],
342348
docstore=SimpleDocumentStore(),
349+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
343350
)
344351

345352
nodes = await pipeline.arun(documents=[document1])
@@ -372,6 +379,7 @@ async def test_async_pipeline_update_metadata() -> None:
372379
SentenceSplitter(chunk_size=25, chunk_overlap=0),
373380
],
374381
docstore=SimpleDocumentStore(),
382+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
375383
)
376384

377385
nodes = await pipeline.arun(documents=[document1])
@@ -409,6 +417,7 @@ async def test_async_pipeline_dedup_duplicates_only() -> None:
409417
SentenceSplitter(chunk_size=25, chunk_overlap=0),
410418
],
411419
docstore=SimpleDocumentStore(),
420+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
412421
)
413422

414423
nodes = await pipeline.arun(documents=documents)
@@ -429,6 +438,7 @@ async def test_async_pipeline_parallel() -> None:
429438
SentenceSplitter(chunk_size=25, chunk_overlap=0),
430439
],
431440
docstore=SimpleDocumentStore(),
441+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
432442
)
433443

434444
num_workers = min(2, cpu_count())
@@ -457,9 +467,37 @@ def __call__(
457467
RaisingTransform(),
458468
],
459469
docstore=SimpleDocumentStore(),
470+
docstore_strategy=DocstoreStrategy.DUPLICATES_ONLY,
460471
)
461472

462473
with pytest.raises(RuntimeError):
463474
await pipeline.arun(documents=[document1])
464475

465476
assert pipeline.docstore.get_node("1", raise_error=False) is None
477+
478+
479+
def test_docstore_strategy_not_mutated_on_run_without_vector_store() -> None:
480+
for strategy in (DocstoreStrategy.UPSERTS, DocstoreStrategy.UPSERTS_AND_DELETE):
481+
pipeline = IngestionPipeline(
482+
transformations=[],
483+
docstore=SimpleDocumentStore(),
484+
docstore_strategy=strategy,
485+
)
486+
with pytest.warns(UserWarning, match="requires a vector store"):
487+
pipeline.run(documents=[Document.example()])
488+
489+
assert pipeline.docstore_strategy is strategy
490+
491+
492+
@pytest.mark.asyncio
493+
async def test_docstore_strategy_not_mutated_on_arun_without_vector_store() -> None:
494+
for strategy in (DocstoreStrategy.UPSERTS, DocstoreStrategy.UPSERTS_AND_DELETE):
495+
pipeline = IngestionPipeline(
496+
transformations=[],
497+
docstore=SimpleDocumentStore(),
498+
docstore_strategy=strategy,
499+
)
500+
with pytest.warns(UserWarning, match="requires a vector store"):
501+
await pipeline.arun(documents=[Document.example()])
502+
503+
assert pipeline.docstore_strategy is strategy

0 commit comments

Comments
 (0)