Skip to content

Commit 7d30d9b

Browse files
szymondudycz and tryptofanik
authored and committed
Adding indexing status to the list of documents in DocumentStore (#8484)
Co-authored-by: Albert Roethel <albert@pathway.com> GitOrigin-RevId: 387de3e5e70f829ba4c1d60ec2e33c369fa9e410
1 parent fc85a7f commit 7d30d9b

File tree

4 files changed

+102
-12
lines changed

4 files changed

+102
-12
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
77

88
### Changed
99
- Input connectors now throttle parsing error messages if their share is more than 10% of the parsing attempts.
10+
- New flag `return_status` for `inputs_query` method in `pw.xpacks.llm.DocumentStore`. If set to True, DocumentStore returns the status of indexing for each file.
1011

1112
## [0.21.0] - 2025-03-19
1213

python/pathway/xpacks/llm/document_store.py

Lines changed: 98 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,8 @@
99
import json
1010
import warnings
1111
from collections.abc import Callable
12-
from typing import TYPE_CHECKING, Iterable, TypeAlias
12+
from enum import Enum
13+
from typing import TYPE_CHECKING, Iterable
1314

1415
import jmespath
1516
import requests
@@ -46,6 +47,11 @@ def _get_jmespath_filter(metadata_filter: str, filepath_globpattern: str) -> str
4647
return None
4748

4849

50+
class IndexingStatus(str, Enum):
51+
INDEXED = "INDEXED"
52+
INGESTED = "INGESTED"
53+
54+
4955
class DocumentStore:
5056
"""
5157
Builds a document indexing pipeline for processing documents and querying closest documents
@@ -213,7 +219,17 @@ class FilterSchema(pw.Schema):
213219
default_value=None, description="An optional Glob pattern for the file path"
214220
)
215221

216-
InputsQuerySchema: TypeAlias = FilterSchema
222+
class InputsQuerySchema(pw.Schema):
223+
metadata_filter: str | None = pw.column_definition(
224+
default_value=None, description="Metadata filter in JMESPath format"
225+
)
226+
filepath_globpattern: str | None = pw.column_definition(
227+
default_value=None, description="An optional Glob pattern for the file path"
228+
)
229+
return_status: bool = pw.column_definition(
230+
default_value=False,
231+
description="Flag whether _indexing_status should be returned for each file",
232+
)
217233

218234
class InputsResultSchema(pw.Schema):
219235
result: list[pw.Json]
@@ -298,9 +314,18 @@ def build_pipeline(self):
298314

299315
docs = pw.Table.concat_reindex(*cleaned_tables)
300316

317+
@pw.udf
318+
def add_file_id(metadata: pw.Json, id) -> dict:
319+
metadata_dict = metadata.as_dict()
320+
id = str(id)
321+
metadata_dict["_file_id"] = id
322+
return metadata_dict
323+
301324
# rename columns to be consistent with the rest of the pipeline
302325
self.input_docs: pw.Table[DocumentStore._RawDocumentSchema] = docs.select(
303-
text=pw.this.data, metadata=pw.this._metadata
326+
text=pw.this.data,
327+
metadata=add_file_id(pw.this._metadata, pw.this.id),
328+
# path=pw.this.metadata["path"].as_str(),
304329
)
305330

306331
# PARSING
@@ -327,6 +352,32 @@ def build_pipeline(self):
327352
metadata_column=self.chunked_docs.metadata,
328353
)
329354

355+
progress_table = self.input_docs.select(
356+
file_id=pw.this.metadata["_file_id"].as_str(),
357+
metadata=pw.this.metadata,
358+
)
359+
chunked_stats = (
360+
self.chunked_docs.with_columns(
361+
file_id=pw.this.metadata["_file_id"].as_str()
362+
)
363+
.groupby(pw.this.file_id)
364+
.reduce(
365+
file_id=pw.this.file_id,
366+
chunks=pw.reducers.count(),
367+
)
368+
)
369+
self.progress_table = (
370+
progress_table.join_left(
371+
chunked_stats,
372+
progress_table.file_id == chunked_stats.file_id,
373+
)
374+
.select(
375+
*pw.left,
376+
chunks=pw.right.chunks,
377+
)
378+
.with_columns(is_parsed=pw.this.chunks.is_not_none())
379+
)
380+
330381
parsed_docs_with_metadata = self.parsed_docs.with_columns(
331382
modified=pw.this.metadata["modified_at"].as_int(),
332383
indexed=pw.this.metadata["seen_at"].as_int(),
@@ -394,34 +445,68 @@ def inputs_query(
394445
# TODO: compare this approach to first joining queries to dicuments, then filtering,
395446
# then grouping to get each response.
396447
# The "dumb" tuple approach has more work precomputed for an all inputs query
397-
all_metas = self.input_docs.reduce(
398-
metadatas=pw.reducers.tuple(pw.this.metadata)
448+
all_metas = self.progress_table.reduce(
449+
metadatas=pw.reducers.tuple(pw.this.metadata),
450+
is_parsed=pw.reducers.tuple(pw.this.is_parsed),
399451
)
400452

401453
input_queries = self.merge_filters(input_queries)
402454

403455
@pw.udf
404456
def format_inputs(
405-
metadatas: list[pw.Json] | None, metadata_filter: str | None
457+
metadatas: list[pw.Json] | None,
458+
metadata_filter: str | None,
459+
return_status: bool,
460+
is_parsed: list[bool],
406461
) -> list[pw.Json]:
407462
metadatas = metadatas if metadatas is not None else []
408463
assert metadatas is not None
464+
465+
def remove_id(m):
466+
metadata_dict = m.as_dict()
467+
del metadata_dict["_file_id"]
468+
return pw.Json(metadata_dict)
469+
470+
metadatas = [remove_id(m) for m in metadatas]
409471
if metadata_filter:
410472
metadatas = [
411473
m
412474
for m in metadatas
413475
if jmespath.search(
414-
metadata_filter, m.value, options=_knn_lsh._glob_options
476+
metadata_filter, m.as_dict(), options=_knn_lsh._glob_options
415477
)
416478
]
417479

480+
if return_status:
481+
metadatas = [
482+
pw.Json(
483+
{
484+
"_indexing_status": (
485+
IndexingStatus.INDEXED
486+
if status
487+
else IndexingStatus.INGESTED
488+
),
489+
**m.as_dict(),
490+
}
491+
)
492+
for (m, status) in zip(metadatas, is_parsed)
493+
]
494+
418495
return metadatas
419496

420497
input_results = input_queries.join_left(all_metas, id=input_queries.id).select(
421-
all_metas.metadatas, input_queries.metadata_filter
498+
all_metas.metadatas,
499+
input_queries.metadata_filter,
500+
input_queries.return_status,
501+
all_metas.is_parsed,
422502
)
423503
input_results = input_results.select(
424-
result=format_inputs(pw.this.metadatas, pw.this.metadata_filter)
504+
result=format_inputs(
505+
pw.this.metadatas,
506+
pw.this.metadata_filter,
507+
pw.this.return_status,
508+
pw.this.is_parsed,
509+
)
425510
)
426511
return input_results
427512

@@ -623,6 +708,7 @@ def get_input_files(
623708
self,
624709
metadata_filter: str | None = None,
625710
filepath_globpattern: str | None = None,
711+
return_status: bool = False,
626712
):
627713
"""
628714
Fetch information on documents in the the vector store.
@@ -633,13 +719,16 @@ def get_input_files(
633719
satisfying this filtering.
634720
filepath_globpattern: optional glob pattern specifying which documents
635721
will be searched for this query.
722+
return_status: flag telling whether `_indexing_status` should be returned
723+
for each document
636724
"""
637725
url = self.url + "/v1/inputs"
638726
response = requests.post(
639727
url,
640728
json={
641729
"metadata_filter": metadata_filter,
642730
"filepath_globpattern": filepath_globpattern,
731+
"return_status": return_status,
643732
},
644733
headers=self._get_request_headers(),
645734
timeout=self.timeout,

python/pathway/xpacks/llm/question_answering.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import json
33
from abc import ABC, abstractmethod
44
from dataclasses import dataclass, field
5-
from typing import TYPE_CHECKING, Callable
5+
from typing import TYPE_CHECKING, Any, Callable
66
from warnings import warn
77

88
import requests
@@ -1118,7 +1118,7 @@ def list_documents(self, filters: str | None = None, keys: list[str] = ["path"])
11181118
Defaults to ``["path"]``. Setting to ``None`` will retrieve all available metadata.
11191119
"""
11201120
api_url = f"{self.url}/v2/list_documents"
1121-
payload = {}
1121+
payload: dict[str, Any] = {"return_status": True}
11221122

11231123
if filters:
11241124
payload["metadata_filter"] = filters

python/pathway/xpacks/llm/tests/test_document_store.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ def _test_vs(fake_embeddings_model):
6666
input_queries = pw.debug.table_from_rows(
6767
schema=DocumentStore.InputsQuerySchema,
6868
rows=[
69-
(None, "**/*.py"),
69+
(None, "**/*.py", False),
7070
],
7171
)
7272

0 commit comments

Comments (0)