Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions deepsearch/cps/cli/cli_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,3 +98,7 @@
"-t",
help="""Provide path to file containing task ids generated during document conversion.""",
)

# Search/filter expression applied when listing items; "*" matches everything.
QUERY_STRING = typer.Option("*", "-q", "--query-string", help="Query string")

# Upper bound on the number of items a listing command prints.
MAX_ITEMS = typer.Option(10, "-mi", "--max-items", help="Max items to list")
40 changes: 40 additions & 0 deletions deepsearch/cps/cli/data_indices_typer.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
COORDINATES_PATH,
INDEX_ITEM_ID,
INDEX_KEY,
MAX_ITEMS,
PROJ_KEY,
QUERY_STRING,
SOURCE_PATH,
TARGET_SETTINGS,
URL,
Expand Down Expand Up @@ -237,5 +239,43 @@ def add_attachment(
raise typer.Abort()


@app.command(name="list", help="List/search items in an index", no_args_is_help=True)
@cli_handler()
def list_items(
    proj_key: str = PROJ_KEY,
    index_key: str = INDEX_KEY,
    query_string: str = QUERY_STRING,
    max_items: int = MAX_ITEMS,
):
    """
    List/search items in an index.

    Looks up the index identified by ``index_key`` inside the project
    ``proj_key`` and echoes up to ``max_items`` items matching
    ``query_string`` (one per line). Aborts with an error message when the
    index is not found or the search fails.
    """
    api = CpsApi.from_env()

    # get indices of the project
    indices = api.data_indices.list(proj_key)

    # find the specific index to list items from
    index = next((x for x in indices if x.source.index_key == index_key), None)

    # guard clause: bail out early when the index does not exist
    if index is None:
        typer.echo("Index key not found")
        raise typer.Abort()

    try:
        items = index.list_items(
            api=api,
            query_string=query_string,
            max_items=max_items,
        )
        for item in items:
            typer.echo(item)
    except ValueError as e:
        typer.echo(f"Error occurred: {e}")
        typer.echo(ERROR_MSG)
        raise typer.Abort()


if __name__ == "__main__":
    # Allow running this module directly as a standalone Typer CLI.
    app()
123 changes: 113 additions & 10 deletions deepsearch/cps/client/components/data_indices.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from __future__ import annotations

import ast
import os
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union
from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional, Union
from urllib.parse import urlparse

import requests
Expand All @@ -17,6 +16,9 @@
from deepsearch.cps.apis.public.models.task import Task
from deepsearch.cps.apis.public.models.token_response import TokenResponse
from deepsearch.cps.client.components.api_object import ApiConnectedObject
from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource
from deepsearch.cps.client.queries.query import Query
from deepsearch.cps.queries import DataQuery

if TYPE_CHECKING:
from deepsearch.cps.client import CpsApi
Expand Down Expand Up @@ -179,14 +181,6 @@ def upload(
return task


class ElasticProjectDataCollectionSource(BaseModel):
proj_key: str
index_key: str

def to_resource(self) -> Dict[str, Any]:
return {"type": "elastic", "proj_key": self.proj_key, "index": self.index_key}


class DataIndex(BaseModel):

source: ElasticProjectDataCollectionSource
Expand Down Expand Up @@ -259,6 +253,109 @@ def add_item_attachment(
params=params,
)

def list_items(
    self,
    api: CpsApi,
    query_string: str = "*",
    page_size: int = 10,
    max_items: int = 100,
) -> Generator[dict, None, None]:
    """
    List/search documents in an index, yielding one dict per item.

    Input
    -----
    api : CpsApi
        CpsApi Class
    query_string: str
        string to search documents, defaults to all ("*")
    page_size : int
        page size in query pagination, defaults to 10
    max_items : int
        maximum items to list, defaults to 100

    Yields
    ------
    dict
        ``{"name": ..., "id": ...}`` for each matching document,
        at most ``max_items`` in total.
    """

    # Never request more per page than we are allowed to yield in total.
    if max_items < page_size:
        page_size = max_items

    query = DataQuery(
        search_query=query_string,
        limit=page_size,
        coordinates=ElasticProjectDataCollectionSource(
            proj_key=self.source.proj_key, index_key=self.source.index_key
        ),
    )

    # Run the paginated query, stopping exactly at max_items.
    # Counting yielded items (rather than completed pages) prevents
    # overshooting max_items when it is not a multiple of page_size.
    cursor = api.queries.run_paginated_query(query)
    yielded = 0
    for result in cursor:
        for row in result.outputs["data_outputs"]:
            yield {
                "name": row["_source"]["_name"],
                "id": row["_source"]["file-info"]["document-hash"],
            }
            yielded += 1
            if yielded >= max_items:
                return

def get_item_urls(
    self,
    api: CpsApi,
    index_item_id: str,
) -> DataIndexItemUrls:
    """
    Get the download urls of a document's converted artefacts.

    Input
    -----
    api : CpsApi
        CpsApi Class
    index_item_id : string
        id of document in index

    Returns
    -------
    DataIndexItemUrls
        pdf/markdown/json urls taken from the item's ``_s3_data``;
        an empty string is used when an artefact is missing.
    """

    query_tasks = Query()

    # Exact-id lookup of a single item in this index.
    lookup = query_tasks.add(
        "ElasticQuery",
        task_id="elastic-search",
        parameters={
            "elastic_query": {
                "bool": {"filter": {"terms": {"_id": [index_item_id]}}}
            },
            "limit": 1,
        },
        coordinates=ElasticProjectDataCollectionSource(
            proj_key=self.source.proj_key, index_key=self.source.index_key
        ),
    )
    lookup.output("items").output_as("result")

    # Run task.
    response = api.queries.run(query_tasks)

    # Guard against a missing "result" key AND an empty result list
    # (unknown id) — indexing [0] on an empty list would raise IndexError.
    results: list = response.outputs.get("result") or [{}]
    s3_data: dict = results[0].get("_source", {}).get("_s3_data", {})

    def get_url(document: str) -> str:
        # _s3_data entries may be a single dict or a list of dicts.
        doc_info: Union[dict, list] = s3_data.get(document, {})
        if isinstance(doc_info, list):
            # An empty list means the artefact is absent; avoid IndexError.
            return doc_info[0].get("url", "") if doc_info else ""
        return doc_info.get("url", "")

    return DataIndexItemUrls(
        pdf_url=get_url("pdf-document"),
        md_url=get_url("markdown-document"),
        json_url=get_url("json-document"),
    )


@dataclass
class CpsApiDataIndex(ApiConnectedObject):
Expand All @@ -275,3 +372,9 @@ class S3Coordinates(BaseModel):
bucket: str
location: str
key_prefix: str = ""


class DataIndexItemUrls(BaseModel):
    """Download urls for the converted artefacts of a single index item.

    Populated from the item's ``_s3_data`` by ``DataIndex.get_item_urls``;
    an empty string marks a missing artefact.
    """

    # url of the PDF document
    pdf_url: str
    # url of the Markdown export
    md_url: str
    # url of the JSON export
    json_url: str
4 changes: 2 additions & 2 deletions deepsearch/cps/client/components/documents.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
SemanticIngestSourcePublicDataDocument,
)
from deepsearch.cps.apis.public_v2.models.source4 import Source4
from deepsearch.cps.client.components.data_indices import (
from deepsearch.cps.client.components.elastic import (
ElasticDataCollectionSource,
ElasticProjectDataCollectionSource,
)
from deepsearch.cps.client.components.elastic import ElasticDataCollectionSource
from deepsearch.cps.client.components.projects import Project

if TYPE_CHECKING:
Expand Down
11 changes: 8 additions & 3 deletions deepsearch/cps/client/components/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,6 @@
from pydantic import BaseModel

from deepsearch.cps.apis import public as sw_client
from deepsearch.cps.client.components.data_indices import (
ElasticProjectDataCollectionSource,
)

if TYPE_CHECKING:
from deepsearch.cps.client import CpsApi
Expand Down Expand Up @@ -58,6 +55,14 @@ class ElasticDataCollectionMetadata(BaseModel):
version: str


class ElasticProjectDataCollectionSource(BaseModel):
    """Coordinates of a project-scoped Elastic data collection."""

    proj_key: str
    index_key: str

    def to_resource(self) -> Dict[str, Any]:
        """Serialize the coordinates as an elastic resource descriptor."""
        return dict(type="elastic", proj_key=self.proj_key, index=self.index_key)


class ElasticDataCollection(BaseModel):

source: Union[ElasticDataCollectionSource, ElasticProjectDataCollectionSource]
Expand Down
33 changes: 33 additions & 0 deletions docs/guide/data_indices.md
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,36 @@ Attachments can be added to an index item in a project. Briefly, attachments hav
attachment_key=attachment_key, # optional
)
```

---

## List documents in an index

Documents in an index can be listed by calling the `list_items` method of the `DataIndex` class. The method also accepts a query string to search for specific document(s).

=== "CLI"
<div class="termy">

```console
$ deepsearch cps data-indices list -p PROJ_KEY -x INDEX_KEY -q QUERY_STRING
```

</div>
=== "Python"
```python
from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource

# get indices of the project
indices = api.data_indices.list(PROJ_KEY)

    # get the specific index to list documents from
index = next((x for x in indices if x.source.index_key == index_key), None)

# if the index exists, list items
if index is not None:
items = index.list_items(api)
for item in items:
print(item)
```

---