From 51afca777480f964eb849cc0c03a681bba95a3ab Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 23 Sep 2025 13:41:15 -0700 Subject: [PATCH 1/5] . --- .../basic_search/dr_basic_search_3_reduce.py | 4 +- backend/onyx/agents/agent_search/dr/utils.py | 4 +- backend/onyx/context/search/models.py | 92 +++++++++++++++++++ backend/onyx/context/search/utils.py | 34 ------- backend/onyx/db/chat.py | 3 +- .../server/query_and_chat/query_backend.py | 3 +- 6 files changed, 98 insertions(+), 42 deletions(-) diff --git a/backend/onyx/agents/agent_search/dr/sub_agents/basic_search/dr_basic_search_3_reduce.py b/backend/onyx/agents/agent_search/dr/sub_agents/basic_search/dr_basic_search_3_reduce.py index 7b7e04b4172..b87a7120e8c 100644 --- a/backend/onyx/agents/agent_search/dr/sub_agents/basic_search/dr_basic_search_3_reduce.py +++ b/backend/onyx/agents/agent_search/dr/sub_agents/basic_search/dr_basic_search_3_reduce.py @@ -5,12 +5,12 @@ from onyx.agents.agent_search.dr.sub_agents.states import SubAgentMainState from onyx.agents.agent_search.dr.sub_agents.states import SubAgentUpdate -from onyx.agents.agent_search.dr.utils import chunks_or_sections_to_search_docs from onyx.agents.agent_search.shared_graph_utils.utils import ( get_langgraph_node_log_string, ) from onyx.agents.agent_search.shared_graph_utils.utils import write_custom_event from onyx.context.search.models import SavedSearchDoc +from onyx.context.search.models import SearchDoc from onyx.server.query_and_chat.streaming_models import SectionEnd from onyx.utils.logger import setup_logger @@ -47,7 +47,7 @@ def is_reducer( doc_list.append(x) # Convert InferenceSections to SavedSearchDocs - search_docs = chunks_or_sections_to_search_docs(doc_list) + search_docs = SearchDoc.chunks_or_sections_to_search_docs(doc_list) retrieved_saved_search_docs = [ SavedSearchDoc.from_search_doc(search_doc, db_doc_id=0) for search_doc in search_docs diff --git a/backend/onyx/agents/agent_search/dr/utils.py b/backend/onyx/agents/agent_search/dr/utils.py index b0e86d9b52d..817b6e19dde 100644 --- a/backend/onyx/agents/agent_search/dr/utils.py +++ b/backend/onyx/agents/agent_search/dr/utils.py @@ -13,7 +13,7 @@ ) from onyx.context.search.models import InferenceSection from onyx.context.search.models import SavedSearchDoc -from onyx.context.search.utils import chunks_or_sections_to_search_docs +from onyx.context.search.models import SearchDoc from onyx.tools.tool_implementations.web_search.web_search_tool import ( WebSearchTool, ) @@ -266,7 +266,7 @@ def convert_inference_sections_to_search_docs( is_internet: bool = False, ) -> list[SavedSearchDoc]: # Convert InferenceSections to SavedSearchDocs - search_docs = chunks_or_sections_to_search_docs(inference_sections) + search_docs = SearchDoc.chunks_or_sections_to_search_docs(inference_sections) for search_doc in search_docs: search_doc.is_internet = is_internet diff --git a/backend/onyx/context/search/models.py b/backend/onyx/context/search/models.py index 14e7c5bcb40..1c31105df50 100644 --- a/backend/onyx/context/search/models.py +++ b/backend/onyx/context/search/models.py @@ -1,3 +1,4 @@ +from collections.abc import Sequence from datetime import datetime from typing import Any @@ -355,6 +356,97 @@ class SearchDoc(BaseModel): secondary_owners: list[str] | None = None is_internet: bool = False + @classmethod + def chunks_or_sections_to_search_docs( + cls, + items: "Sequence[InferenceChunk | InferenceSection] | None", + ) -> list["SearchDoc"]: + """Convert a sequence of InferenceChunk or InferenceSection objects to SearchDoc objects.""" + if not items: + return [] + + search_docs = [ + cls( + document_id=( + chunk := ( + item.center_chunk + if isinstance(item, InferenceSection) + else item + ) + ).document_id, + chunk_ind=chunk.chunk_id, + semantic_identifier=chunk.semantic_identifier or "Unknown", + link=chunk.source_links[0] if chunk.source_links else None, + blurb=chunk.blurb, + source_type=chunk.source_type, + boost=chunk.boost, + hidden=chunk.hidden, + metadata=chunk.metadata, + score=chunk.score, + match_highlights=chunk.match_highlights, + updated_at=chunk.updated_at, + primary_owners=chunk.primary_owners, + secondary_owners=chunk.secondary_owners, + is_internet=False, + ) + for item in items + ] + + return search_docs + + @classmethod + def from_inference_section( + cls, inference_section: "InferenceSection" + ) -> "SearchDoc": + """Convert an InferenceSection to a SearchDoc using the center chunk's data.""" + chunk = inference_section.center_chunk + return cls( + document_id=chunk.document_id, + chunk_ind=chunk.chunk_id, + semantic_identifier=chunk.semantic_identifier or "Unknown", + link=chunk.source_links[0] if chunk.source_links else None, + blurb=chunk.blurb, + source_type=chunk.source_type, + boost=chunk.boost, + hidden=chunk.hidden, + metadata=chunk.metadata, + score=chunk.score, + is_relevant=chunk.is_relevant, + relevance_explanation=chunk.relevance_explanation, + match_highlights=chunk.match_highlights, + updated_at=chunk.updated_at, + primary_owners=chunk.primary_owners, + secondary_owners=chunk.secondary_owners, + is_internet=False, + ) + + @classmethod + def from_inference_chunk(cls, inference_chunk: "InferenceChunk") -> "SearchDoc": + """Convert an InferenceChunk to a SearchDoc.""" + return cls( + document_id=inference_chunk.document_id, + chunk_ind=inference_chunk.chunk_id, + semantic_identifier=inference_chunk.semantic_identifier or "Unknown", + link=( + inference_chunk.source_links[0] + if inference_chunk.source_links + else None + ), + blurb=inference_chunk.blurb, + source_type=inference_chunk.source_type, + boost=inference_chunk.boost, + hidden=inference_chunk.hidden, + metadata=inference_chunk.metadata, + score=inference_chunk.score, + is_relevant=inference_chunk.is_relevant, + relevance_explanation=inference_chunk.relevance_explanation, + match_highlights=inference_chunk.match_highlights, + updated_at=inference_chunk.updated_at, + primary_owners=inference_chunk.primary_owners, + secondary_owners=inference_chunk.secondary_owners, + is_internet=False, + ) + def model_dump(self, *args: list, **kwargs: dict[str, Any]) -> dict[str, Any]: # type: ignore initial_dict = super().model_dump(*args, **kwargs) # type: ignore initial_dict["updated_at"] = ( diff --git a/backend/onyx/context/search/utils.py b/backend/onyx/context/search/utils.py index 607eb877fff..8edd75f7e51 100644 --- a/backend/onyx/context/search/utils.py +++ b/backend/onyx/context/search/utils.py @@ -118,40 +118,6 @@ def inference_section_from_chunks( ) -def chunks_or_sections_to_search_docs( - items: Sequence[InferenceChunk | InferenceSection] | None, -) -> list[SearchDoc]: - if not items: - return [] - - search_docs = [ - SearchDoc( - document_id=( - chunk := ( - item.center_chunk if isinstance(item, InferenceSection) else item - ) - ).document_id, - chunk_ind=chunk.chunk_id, - semantic_identifier=chunk.semantic_identifier or "Unknown", - link=chunk.source_links[0] if chunk.source_links else None, - blurb=chunk.blurb, - source_type=chunk.source_type, - boost=chunk.boost, - hidden=chunk.hidden, - metadata=chunk.metadata, - score=chunk.score, - match_highlights=chunk.match_highlights, - updated_at=chunk.updated_at, - primary_owners=chunk.primary_owners, - secondary_owners=chunk.secondary_owners, - is_internet=False, - ) - for item in items - ] - - return search_docs - - def remove_stop_words_and_punctuation(keywords: list[str]) -> list[str]: try: # Re-tokenize using the NLTK tokenizer for better matching diff --git a/backend/onyx/db/chat.py b/backend/onyx/db/chat.py index 440d23c28b4..d6c0eadebe5 100644 --- a/backend/onyx/db/chat.py +++ b/backend/onyx/db/chat.py @@ -34,7 +34,6 @@ from onyx.context.search.models import RetrievalDocs from onyx.context.search.models import SavedSearchDoc from onyx.context.search.models import SearchDoc as ServerSearchDoc -from onyx.context.search.utils import chunks_or_sections_to_search_docs from onyx.db.models import AgentSearchMetrics from onyx.db.models import AgentSubQuery from onyx.db.models import AgentSubQuestion @@ -1147,7 +1146,7 @@ def log_agent_sub_question_results( db_session.add(sub_query_object) db_session.commit() - search_docs = chunks_or_sections_to_search_docs( + search_docs = ServerSearchDoc.chunks_or_sections_to_search_docs( sub_query.retrieved_documents ) for doc in search_docs: diff --git a/backend/onyx/server/query_and_chat/query_backend.py b/backend/onyx/server/query_and_chat/query_backend.py index dc54056fd01..9cbe9584754 100644 --- a/backend/onyx/server/query_and_chat/query_backend.py +++ b/backend/onyx/server/query_and_chat/query_backend.py @@ -14,7 +14,6 @@ from onyx.context.search.preprocessing.access_filters import ( build_access_filters_for_user, ) -from onyx.context.search.utils import chunks_or_sections_to_search_docs from onyx.db.chat import get_chat_messages_by_session from onyx.db.chat import get_chat_session_by_id from onyx.db.chat import get_chat_sessions_by_user @@ -74,7 +73,7 @@ def admin_search( ) matching_chunks = document_index.admin_retrieval(query=query, filters=final_filters) - documents = chunks_or_sections_to_search_docs(matching_chunks) + documents = SearchDoc.chunks_or_sections_to_search_docs(matching_chunks) # Deduplicate documents by id deduplicated_documents: list[SearchDoc] = [] From deb6315ecc6b4d38df40cd00bff0bd9d09f3379e Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 23 Sep 2025 13:46:20 -0700 Subject: [PATCH 2/5] . --- .../onyx/background/celery/apps/app_base.py | 68 +++++++++--------- .../celery/tasks/docfetching/tasks.py | 20 +++--- backend/onyx/db/document.py | 68 +++++++++--------- backend/onyx/redis/redis_connector_delete.py | 70 +++++++++---------- 4 files changed, 113 insertions(+), 113 deletions(-) diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index f766a6d8e1a..ebd897d4e66 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -24,15 +24,15 @@ from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter from onyx.background.celery.celery_utils import celery_is_worker_primary from onyx.background.celery.celery_utils import make_probe_path -from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFIX -from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY +# from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFIX +# from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine.sql_engine import get_sqlalchemy_engine from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout from onyx.httpx.httpx_pool import HttpxPool -from onyx.redis.redis_connector import RedisConnector -from onyx.redis.redis_connector_delete import RedisConnectorDelete +# from onyx.redis.redis_connector import RedisConnector +# from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync from onyx.redis.redis_connector_prune import RedisConnectorPrune @@ -157,9 +157,9 @@ def on_task_postrun( # NOTE: we want to remove the `Redis*` classes, prefer to just have functions to # do these things going forward. In short, things should generally be like the doc # sync task rather than the others below - if task_id.startswith(DOCUMENT_SYNC_PREFIX): - r.srem(DOCUMENT_SYNC_TASKSET_KEY, task_id) - return + # if task_id.startswith(DOCUMENT_SYNC_PREFIX): + # r.srem(DOCUMENT_SYNC_TASKSET_KEY, task_id) + # return if task_id.startswith(RedisDocumentSet.PREFIX): document_set_id = RedisDocumentSet.get_id_from_task_id(task_id) @@ -175,33 +175,33 @@ def on_task_postrun( r.srem(rug.taskset_key, task_id) return - if task_id.startswith(RedisConnectorDelete.PREFIX): - cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - if cc_pair_id is not None: - RedisConnectorDelete.remove_from_taskset(int(cc_pair_id), task_id, r) - return - - if task_id.startswith(RedisConnectorPrune.SUBTASK_PREFIX): - cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - if cc_pair_id is not None: - RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r) - return - - if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX): - cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - if cc_pair_id is not None: - RedisConnectorPermissionSync.remove_from_taskset( - int(cc_pair_id), task_id, r - ) - return - - if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX): - cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - if cc_pair_id is not None: - RedisConnectorExternalGroupSync.remove_from_taskset( - int(cc_pair_id), task_id, r - ) - return + # if task_id.startswith(RedisConnectorDelete.PREFIX): + # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + # if cc_pair_id is not None: + # RedisConnectorDelete.remove_from_taskset(int(cc_pair_id), task_id, r) + # return + + # if task_id.startswith(RedisConnectorPrune.SUBTASK_PREFIX): + # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + # if cc_pair_id is not None: + # RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r) + # return + + # if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX): + # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + # if cc_pair_id is not None: + # RedisConnectorPermissionSync.remove_from_taskset( + # int(cc_pair_id), task_id, r + # ) + # return + + # if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX): + # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + # if cc_pair_id is not None: + # RedisConnectorExternalGroupSync.remove_from_taskset( + # int(cc_pair_id), task_id, r + # ) + # return def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: diff --git a/backend/onyx/background/celery/tasks/docfetching/tasks.py b/backend/onyx/background/celery/tasks/docfetching/tasks.py index 675a2642c88..9323a6ee1b7 100644 --- a/backend/onyx/background/celery/tasks/docfetching/tasks.py +++ b/backend/onyx/background/celery/tasks/docfetching/tasks.py @@ -21,7 +21,7 @@ from onyx.background.indexing.job_client import SimpleJob from onyx.background.indexing.job_client import SimpleJobClient from onyx.background.indexing.job_client import SimpleJobException -from onyx.background.indexing.run_docfetching import run_docfetching_entrypoint +# from onyx.background.indexing.run_docfetching import run_docfetching_entrypoint from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT from onyx.configs.constants import OnyxCeleryTask from onyx.connectors.exceptions import ConnectorValidationError @@ -32,7 +32,7 @@ from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_failed from onyx.db.indexing_coordination import IndexingCoordination -from onyx.redis.redis_connector import RedisConnector +# from onyx.redis.redis_connector import RedisConnector from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import global_version from shared_configs.configs import SENTRY_DSN @@ -211,14 +211,14 @@ def _docfetching_task( ) # This is where the heavy/real work happens - run_docfetching_entrypoint( - app, - index_attempt_id, - tenant_id, - cc_pair_id, - is_ee, - callback=callback, - ) + # run_docfetching_entrypoint( + # app, + # index_attempt_id, + # tenant_id, + # cc_pair_id, + # is_ee, + # callback=callback, + # ) except ConnectorValidationError: raise SimpleJobException( diff --git a/backend/onyx/db/document.py b/backend/onyx/db/document.py index 62b98859c05..f394d90e5cf 100644 --- a/backend/onyx/db/document.py +++ b/backend/onyx/db/document.py @@ -22,7 +22,7 @@ from sqlalchemy.orm import Session from sqlalchemy.sql.expression import null -from onyx.agents.agent_search.kb_search.models import KGEntityDocInfo +# from onyx.agents.agent_search.kb_search.models import KGEntityDocInfo from onyx.configs.constants import DEFAULT_BOOST from onyx.configs.constants import DocumentSource from onyx.configs.kg_configs import KG_SIMPLE_ANSWER_MAX_DISPLAYED_SOURCES @@ -33,7 +33,7 @@ from onyx.db.entities import delete_from_kg_entities_extraction_staging__no_commit from onyx.db.enums import AccessType from onyx.db.enums import ConnectorCredentialPairStatus -from onyx.db.feedback import delete_document_feedback_for_documents__no_commit +# from onyx.db.feedback import delete_document_feedback_for_documents__no_commit from onyx.db.models import Connector from onyx.db.models import ConnectorCredentialPair from onyx.db.models import Credential @@ -747,9 +747,9 @@ def delete_documents_complete__no_commit( ) delete_documents_by_connector_credential_pair__no_commit(db_session, document_ids) - delete_document_feedback_for_documents__no_commit( - document_ids=document_ids, db_session=db_session - ) + # delete_document_feedback_for_documents__no_commit( + # document_ids=document_ids, db_session=db_session + # ) delete_document_tags_for_documents__no_commit( document_ids=document_ids, db_session=db_session ) @@ -1199,35 +1199,35 @@ def get_skipped_kg_documents(db_session: Session) -> list[str]: return list(db_session.scalars(stmt).all()) -def get_kg_doc_info_for_entity_name( - db_session: Session, document_id: str, entity_type: str -) -> KGEntityDocInfo: - """ - Get the semantic ID and the link for an entity name. - """ - - result = ( - db_session.query(Document.semantic_id, Document.link) - .filter(Document.id == document_id) - .first() - ) - - if result is None: - return KGEntityDocInfo( - doc_id=None, - doc_semantic_id=None, - doc_link=None, - semantic_entity_name=f"{entity_type}:{document_id}", - semantic_linked_entity_name=f"{entity_type}:{document_id}", - ) - - return KGEntityDocInfo( - doc_id=document_id, - doc_semantic_id=result[0], - doc_link=result[1], - semantic_entity_name=f"{entity_type.upper()}:{result[0]}", - semantic_linked_entity_name=f"[{entity_type.upper()}:{result[0]}]({result[1]})", - ) +# def get_kg_doc_info_for_entity_name( +# db_session: Session, document_id: str, entity_type: str +# ) -> KGEntityDocInfo: +# """ +# Get the semantic ID and the link for an entity name. +# """ + +# result = ( +# db_session.query(Document.semantic_id, Document.link) +# .filter(Document.id == document_id) +# .first() +# ) + +# if result is None: +# return KGEntityDocInfo( +# doc_id=None, +# doc_semantic_id=None, +# doc_link=None, +# semantic_entity_name=f"{entity_type}:{document_id}", +# semantic_linked_entity_name=f"{entity_type}:{document_id}", +# ) + +# return KGEntityDocInfo( +# doc_id=document_id, +# doc_semantic_id=result[0], +# doc_link=result[1], +# semantic_entity_name=f"{entity_type.upper()}:{result[0]}", +# semantic_linked_entity_name=f"[{entity_type.upper()}:{result[0]}]({result[1]})", +# ) def check_for_documents_needing_kg_processing( diff --git a/backend/onyx/redis/redis_connector_delete.py b/backend/onyx/redis/redis_connector_delete.py index 3ad08f24106..999c2127464 100644 --- a/backend/onyx/redis/redis_connector_delete.py +++ b/backend/onyx/redis/redis_connector_delete.py @@ -16,7 +16,7 @@ from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id -from onyx.db.document import construct_document_id_select_for_connector_credential_pair +# from onyx.db.document import construct_document_id_select_for_connector_credential_pair class RedisConnectorDeletePayload(BaseModel): @@ -118,40 +118,40 @@ def generate_tasks( num_tasks_sent = 0 - stmt = construct_document_id_select_for_connector_credential_pair( - cc_pair.connector_id, cc_pair.credential_id - ) - for doc_id in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT): - doc_id = cast(str, doc_id) - current_time = time.monotonic() - if current_time - last_lock_time >= ( - CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 - ): - lock.reacquire() - last_lock_time = current_time - - custom_task_id = self._generate_task_id() - - # add to the tracking taskset in redis BEFORE creating the celery task. - # note that for the moment we are using a single taskset key, not differentiated by cc_pair id - self.redis.sadd(self.taskset_key, custom_task_id) - - # Priority on sync's triggered by new indexing should be medium - celery_app.send_task( - OnyxCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, - kwargs=dict( - document_id=doc_id, - connector_id=cc_pair.connector_id, - credential_id=cc_pair.credential_id, - tenant_id=self.tenant_id, - ), - queue=OnyxCeleryQueues.CONNECTOR_DELETION, - task_id=custom_task_id, - priority=OnyxCeleryPriority.MEDIUM, - ignore_result=True, - ) - - num_tasks_sent += 1 + # stmt = construct_document_id_select_for_connector_credential_pair( + # cc_pair.connector_id, cc_pair.credential_id + # ) + # for doc_id in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT): + # doc_id = cast(str, doc_id) + # current_time = time.monotonic() + # if current_time - last_lock_time >= ( + # CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 + # ): + # lock.reacquire() + # last_lock_time = current_time + + # custom_task_id = self._generate_task_id() + + # # add to the tracking taskset in redis BEFORE creating the celery task. + # # note that for the moment we are using a single taskset key, not differentiated by cc_pair id + # self.redis.sadd(self.taskset_key, custom_task_id) + + # # Priority on sync's triggered by new indexing should be medium + # celery_app.send_task( + # OnyxCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, + # kwargs=dict( + # document_id=doc_id, + # connector_id=cc_pair.connector_id, + # credential_id=cc_pair.credential_id, + # tenant_id=self.tenant_id, + # ), + # queue=OnyxCeleryQueues.CONNECTOR_DELETION, + # task_id=custom_task_id, + # priority=OnyxCeleryPriority.MEDIUM, + # ignore_result=True, + # ) + + # num_tasks_sent += 1 return num_tasks_sent From 78c255f702da0495c5cba4e315830f38376cb878 Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 23 Sep 2025 15:07:34 -0700 Subject: [PATCH 3/5] . --- .../agent_search/orchestration/states.py | 2 +- .../onyx/background/celery/apps/app_base.py | 68 +++++++++---------- .../celery/tasks/docfetching/tasks.py | 23 ++++--- .../celery/tasks/docprocessing/tasks.py | 3 +- .../background/indexing/run_docfetching.py | 6 +- .../prompt_builder/answer_prompt_builder.py | 5 -- backend/onyx/chat/prompt_builder/schemas.py | 10 +++ .../tool_handling/tool_response_handler.py | 2 +- backend/onyx/db/chat.py | 2 +- backend/onyx/db/document.py | 68 +++++++++---------- .../onyx/file_processing/extract_file_text.py | 26 +++++-- backend/onyx/llm/utils.py | 36 ++++++---- .../search_nlp_models.py | 8 ++- 13 files changed, 146 insertions(+), 113 deletions(-) create mode 100644 backend/onyx/chat/prompt_builder/schemas.py diff --git a/backend/onyx/agents/agent_search/orchestration/states.py b/backend/onyx/agents/agent_search/orchestration/states.py index 80cc9f8dbfe..725b399b219 100644 --- a/backend/onyx/agents/agent_search/orchestration/states.py +++ b/backend/onyx/agents/agent_search/orchestration/states.py @@ -1,6 +1,6 @@ from pydantic import BaseModel -from onyx.chat.prompt_builder.answer_prompt_builder import PromptSnapshot +from onyx.chat.prompt_builder.schemas import PromptSnapshot from onyx.tools.message import ToolCallSummary from onyx.tools.models import SearchToolOverrideKwargs from onyx.tools.models import ToolCallFinalResult diff --git a/backend/onyx/background/celery/apps/app_base.py b/backend/onyx/background/celery/apps/app_base.py index ebd897d4e66..f766a6d8e1a 100644 --- a/backend/onyx/background/celery/apps/app_base.py +++ b/backend/onyx/background/celery/apps/app_base.py @@ -24,15 +24,15 @@ from onyx.background.celery.apps.task_formatters import CeleryTaskPlainFormatter from onyx.background.celery.celery_utils import celery_is_worker_primary from onyx.background.celery.celery_utils import make_probe_path -# from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFIX -# from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY +from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_PREFIX +from onyx.background.celery.tasks.vespa.document_sync import DOCUMENT_SYNC_TASKSET_KEY from onyx.configs.constants import ONYX_CLOUD_CELERY_TASK_PREFIX from onyx.configs.constants import OnyxRedisLocks from onyx.db.engine.sql_engine import get_sqlalchemy_engine from onyx.document_index.vespa.shared_utils.utils import wait_for_vespa_with_timeout from onyx.httpx.httpx_pool import HttpxPool -# from onyx.redis.redis_connector import RedisConnector -# from onyx.redis.redis_connector_delete import RedisConnectorDelete +from onyx.redis.redis_connector import RedisConnector +from onyx.redis.redis_connector_delete import RedisConnectorDelete from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_ext_group_sync import RedisConnectorExternalGroupSync from onyx.redis.redis_connector_prune import RedisConnectorPrune @@ -157,9 +157,9 @@ def on_task_postrun( # NOTE: we want to remove the `Redis*` classes, prefer to just have functions to # do these things going forward. In short, things should generally be like the doc # sync task rather than the others below - # if task_id.startswith(DOCUMENT_SYNC_PREFIX): - # r.srem(DOCUMENT_SYNC_TASKSET_KEY, task_id) - # return + if task_id.startswith(DOCUMENT_SYNC_PREFIX): + r.srem(DOCUMENT_SYNC_TASKSET_KEY, task_id) + return if task_id.startswith(RedisDocumentSet.PREFIX): document_set_id = RedisDocumentSet.get_id_from_task_id(task_id) @@ -175,33 +175,33 @@ def on_task_postrun( r.srem(rug.taskset_key, task_id) return - # if task_id.startswith(RedisConnectorDelete.PREFIX): - # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - # if cc_pair_id is not None: - # RedisConnectorDelete.remove_from_taskset(int(cc_pair_id), task_id, r) - # return - - # if task_id.startswith(RedisConnectorPrune.SUBTASK_PREFIX): - # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - # if cc_pair_id is not None: - # RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r) - # return - - # if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX): - # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - # if cc_pair_id is not None: - # RedisConnectorPermissionSync.remove_from_taskset( - # int(cc_pair_id), task_id, r - # ) - # return - - # if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX): - # cc_pair_id = RedisConnector.get_id_from_task_id(task_id) - # if cc_pair_id is not None: - # RedisConnectorExternalGroupSync.remove_from_taskset( - # int(cc_pair_id), task_id, r - # ) - # return + if task_id.startswith(RedisConnectorDelete.PREFIX): + cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + if cc_pair_id is not None: + RedisConnectorDelete.remove_from_taskset(int(cc_pair_id), task_id, r) + return + + if task_id.startswith(RedisConnectorPrune.SUBTASK_PREFIX): + cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + if cc_pair_id is not None: + RedisConnectorPrune.remove_from_taskset(int(cc_pair_id), task_id, r) + return + + if task_id.startswith(RedisConnectorPermissionSync.SUBTASK_PREFIX): + cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + if cc_pair_id is not None: + RedisConnectorPermissionSync.remove_from_taskset( + int(cc_pair_id), task_id, r + ) + return + + if task_id.startswith(RedisConnectorExternalGroupSync.SUBTASK_PREFIX): + cc_pair_id = RedisConnector.get_id_from_task_id(task_id) + if cc_pair_id is not None: + RedisConnectorExternalGroupSync.remove_from_taskset( + int(cc_pair_id), task_id, r + ) + return def on_celeryd_init(sender: str, conf: Any = None, **kwargs: Any) -> None: diff --git a/backend/onyx/background/celery/tasks/docfetching/tasks.py b/backend/onyx/background/celery/tasks/docfetching/tasks.py index 9323a6ee1b7..200d3a0dd2b 100644 --- a/backend/onyx/background/celery/tasks/docfetching/tasks.py +++ b/backend/onyx/background/celery/tasks/docfetching/tasks.py @@ -3,6 +3,7 @@ import time import traceback from time import sleep +from typing import Any import sentry_sdk from celery import Celery @@ -21,7 +22,7 @@ from onyx.background.indexing.job_client import SimpleJob from onyx.background.indexing.job_client import SimpleJobClient from onyx.background.indexing.job_client import SimpleJobException -# from onyx.background.indexing.run_docfetching import run_docfetching_entrypoint +from onyx.background.indexing.run_docfetching import run_docfetching_entrypoint from onyx.configs.constants import CELERY_INDEXING_WATCHDOG_CONNECTOR_TIMEOUT from onyx.configs.constants import OnyxCeleryTask from onyx.connectors.exceptions import ConnectorValidationError @@ -32,7 +33,7 @@ from onyx.db.index_attempt import mark_attempt_canceled from onyx.db.index_attempt import mark_attempt_failed from onyx.db.indexing_coordination import IndexingCoordination -# from onyx.redis.redis_connector import RedisConnector +from onyx.redis.redis_connector import RedisConnector from onyx.utils.logger import setup_logger from onyx.utils.variable_functionality import global_version from shared_configs.configs import SENTRY_DSN @@ -211,14 +212,14 @@ def _docfetching_task( ) # This is where the heavy/real work happens - # run_docfetching_entrypoint( - # app, - # index_attempt_id, - # tenant_id, - # cc_pair_id, - # is_ee, - # callback=callback, - # ) + run_docfetching_entrypoint( + app, + index_attempt_id, + tenant_id, + cc_pair_id, + is_ee, + callback=callback, + ) except ConnectorValidationError: raise SimpleJobException( @@ -259,7 +260,7 @@ def process_job_result( job: SimpleJob, connector_source: str | None, index_attempt_id: int, - log_builder: ConnectorIndexingLogBuilder, + log_builder: Any, ) -> SimpleJobResult: result = SimpleJobResult() result.connector_source = connector_source diff --git a/backend/onyx/background/celery/tasks/docprocessing/tasks.py b/backend/onyx/background/celery/tasks/docprocessing/tasks.py index 11afc53aade..b376b236539 100644 --- a/backend/onyx/background/celery/tasks/docprocessing/tasks.py +++ b/backend/onyx/background/celery/tasks/docprocessing/tasks.py @@ -86,7 +86,6 @@ from onyx.file_store.document_batch_storage import get_document_batch_storage from onyx.httpx.httpx_pool import HttpxPool from onyx.indexing.embedder import DefaultIndexingEmbedder -from onyx.indexing.indexing_pipeline import run_indexing_pipeline from onyx.natural_language_processing.search_nlp_models import EmbeddingModel from onyx.natural_language_processing.search_nlp_models import ( InformationContentClassificationModel, @@ -1268,6 +1267,8 @@ def _docprocessing_task( tenant_id: str, batch_num: int, ) -> None: + from onyx.indexing.indexing_pipeline import run_indexing_pipeline + start_time = time.monotonic() if tenant_id: diff --git a/backend/onyx/background/indexing/run_docfetching.py b/backend/onyx/background/indexing/run_docfetching.py index 70502f1f895..955c8d9fac7 100644 --- a/backend/onyx/background/indexing/run_docfetching.py +++ b/backend/onyx/background/indexing/run_docfetching.py @@ -28,7 +28,6 @@ from onyx.connectors.connector_runner import ConnectorRunner from onyx.connectors.exceptions import ConnectorValidationError from onyx.connectors.exceptions import UnexpectedValidationError -from onyx.connectors.factory import instantiate_connector from onyx.connectors.interfaces import CheckpointedConnector from onyx.connectors.models import ConnectorFailure from onyx.connectors.models import ConnectorStopSignal @@ -66,7 +65,6 @@ from onyx.httpx.httpx_pool import HttpxPool from onyx.indexing.embedder import DefaultIndexingEmbedder from onyx.indexing.indexing_heartbeat import IndexingHeartbeatInterface -from onyx.indexing.indexing_pipeline import run_indexing_pipeline from onyx.natural_language_processing.search_nlp_models import ( InformationContentClassificationModel, ) @@ -100,6 +98,8 @@ def _get_connector_runner( are the complete list of existing documents of the connector. If the task of type LOAD_STATE, the list will be considered complete and otherwise incomplete. """ + from onyx.connectors.factory import instantiate_connector + task = attempt.connector_credential_pair.connector.input_type try: @@ -283,6 +283,8 @@ def _run_indexing( 2. Embed and index these documents into the chosen datastore (vespa) 3. Updates Postgres to record the indexed documents + the outcome of this run """ + from onyx.indexing.indexing_pipeline import run_indexing_pipeline + start_time = time.monotonic() # jsut used for logging with get_session_with_current_tenant() as db_session_temp: diff --git a/backend/onyx/chat/prompt_builder/answer_prompt_builder.py b/backend/onyx/chat/prompt_builder/answer_prompt_builder.py index 0c3ee865b5a..f3573c5e05f 100644 --- a/backend/onyx/chat/prompt_builder/answer_prompt_builder.py +++ b/backend/onyx/chat/prompt_builder/answer_prompt_builder.py @@ -4,7 +4,6 @@ from langchain_core.messages import BaseMessage from langchain_core.messages import HumanMessage from langchain_core.messages import SystemMessage -from pydantic import BaseModel from pydantic.v1 import BaseModel as BaseModel__v1 from onyx.chat.models import PromptConfig @@ -196,10 +195,6 @@ def build(self) -> list[BaseMessage]: # Stores some parts of a prompt builder as needed for tool calls -class PromptSnapshot(BaseModel): - raw_message_history: list[PreviousMessage] - raw_user_query: str - built_prompt: list[BaseMessage] # TODO: rename this? AnswerConfig maybe? diff --git a/backend/onyx/chat/prompt_builder/schemas.py b/backend/onyx/chat/prompt_builder/schemas.py new file mode 100644 index 00000000000..461a76f45ef --- /dev/null +++ b/backend/onyx/chat/prompt_builder/schemas.py @@ -0,0 +1,10 @@ +from langchain_core.messages import BaseMessage +from pydantic import BaseModel + +from onyx.llm.models import PreviousMessage + + +class PromptSnapshot(BaseModel): + raw_message_history: list[PreviousMessage] + raw_user_query: str + built_prompt: list[BaseMessage] diff --git a/backend/onyx/chat/tool_handling/tool_response_handler.py b/backend/onyx/chat/tool_handling/tool_response_handler.py index 7531d9edf8a..613976b1ac6 100644 --- a/backend/onyx/chat/tool_handling/tool_response_handler.py +++ b/backend/onyx/chat/tool_handling/tool_response_handler.py @@ -7,7 +7,7 @@ from onyx.chat.models import ResponsePart from onyx.chat.prompt_builder.answer_prompt_builder import AnswerPromptBuilder from onyx.chat.prompt_builder.answer_prompt_builder import LLMCall -from onyx.chat.prompt_builder.answer_prompt_builder import PromptSnapshot +from onyx.chat.prompt_builder.schemas import PromptSnapshot from onyx.llm.interfaces import LLM from onyx.tools.force import ForceUseTool from onyx.tools.message import build_tool_message diff --git a/backend/onyx/db/chat.py b/backend/onyx/db/chat.py index d6c0eadebe5..559206a1377 100644 --- a/backend/onyx/db/chat.py +++ b/backend/onyx/db/chat.py @@ -56,7 +56,7 @@ from onyx.server.query_and_chat.models import ChatMessageDetail from onyx.server.query_and_chat.models import SubQueryDetail from onyx.server.query_and_chat.models import SubQuestionDetail -from onyx.tools.tool_runner import ToolCallFinalResult +from onyx.tools.models import ToolCallFinalResult from onyx.utils.logger import setup_logger from onyx.utils.special_types import JSON_ro diff --git a/backend/onyx/db/document.py b/backend/onyx/db/document.py index f394d90e5cf..62b98859c05 100644 --- a/backend/onyx/db/document.py +++ b/backend/onyx/db/document.py @@ -22,7 +22,7 @@ from sqlalchemy.orm import Session from sqlalchemy.sql.expression import null -# from onyx.agents.agent_search.kb_search.models import KGEntityDocInfo +from onyx.agents.agent_search.kb_search.models import KGEntityDocInfo from onyx.configs.constants import DEFAULT_BOOST from onyx.configs.constants import DocumentSource from onyx.configs.kg_configs import KG_SIMPLE_ANSWER_MAX_DISPLAYED_SOURCES @@ -33,7 +33,7 @@ from onyx.db.entities import delete_from_kg_entities_extraction_staging__no_commit from onyx.db.enums import AccessType from onyx.db.enums import ConnectorCredentialPairStatus -# from onyx.db.feedback import delete_document_feedback_for_documents__no_commit +from onyx.db.feedback import delete_document_feedback_for_documents__no_commit from onyx.db.models import Connector from onyx.db.models import ConnectorCredentialPair from onyx.db.models import Credential @@ -747,9 +747,9 @@ def delete_documents_complete__no_commit( ) delete_documents_by_connector_credential_pair__no_commit(db_session, document_ids) - # delete_document_feedback_for_documents__no_commit( - # document_ids=document_ids, db_session=db_session - # ) + delete_document_feedback_for_documents__no_commit( + document_ids=document_ids, db_session=db_session + ) delete_document_tags_for_documents__no_commit( document_ids=document_ids, db_session=db_session ) @@ -1199,35 +1199,35 @@ def get_skipped_kg_documents(db_session: Session) -> list[str]: return list(db_session.scalars(stmt).all()) -# def get_kg_doc_info_for_entity_name( -# db_session: Session, document_id: str, entity_type: str -# ) -> KGEntityDocInfo: -# """ -# Get the semantic ID and the link for an entity name. -# """ - -# result = ( -# db_session.query(Document.semantic_id, Document.link) -# .filter(Document.id == document_id) -# .first() -# ) - -# if result is None: -# return KGEntityDocInfo( -# doc_id=None, -# doc_semantic_id=None, -# doc_link=None, -# semantic_entity_name=f"{entity_type}:{document_id}", -# semantic_linked_entity_name=f"{entity_type}:{document_id}", -# ) - -# return KGEntityDocInfo( -# doc_id=document_id, -# doc_semantic_id=result[0], -# doc_link=result[1], -# semantic_entity_name=f"{entity_type.upper()}:{result[0]}", -# semantic_linked_entity_name=f"[{entity_type.upper()}:{result[0]}]({result[1]})", -# ) +def get_kg_doc_info_for_entity_name( + db_session: Session, document_id: str, entity_type: str +) -> KGEntityDocInfo: + """ + Get the semantic ID and the link for an entity name. + """ + + result = ( + db_session.query(Document.semantic_id, Document.link) + .filter(Document.id == document_id) + .first() + ) + + if result is None: + return KGEntityDocInfo( + doc_id=None, + doc_semantic_id=None, + doc_link=None, + semantic_entity_name=f"{entity_type}:{document_id}", + semantic_linked_entity_name=f"{entity_type}:{document_id}", + ) + + return KGEntityDocInfo( + doc_id=document_id, + doc_semantic_id=result[0], + doc_link=result[1], + semantic_entity_name=f"{entity_type.upper()}:{result[0]}", + semantic_linked_entity_name=f"[{entity_type.upper()}:{result[0]}]({result[1]})", + ) def check_for_documents_needing_kg_processing( diff --git a/backend/onyx/file_processing/extract_file_text.py b/backend/onyx/file_processing/extract_file_text.py index 793ee23901c..405d15daa4b 100644 --- a/backend/onyx/file_processing/extract_file_text.py +++ b/backend/onyx/file_processing/extract_file_text.py @@ -15,14 +15,12 @@ from typing import Any from typing import IO from typing import NamedTuple +from typing import Optional +from typing import TYPE_CHECKING from zipfile import BadZipFile import chardet import openpyxl -from markitdown import FileConversionException -from markitdown import MarkItDown -from markitdown import StreamInfo -from markitdown import UnsupportedFormatException from PIL import Image from pypdf import PdfReader from pypdf.errors import PdfStreamError @@ -37,6 +35,8 @@ from onyx.utils.file_types import WORD_PROCESSING_MIME_TYPE from onyx.utils.logger import setup_logger +if TYPE_CHECKING: + from markitdown import MarkItDown logger = setup_logger() # NOTE(rkuo): Unify this with upload_files_for_chat and file_valiation.py @@ -85,7 +85,7 @@ "image/webp", ] -_MARKITDOWN_CONVERTER: MarkItDown | None = None +_MARKITDOWN_CONVERTER: Optional["MarkItDown"] = None KNOWN_OPENPYXL_BUGS = [ "Value must be either numerical or a string containing a wildcard", @@ -93,8 +93,10 @@ ] -def get_markitdown_converter() -> MarkItDown: +def get_markitdown_converter() -> "MarkItDown": global _MARKITDOWN_CONVERTER + from markitdown import MarkItDown + if _MARKITDOWN_CONVERTER is None: _MARKITDOWN_CONVERTER = MarkItDown(enable_plugins=False) return _MARKITDOWN_CONVERTER @@ -358,6 +360,12 @@ def docx_to_text_and_images( The images list returned is empty in this case. """ md = get_markitdown_converter() + from markitdown import ( + StreamInfo, + FileConversionException, + UnsupportedFormatException, + ) + try: doc = md.convert( to_bytesio(file), stream_info=StreamInfo(mimetype=WORD_PROCESSING_MIME_TYPE) @@ -394,6 +402,12 @@ def docx_to_text_and_images( def pptx_to_text(file: IO[Any], file_name: str = "") -> str: md = get_markitdown_converter() + from markitdown import ( + StreamInfo, + FileConversionException, + UnsupportedFormatException, + ) + stream_info = StreamInfo( mimetype=PRESENTATION_MIME_TYPE, filename=file_name or None, extension=".pptx" ) diff --git a/backend/onyx/llm/utils.py b/backend/onyx/llm/utils.py index dd5af68e879..24810aaf852 100644 --- a/backend/onyx/llm/utils.py +++ b/backend/onyx/llm/utils.py @@ -8,8 +8,6 @@ from typing import cast from typing import TYPE_CHECKING -import litellm # type: ignore -import tiktoken from langchain.prompts.base import StringPromptValue from langchain.prompts.chat import ChatPromptValue from langchain.schema import PromptValue @@ -18,18 +16,6 @@ from langchain.schema.messages import BaseMessage from langchain.schema.messages import HumanMessage from langchain.schema.messages import SystemMessage -from litellm.exceptions import APIConnectionError # type: ignore -from litellm.exceptions import APIError # type: ignore -from litellm.exceptions import AuthenticationError # type: ignore -from litellm.exceptions import BadRequestError # type: ignore -from litellm.exceptions import BudgetExceededError # type: ignore -from litellm.exceptions import ContentPolicyViolationError # type: ignore -from litellm.exceptions import ContextWindowExceededError # type: ignore -from litellm.exceptions import NotFoundError # type: ignore -from litellm.exceptions import PermissionDeniedError # type: ignore -from litellm.exceptions import RateLimitError # type: ignore -from litellm.exceptions import Timeout # type: ignore -from litellm.exceptions import UnprocessableEntityError # type: ignore from onyx.configs.app_configs import LITELLM_CUSTOM_ERROR_MESSAGE_MAPPINGS from onyx.configs.app_configs import MAX_TOKENS_FOR_FULL_INCLUSION @@ -40,7 +26,6 @@ from onyx.configs.model_configs import GEN_AI_MAX_TOKENS from onyx.configs.model_configs import GEN_AI_MODEL_FALLBACK_MAX_TOKENS from onyx.configs.model_configs import GEN_AI_NUM_RESERVED_OUTPUT_TOKENS -from onyx.file_processing.extract_file_text import read_pdf_file from onyx.file_store.models import ChatFileType from onyx.file_store.models import InMemoryChatFile from onyx.llm.interfaces import LLM @@ -72,6 +57,19 @@ def litellm_exception_to_error_msg( dict[str, str] | None ) = LITELLM_CUSTOM_ERROR_MESSAGE_MAPPINGS, ) -> str: + from litellm.exceptions import BadRequestError + from litellm.exceptions import AuthenticationError + from litellm.exceptions import PermissionDeniedError + from litellm.exceptions import NotFoundError + from litellm.exceptions import UnprocessableEntityError + from litellm.exceptions import RateLimitError + from litellm.exceptions import ContextWindowExceededError + from litellm.exceptions import APIConnectionError + from litellm.exceptions import APIError + from litellm.exceptions import Timeout + from litellm.exceptions import ContentPolicyViolationError + from litellm.exceptions import BudgetExceededError + error_msg = str(e) if custom_error_msg_mappings: @@ -133,6 +131,8 @@ def _build_content( files: list[InMemoryChatFile] | None = None, ) -> str: """Applies all non-image files.""" + from onyx.file_processing.extract_file_text import read_pdf_file + if not files: return message @@ -355,6 +355,7 @@ def check_number_of_tokens( function. If none is provided, default to the tiktoken encoder used by GPT-3.5 and GPT-4. """ + import tiktoken if encode_fn is None: encode_fn = tiktoken.get_encoding("cl100k_base").encode @@ -378,6 +379,8 @@ def test_llm(llm: LLM) -> str | None: @lru_cache(maxsize=1) # the copy.deepcopy is expensive, so we cache the result def get_model_map() -> dict: + import litellm + starting_map = copy.deepcopy(cast(dict, litellm.model_cost)) # NOTE: we could add additional models here in the future, @@ -457,6 +460,7 @@ def get_llm_contextual_cost( this does not account for the cost of documents that fit within a single chunk which do not get contextualized. """ + import litellm # calculate input costs num_tokens = ONE_MILLION @@ -655,6 +659,8 @@ def model_supports_image_input(model_name: str, model_provider: str) -> bool: def model_is_reasoning_model(model_name: str, model_provider: str) -> bool: + import litellm + model_map = get_model_map() try: model_obj = find_model_obj( diff --git a/backend/onyx/natural_language_processing/search_nlp_models.py b/backend/onyx/natural_language_processing/search_nlp_models.py index 96712acccac..ae4cd8a75f1 100644 --- a/backend/onyx/natural_language_processing/search_nlp_models.py +++ b/backend/onyx/natural_language_processing/search_nlp_models.py @@ -13,13 +13,11 @@ import aioboto3 # type: ignore import httpx -import openai import requests import voyageai # type: ignore from cohere import AsyncClient as CohereAsyncClient from google.oauth2 import service_account # type: ignore from httpx import HTTPError -from litellm import aembedding from requests import JSONDecodeError from requests import RequestException from requests import Response @@ -186,6 +184,8 @@ def __init__( async def _embed_openai( self, texts: list[str], model: str | None, reduced_dimension: int | None ) -> list[Embedding]: + import openai + if not model: model = DEFAULT_OPENAI_MODEL @@ -249,6 +249,8 @@ async def _embed_voyage( async def _embed_azure( self, texts: list[str], model: str | None ) -> list[Embedding]: + from litellm import aembedding + response = await aembedding( model=model, input=texts, @@ -331,6 +333,8 @@ async def embed( deployment_name: str | None = None, reduced_dimension: int | None = None, ) -> list[Embedding]: + import openai + try: if self.provider == EmbeddingProvider.OPENAI: return await self._embed_openai(texts, model_name, reduced_dimension) From b9c7df4ea0bb79549ed9c0387ca97464a68997f1 Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 23 Sep 2025 15:12:07 -0700 Subject: [PATCH 4/5] . --- .../celery/tasks/docfetching/tasks.py | 3 +- backend/onyx/context/search/models.py | 53 ------------------- 2 files changed, 1 insertion(+), 55 deletions(-) diff --git a/backend/onyx/background/celery/tasks/docfetching/tasks.py b/backend/onyx/background/celery/tasks/docfetching/tasks.py index 200d3a0dd2b..675a2642c88 100644 --- a/backend/onyx/background/celery/tasks/docfetching/tasks.py +++ b/backend/onyx/background/celery/tasks/docfetching/tasks.py @@ -3,7 +3,6 @@ import time import traceback from time import sleep -from typing import Any import sentry_sdk from celery import Celery @@ -260,7 +259,7 @@ def process_job_result( job: SimpleJob, connector_source: str | None, index_attempt_id: int, - log_builder: Any, + log_builder: ConnectorIndexingLogBuilder, ) -> SimpleJobResult: result = SimpleJobResult() result.connector_source = connector_source diff --git a/backend/onyx/context/search/models.py b/backend/onyx/context/search/models.py index 1c31105df50..a1b062f57ca 100644 --- a/backend/onyx/context/search/models.py +++ b/backend/onyx/context/search/models.py @@ -394,59 +394,6 @@ def chunks_or_sections_to_search_docs( return search_docs - @classmethod - def from_inference_section( - cls, inference_section: "InferenceSection" - ) -> "SearchDoc": - """Convert an InferenceSection to a SearchDoc using the center chunk's data.""" - chunk = inference_section.center_chunk - return cls( - document_id=chunk.document_id, - chunk_ind=chunk.chunk_id, - semantic_identifier=chunk.semantic_identifier or "Unknown", - link=chunk.source_links[0] if chunk.source_links else None, - blurb=chunk.blurb, - source_type=chunk.source_type, - boost=chunk.boost, - hidden=chunk.hidden, - metadata=chunk.metadata, - score=chunk.score, - is_relevant=chunk.is_relevant, - relevance_explanation=chunk.relevance_explanation, - match_highlights=chunk.match_highlights, - updated_at=chunk.updated_at, - primary_owners=chunk.primary_owners, - secondary_owners=chunk.secondary_owners, - is_internet=False, - ) - - @classmethod - def from_inference_chunk(cls, inference_chunk: "InferenceChunk") -> "SearchDoc": - """Convert an InferenceChunk to a SearchDoc.""" - return cls( - document_id=inference_chunk.document_id, - chunk_ind=inference_chunk.chunk_id, - semantic_identifier=inference_chunk.semantic_identifier or "Unknown", - link=( - inference_chunk.source_links[0] - if inference_chunk.source_links - else None - ), - blurb=inference_chunk.blurb, - source_type=inference_chunk.source_type, - boost=inference_chunk.boost, - hidden=inference_chunk.hidden, - metadata=inference_chunk.metadata, - score=inference_chunk.score, - is_relevant=inference_chunk.is_relevant, - relevance_explanation=inference_chunk.relevance_explanation, - match_highlights=inference_chunk.match_highlights, - updated_at=inference_chunk.updated_at, - primary_owners=inference_chunk.primary_owners, - secondary_owners=inference_chunk.secondary_owners, - is_internet=False, - ) - def model_dump(self, *args: list, **kwargs: dict[str, Any]) -> dict[str, Any]: # type: ignore initial_dict = super().model_dump(*args, **kwargs) # type: ignore initial_dict["updated_at"] = ( From 6782d0018eb4903478860efa40dd5801879ad901 Mon Sep 17 00:00:00 2001 From: edwin Date: Tue, 23 Sep 2025 15:13:27 -0700 Subject: [PATCH 5/5] . --- backend/onyx/redis/redis_connector_delete.py | 70 ++++++++++---------- 1 file changed, 35 insertions(+), 35 deletions(-) diff --git a/backend/onyx/redis/redis_connector_delete.py b/backend/onyx/redis/redis_connector_delete.py index 999c2127464..3ad08f24106 100644 --- a/backend/onyx/redis/redis_connector_delete.py +++ b/backend/onyx/redis/redis_connector_delete.py @@ -16,7 +16,7 @@ from onyx.configs.constants import OnyxCeleryTask from onyx.configs.constants import OnyxRedisConstants from onyx.db.connector_credential_pair import get_connector_credential_pair_from_id -# from onyx.db.document import construct_document_id_select_for_connector_credential_pair +from onyx.db.document import construct_document_id_select_for_connector_credential_pair class RedisConnectorDeletePayload(BaseModel): @@ -118,40 +118,40 @@ def generate_tasks( num_tasks_sent = 0 - # stmt = construct_document_id_select_for_connector_credential_pair( - # cc_pair.connector_id, cc_pair.credential_id - # ) - # for doc_id in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT): - # doc_id = cast(str, doc_id) - # current_time = time.monotonic() - # if current_time - last_lock_time >= ( - # CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 - # ): - # lock.reacquire() - # last_lock_time = current_time - - # custom_task_id = self._generate_task_id() - - # # add to the tracking taskset in redis BEFORE creating the celery task. - # # note that for the moment we are using a single taskset key, not differentiated by cc_pair id - # self.redis.sadd(self.taskset_key, custom_task_id) - - # # Priority on sync's triggered by new indexing should be medium - # celery_app.send_task( - # OnyxCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, - # kwargs=dict( - # document_id=doc_id, - # connector_id=cc_pair.connector_id, - # credential_id=cc_pair.credential_id, - # tenant_id=self.tenant_id, - # ), - # queue=OnyxCeleryQueues.CONNECTOR_DELETION, - # task_id=custom_task_id, - # priority=OnyxCeleryPriority.MEDIUM, - # ignore_result=True, - # ) - - # num_tasks_sent += 1 + stmt = construct_document_id_select_for_connector_credential_pair( + cc_pair.connector_id, cc_pair.credential_id + ) + for doc_id in db_session.scalars(stmt).yield_per(DB_YIELD_PER_DEFAULT): + doc_id = cast(str, doc_id) + current_time = time.monotonic() + if current_time - last_lock_time >= ( + CELERY_VESPA_SYNC_BEAT_LOCK_TIMEOUT / 4 + ): + lock.reacquire() + last_lock_time = current_time + + custom_task_id = self._generate_task_id() + + # add to the tracking taskset in redis BEFORE creating the celery task. + # note that for the moment we are using a single taskset key, not differentiated by cc_pair id + self.redis.sadd(self.taskset_key, custom_task_id) + + # Priority on sync's triggered by new indexing should be medium + celery_app.send_task( + OnyxCeleryTask.DOCUMENT_BY_CC_PAIR_CLEANUP_TASK, + kwargs=dict( + document_id=doc_id, + connector_id=cc_pair.connector_id, + credential_id=cc_pair.credential_id, + tenant_id=self.tenant_id, + ), + queue=OnyxCeleryQueues.CONNECTOR_DELETION, + task_id=custom_task_id, + priority=OnyxCeleryPriority.MEDIUM, + ignore_result=True, + ) + + num_tasks_sent += 1 return num_tasks_sent