diff --git a/deepsearch/cps/cli/data_indices_typer.py b/deepsearch/cps/cli/data_indices_typer.py index 3c735a21..16c0e54f 100644 --- a/deepsearch/cps/cli/data_indices_typer.py +++ b/deepsearch/cps/cli/data_indices_typer.py @@ -156,14 +156,28 @@ def upload_files( typer.echo(ERROR_MSG) raise typer.Abort() - coords = ElasticProjectDataCollectionSource(proj_key=proj_key, index_key=index_key) - utils.upload_files( - api=api, - coords=coords, - url=urls, - local_file=local_file, - s3_coordinates=cos_coordinates, - ) + # get indices of the project + indices = api.data_indices.list(proj_key) + + # get specific index to add attachment + index = next((x for x in indices if x.source.index_key == index_key), None) + + if index is not None: + try: + index.upload_files( + api=api, + url=urls, + local_file=local_file, + s3_coordinates=cos_coordinates, + ) + except ValueError as e: + typer.echo(f"Error occurred: {e}") + typer.echo(ERROR_MSG) + raise typer.Abort() + return + else: + typer.echo("Index key not found") + raise typer.Abort() @app.command( diff --git a/deepsearch/cps/client/components/data_indices.py b/deepsearch/cps/client/components/data_indices.py index fe5c351b..97a12d0c 100644 --- a/deepsearch/cps/client/components/data_indices.py +++ b/deepsearch/cps/client/components/data_indices.py @@ -4,7 +4,7 @@ import os from dataclasses import dataclass from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union import requests from pydantic import BaseModel @@ -20,6 +20,14 @@ from deepsearch.cps.client import CpsApi +DATA_INDEX_TYPE = Literal[ + "Document", + "DB Record", + "Generic", + "Experiment", +] + + class CpsApiDataIndices: def __init__(self, api: CpsApi) -> None: self.api = api @@ -30,7 +38,12 @@ def list(self, proj_key: str) -> List[DataIndex]: sw_client.ProjectDataIndexWithStatus ] = self.sw_api.get_project_data_indices(proj_key=proj_key) - return 
[DataIndex.parse_obj(item.to_dict()) for item in response] + # filter out saved searchs index + return [ + DataIndex.parse_obj(item.to_dict()) + for item in response + if item.to_dict()["type"] != "View" + ] def create( self, @@ -102,15 +115,26 @@ def delete( def upload_file( self, coords: ElasticProjectDataCollectionSource, - body: Union[Dict[str, List[str]], Dict[str, Dict[str, S3Coordinates]]], + index_type: DATA_INDEX_TYPE, + body: Union[ + Dict[str, List[str]], Dict[str, str], Dict[str, Dict[str, S3Coordinates]] + ], ) -> str: """ Call api for converting and uploading file to a project's data index. """ - task_id = self.sw_api.ccs_convert_upload_file_project_data_index( - proj_key=coords.proj_key, index_key=coords.index_key, body=body - ).task_id - return task_id + if index_type == "Document": + task_id = self.sw_api.ccs_convert_upload_file_project_data_index( + proj_key=coords.proj_key, index_key=coords.index_key, body=body + ).task_id + return task_id + elif index_type == "Generic" or index_type == "DB Record": + task_id = self.sw_api.upload_project_data_index_file( + proj_key=coords.proj_key, index_key=coords.index_key, params=body + ).task_id + return task_id + else: + raise NotImplementedError class ElasticProjectDataCollectionSource(BaseModel): @@ -130,7 +154,7 @@ class DataIndex(BaseModel): health: str status: str schema_key: str - type: str + type: DATA_INDEX_TYPE def add_item_attachment( self, @@ -193,6 +217,46 @@ def add_item_attachment( params=params, ) + def upload_files( + self, + api: CpsApi, + url: Optional[Union[str, List[str]]] = None, + local_file: Optional[Union[str, Path]] = None, + s3_coordinates: Optional[S3Coordinates] = None, + ) -> None: + """ + Method to upload files to an index. 
+ Input + ----- + api : CpsApi + CpsApi Class + url : string | list[string], OPTIONAL + single url string or list of urls string + local_file : string | Path, OPTIONAL + path to file on local folder + s3_coordinates : S3Coordinates, OPTIONAL + coordinates of COS to use files on the bucket to convert and upload. Params: + """ + + # Avoid circular imports + from deepsearch.cps.data_indices.utils import upload_files + + if ( + self.type == "Generic" + or self.type == "Document" + or self.type == "DB Record" + ): + upload_files( + api=api, + coords=self.source, + index_type=self.type, + url=url, + local_file=local_file, + s3_coordinates=s3_coordinates, + ) + else: + raise NotImplementedError + @dataclass class CpsApiDataIndex(ApiConnectedObject): diff --git a/deepsearch/cps/data_indices/utils.py b/deepsearch/cps/data_indices/utils.py index a8ca44ce..125a2059 100644 --- a/deepsearch/cps/data_indices/utils.py +++ b/deepsearch/cps/data_indices/utils.py @@ -10,11 +10,16 @@ from tqdm import tqdm from deepsearch.cps.client.api import CpsApi -from deepsearch.cps.client.components.data_indices import S3Coordinates +from deepsearch.cps.client.components.data_indices import DATA_INDEX_TYPE, S3Coordinates from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource from deepsearch.documents.core import convert, input_process from deepsearch.documents.core.common_routines import progressbar, success_message -from deepsearch.documents.core.utils import cleanup, create_root_dir +from deepsearch.documents.core.utils import ( + ALLOWED_FILE_EXTENSIONS, + ALLOWED_JSON_TYPE_FILE_EXTENSION, + cleanup, + create_root_dir, +) logger = logging.getLogger(__name__) @@ -22,6 +27,7 @@ def upload_files( api: CpsApi, coords: ElasticProjectDataCollectionSource, + index_type: DATA_INDEX_TYPE, url: Optional[Union[str, List[str]]] = None, local_file: Optional[Union[str, Path]] = None, s3_coordinates: Optional[S3Coordinates] = None, @@ -41,16 +47,22 @@ def upload_files( else: 
urls = url - return process_url_input(api=api, coords=coords, urls=urls) + if index_type == "Document": + return process_url_input( + api=api, coords=coords, index_type=index_type, urls=urls + ) + else: + raise ValueError("Url is only allowed on index with type Document.") elif url is None and local_file is not None and s3_coordinates is None: return process_local_file( api=api, coords=coords, + index_type=index_type, local_file=Path(local_file), ) elif url is None and local_file is None and s3_coordinates is not None: return process_external_cos( - api=api, coords=coords, s3_coordinates=s3_coordinates + api=api, coords=coords, index_type=index_type, s3_coordinates=s3_coordinates ) raise ValueError( "Please provide only one input: url, local file, or coordinates to COS." @@ -60,6 +72,7 @@ def upload_files( def process_url_input( api: CpsApi, coords: ElasticProjectDataCollectionSource, + index_type: DATA_INDEX_TYPE, urls: List[str], progress_bar: bool = False, ): @@ -67,12 +80,24 @@ def process_url_input( Individual urls are uploaded for conversion and storage in data index. """ - root_dir = create_root_dir() + # filter urls to match allowed extensions + filtered_urls = [] + if index_type == "Document": + filtered_urls = [ + url for url in urls if "." + url.split(".")[-1] in ALLOWED_FILE_EXTENSIONS + ] + else: + raise ValueError("Url is only allowed on index type Document.") + + if len(filtered_urls) == 0: + raise ValueError( + f"Please provide at least one url with allowed file extension '{' , '.join(ALLOWED_FILE_EXTENSIONS)}'." 
+ ) # container list for task_ids task_ids = [] # submit urls - count_urls = len(urls) + count_urls = len(filtered_urls) with tqdm( total=count_urls, desc=f"{'Submitting input:': <{progressbar.padding}}", @@ -80,10 +105,12 @@ def process_url_input( colour=progressbar.colour, bar_format=progressbar.bar_format, ) as progress: - for url in urls: + for url in filtered_urls: file_url_array = [url] payload = {"file_url": file_url_array} - task_id = api.data_indices.upload_file(coords=coords, body=payload) + task_id = api.data_indices.upload_file( + coords=coords, index_type=index_type, body=payload + ) task_ids.append(task_id) progress.update(1) @@ -100,101 +127,158 @@ def process_url_input( def process_local_file( api: CpsApi, coords: ElasticProjectDataCollectionSource, + index_type: DATA_INDEX_TYPE, local_file: Path, progress_bar: bool = False, ): """ Individual files are uploaded for conversion and storage in data index. """ + if index_type == "Generic" or index_type == "DB Record": + if ( + os.path.isfile(local_file) + and Path(local_file).suffix in ALLOWED_JSON_TYPE_FILE_EXTENSION + ): + # container for task_ids + task_ids = [] - # process multiple files from local directory - root_dir = create_root_dir() - # batch individual pdfs into zips and add them to root_dir - batched_files = input_process.batch_single_files( - source_path=local_file, root_dir=root_dir - ) + with tqdm( + total=1, + desc=f"{'Submitting input:': <{progressbar.padding}}", + disable=not (progress_bar), + colour=progressbar.colour, + bar_format=progressbar.bar_format, + ) as progress: + # upload file + private_download_url = convert.upload_single_file( + api=api, cps_proj_key=coords.proj_key, source_path=Path(local_file) + ) + payload_generic = {"file_url": private_download_url} + task_id = api.data_indices.upload_file( + coords=coords, index_type=index_type, body=payload_generic + ) + task_ids.append(task_id) + progress.update(1) - # collect'em all - files_zip: List[Any] = [] - if 
os.path.isdir(local_file): - files_zip = glob.glob(os.path.join(local_file, "**/*.zip"), recursive=True) - elif os.path.isfile(local_file): - file_extension = Path(local_file).suffix - if file_extension == ".zip": - files_zip = [local_file] - - if root_dir is not None: - files_tmpzip = glob.glob( - os.path.join(root_dir, "tmpzip/**/*.zip"), recursive=True + # check status of running tasks + # TODO: add failure handling + statuses = convert.check_cps_status_running_tasks( + api=api, cps_proj_key=coords.proj_key, task_ids=task_ids + ) + + print(success_message) + return + else: + raise ValueError( + f"""For an index type {index_type}, + please provide a file with allowed extensions: + {' , '.join(ALLOWED_JSON_TYPE_FILE_EXTENSION)}.""" + ) + + elif index_type == "Document": + # process multiple files from local directory + root_dir = create_root_dir() + # batch individual pdfs into zips and add them to root_dir + batched_files = input_process.batch_single_files( + source_path=local_file, root_dir=root_dir ) - files_zip = files_zip + files_tmpzip - count_total_files = len(files_zip) - # container for task_ids - task_ids = [] + # collect'em all + files_zip: List[Any] = [] + if os.path.isdir(local_file): + files_zip = glob.glob(os.path.join(local_file, "**/*.zip"), recursive=True) + elif os.path.isfile(local_file): + file_extension = Path(local_file).suffix + if file_extension == ".zip": + files_zip = [local_file] - # start loop - with tqdm( - total=count_total_files, - desc=f"{'Submitting input:': <{progressbar.padding}}", - disable=not (progress_bar), - colour=progressbar.colour, - bar_format=progressbar.bar_format, - ) as progress: - # loop over all files - for single_zip in files_zip: - # upload file - private_download_url = convert.upload_single_file( - api=api, cps_proj_key=coords.proj_key, source_path=Path(single_zip) + if root_dir is not None: + files_tmpzip = glob.glob( + os.path.join(root_dir, "tmpzip/**/*.zip"), recursive=True ) - file_url_array = 
[private_download_url] - payload = {"file_url": file_url_array} - task_id = api.data_indices.upload_file(coords=coords, body=payload) - task_ids.append(task_id) - progress.update(1) + files_zip = files_zip + files_tmpzip - # check status of running tasks - # TODO: add failure handling - statuses = convert.check_cps_status_running_tasks( - api=api, cps_proj_key=coords.proj_key, task_ids=task_ids - ) - print(success_message) - cleanup(root_dir=root_dir) - return + count_total_files = len(files_zip) + + # container for task_ids + task_ids = [] + + # start loop + with tqdm( + total=count_total_files, + desc=f"{'Submitting input:': <{progressbar.padding}}", + disable=not (progress_bar), + colour=progressbar.colour, + bar_format=progressbar.bar_format, + ) as progress: + # loop over all files + for single_zip in files_zip: + # upload file + private_download_url = convert.upload_single_file( + api=api, cps_proj_key=coords.proj_key, source_path=Path(single_zip) + ) + file_url_array = [private_download_url] + payload = {"file_url": file_url_array} + task_id = api.data_indices.upload_file( + coords=coords, index_type=index_type, body=payload + ) + task_ids.append(task_id) + progress.update(1) + + # check status of running tasks + # TODO: add failure handling + statuses = convert.check_cps_status_running_tasks( + api=api, cps_proj_key=coords.proj_key, task_ids=task_ids + ) + print(success_message) + cleanup(root_dir=root_dir) + return + else: + raise ValueError( + "Only index with type Document or Generic are supported to upload local files." + ) def process_external_cos( api: CpsApi, coords: ElasticProjectDataCollectionSource, + index_type: DATA_INDEX_TYPE, s3_coordinates: S3Coordinates, progress_bar=False, ): """ Individual files are processed before upload. 
""" - # container for task_ids - task_ids = [] + if index_type == "Document": + # container for task_ids + task_ids = [] - with tqdm( - total=1, - desc=f"{'Submitting input:': <{progressbar.padding}}", - disable=not (progress_bar), - colour=progressbar.colour, - bar_format=progressbar.bar_format, - ) as progress: - # upload using coordinates - payload = {"s3_source": {"coordinates": s3_coordinates}} - task_id = api.data_indices.upload_file( - coords=coords, - body=payload, + with tqdm( + total=1, + desc=f"{'Submitting input:': <{progressbar.padding}}", + disable=not (progress_bar), + colour=progressbar.colour, + bar_format=progressbar.bar_format, + ) as progress: + # upload using coordinates + payload = {"s3_source": {"coordinates": s3_coordinates}} + task_id = api.data_indices.upload_file( + coords=coords, + index_type=index_type, + body=payload, + ) + task_ids.append(task_id) + progress.update(1) + + # check status of running tasks + # TODO: add failure handling + statuses = convert.check_cps_status_running_tasks( + api=api, cps_proj_key=coords.proj_key, task_ids=task_ids ) - task_ids.append(task_id) - progress.update(1) + print(success_message) + return - # check status of running tasks - # TODO: add failure handling - statuses = convert.check_cps_status_running_tasks( - api=api, cps_proj_key=coords.proj_key, task_ids=task_ids - ) - print(success_message) - return + else: + raise ValueError( + "COS coordinates upload is only allowed on index type Document." 
+ ) diff --git a/deepsearch/documents/core/utils.py b/deepsearch/documents/core/utils.py index 46c1d20e..8697bc02 100644 --- a/deepsearch/documents/core/utils.py +++ b/deepsearch/documents/core/utils.py @@ -15,6 +15,7 @@ from .common_routines import progressbar ALLOWED_FILE_EXTENSIONS = [".pdf", ".jpg", ".jpeg", ".tiff", ".tif", ".png", ".gif"] +ALLOWED_JSON_TYPE_FILE_EXTENSION = [".json", ".jsonl"] class URLNavigator: diff --git a/docs/guide/data_indices.md b/docs/guide/data_indices.md index 12394ecc..ff76f684 100644 --- a/docs/guide/data_indices.md +++ b/docs/guide/data_indices.md @@ -90,7 +90,8 @@ To delete a data index, you need to specify an index via its `INDEX_KEY`. [Listi ## Adding documents to a project -Documents can be converted and added, directly, to a data index in a project. Briefly, documents can be on a local machine or on the remote files. Local documents can be in PDF format, ZIP archives, or directory containing both (`PATH_DOCS`). The web address of a remote document is input directly or multiple web addresses can be stored in a text file (`PATH_URL`). The specification of documents is same as in [Document Conversion](../guide/convert-doc.md). +Documents can be converted and added, directly, to a data index with type `Document` in a project. Briefly, documents can be on a local machine or on the remote files. Local documents can be in PDF format, ZIP archives, or directory containing both (`PATH_DOCS`). The web address of a remote document is input directly or multiple web addresses can be stored in a text file (`PATH_URL`). The COS coordinates of a remote COS with files to upload and convert is stored on local `.json` file. The specification of documents is same as in [Document Conversion](../guide/convert-doc.md). +To add documents to a data index with type `Generic` or `DB Records`, documents have to be on local machine and be format `.json` or `.jsonl`. Add documents to a data index with type `Experiment` is not supported. 
=== "CLI"

@@ -102,28 +103,43 @@ Documents can be converted and added, directly, to a data index in a project. Br

       // for online documents
       $ deepsearch cps data-indices upload -p PROJ_KEY -x INDEX_KEY -u PATH_URL
+
+      // for COS documents
+      $ deepsearch cps data-indices upload -p PROJ_KEY -x INDEX_KEY -c PATH_COS_COORDINATES
     ```

 === "Python"

     ```python
-    from deepsearch.cps.client.components.elastic import ElasticProjectDataCollectionSource
-    from deepsearch.cps.data_indices import utils as data_indices_utils
-
-    # Specify index
-    coords = ElasticProjectDataCollectionSource(proj_key=PROJ_KEY, index_key=INDEX_KEY)
-
-    # For local documents
-    data_indices_utils.upload_files(api=api, coords=coords, local_file=PATH_DOCS)
+    from deepsearch.cps.client.components.data_indices import DataIndex
+    import json

-    # For online documents
+    # For local documents,
+    local_file=PATH_DOCS
+    # For online documents,
     # load the urls from the file to a list
     input_urls = open(PATH_URL).readlines()
     # or, define a list directly
     #input_urls = ["https:///URL1", "https://URL2", "https://URL3"]

-    data_indices_utils.upload_files(api=api, coords=coords, url=input_urls)
+    # For COS documents
+    with open(PATH_COS_COORDS) as coords_file:
+        s3_coordinates = json.load(coords_file)
+
+    # get indices of the project
+    indices = api.data_indices.list(PROJ_KEY)
+
+    # get specific index to add documents
+    index = next((x for x in indices if x.source.index_key == INDEX_KEY), None)
+
+    # add document to index
+    index.upload_files(
+        api=api,
+        url=input_urls,  # Optional, required if local_file and s3_coordinates are None
+        local_file=local_file,  # Optional, required if url and s3_coordinates are None
+        s3_coordinates=s3_coordinates,  # Optional, required if url and local_file are None
+    )
     ```

---