diff --git a/src/intugle/core/semantic_search/crud.py b/src/intugle/core/semantic_search/crud.py index ef494b0..65d81ef 100644 --- a/src/intugle/core/semantic_search/crud.py +++ b/src/intugle/core/semantic_search/crud.py @@ -20,13 +20,27 @@ class SemanticSearchCRUD: - def __init__(self, collection_name: str, embeddings: List[Embeddings], batch_size: int = 30): + def __init__(self, collection_name: str, embeddings: List[Embeddings], batch_size: Optional[int] = None): + if not collection_name or not isinstance(collection_name, str): + raise ValueError("collection_name must be a non-empty string") + if not embeddings or not isinstance(embeddings, list) or len(embeddings) == 0: + raise ValueError("embeddings must be a non-empty list of Embeddings") + self.collection_name = collection_name self.embeddings = embeddings - self.batch_size = batch_size + self.batch_size = batch_size if batch_size is not None else settings.QDRANT_INSERT_BATCH_SIZE + + if self.batch_size <= 0: + raise ValueError("batch_size must be a positive integer") + + log.info(f"SemanticSearchCRUD initialized with collection_name='{self.collection_name}', " + f"embeddings_count={len(self.embeddings)}, batch_size={self.batch_size}") @property def vector_store(self): + if not settings.QDRANT_URL: + raise ValueError("QDRANT_URL setting is required but not configured") + client_config = {"url": settings.QDRANT_URL, "api_key": settings.QDRANT_API_KEY} return VectorStoreService( collection_name=self.collection_name, @@ -109,28 +123,50 @@ def create_content_for_vectorization(self, _: int, row: pd.Series) -> pd.DataFra return final_consolidated_content[final_consolidated_content.content != ""] async def vectorize(self, content: pd.DataFrame) -> List[models.PointStruct]: + """Vectorize content using configured embeddings.""" + if content is None or content.empty: + log.warning("Empty content provided for vectorization") + return [] + + if not isinstance(content, pd.DataFrame): + raise ValueError("content must be 
a pandas DataFrame") + + required_columns = ["content", "type", "column_id"] + missing_columns = [col for col in required_columns if col not in content.columns] + if missing_columns: + raise ValueError(f"Content DataFrame is missing required columns: {missing_columns}") + tags_and_columns = content.loc[content.type.isin(["tag", "column_name"])].reset_index(drop=True) business_glossary = content.loc[content.type.isin(["glossary"])].reset_index(drop=True) - tags_and_columns_content = tags_and_columns["content"].tolist() - business_glossary_content = business_glossary["content"].tolist() - log.info(f"tags_column: {tags_and_columns_content}") - log.info(f"business glossary: {business_glossary_content}") + tags_and_columns_content = tags_and_columns["content"].tolist() if not tags_and_columns.empty else [] + business_glossary_content = business_glossary["content"].tolist() if not business_glossary.empty else [] + + log.debug(f"Vectorizing {len(tags_and_columns_content)} tag/column items and {len(business_glossary_content)} glossary items") + + if not tags_and_columns_content and not business_glossary_content: + log.warning("No content to vectorize") + return [] async def run(): - # Run tags col and glossary concurrenty + # Run tags col and glossary concurrently try: coroutines = [] embedding_map = [] for embedding in self.embeddings: - coroutines.append( - embedding.aencode(tags_and_columns_content, embeddings_types=[EmbeddingsType.DENSE]) - ) - embedding_map.append((embedding, "tags_col")) - coroutines.append( - embedding.aencode(business_glossary_content, embeddings_types=[EmbeddingsType.LATE]) - ) - embedding_map.append((embedding, "glossary")) + if tags_and_columns_content: + coroutines.append( + embedding.aencode(tags_and_columns_content, embeddings_types=[EmbeddingsType.DENSE]) + ) + embedding_map.append((embedding, "tags_col")) + if business_glossary_content: + coroutines.append( + embedding.aencode(business_glossary_content, 
embeddings_types=[EmbeddingsType.LATE]) +                        ) +                        embedding_map.append((embedding, "glossary")) + +            if not coroutines: +                return {"tags_col": {}, "glossary": {}} gathered_results = await asyncio.gather(*coroutines) @@ -143,17 +179,21 @@ async def run(): return results except Exception as ex: -            raise Exception(f"[!] Semantic Search: Couldnot vectorize => {ex}") +            log.error(f"Vectorization failed: {ex}") +            raise RuntimeError(f"Semantic Search vectorization failed: {ex}") from ex -        # Run all type of embeddings concurrenlty +        # Run all type of embeddings concurrently results = await run() points = [] -        point = self.convert_to_qdrant_point(tags_and_columns, results["tags_col"]) -        points.extend(point) -        point = self.convert_to_qdrant_point(business_glossary, results["glossary"]) -        points.extend(point) - +        if not tags_and_columns.empty and results["tags_col"]: +            point = self.convert_to_qdrant_point(tags_and_columns, results["tags_col"]) +            points.extend(point) +        if not business_glossary.empty and results["glossary"]: +            point = self.convert_to_qdrant_point(business_glossary, results["glossary"]) +            points.extend(point) + +        log.debug(f"Generated {len(points)} points from vectorization") return points @staticmethod @@ -208,19 +248,58 @@ async def clean_collection(self): await vdb.client.create_payload_index( collection_name=self.collection_name, field_name="type", -            field_schema=models.PayloadSchemaType.KEYWORD +            field_schema=models.PayloadSchemaType.KEYWORD ) async def initialize(self, column_details: list[dict]): -        await self.clean_collection() +        """Initialize the semantic search collection with column details.""" +        if not column_details or not isinstance(column_details, list): +            raise ValueError("column_details must be a non-empty list of dictionaries") + +        log.info(f"Starting initialization with {len(column_details)} column details, batch_size={self.batch_size}") + +        try: +            await self.clean_collection() +            log.info("Collection cleaned successfully") +        except Exception as e: + 
log.error(f"Failed to clean collection: {e}") + raise RuntimeError(f"Failed to initialize collection: {e}") from e + async with self.vector_store as vdb: - column_details = pd.DataFrame(column_details) + column_details_df = pd.DataFrame(column_details) + total_batches = (len(column_details_df) + self.batch_size - 1) // self.batch_size + log.info(f"Processing {len(column_details_df)} columns in {total_batches} batches") + + batch_count = 0 + for batch in batched(column_details_df, self.batch_size): + batch_count += 1 + try: + log.debug(f"Processing batch {batch_count}/{total_batches}") + + content = list(itertools.starmap(self.create_content_for_vectorization, batch.iterrows())) + if not content: + log.warning(f"Batch {batch_count} produced no content, skipping") + continue + + content_df = pd.concat(content, axis=0).reset_index(drop=True) + if content_df.empty: + log.warning(f"Batch {batch_count} content is empty, skipping") + continue + + log.debug(f"Batch {batch_count}: Created {len(content_df)} content items") + + points = await self.vectorize(content_df) + if not points: + log.warning(f"Batch {batch_count}: No points generated, skipping") + continue - for batch in batched(column_details, self.batch_size): - content = list(itertools.starmap(self.create_content_for_vectorization, batch.iterrows())) + log.debug(f"Batch {batch_count}: Generated {len(points)} points") - content = pd.concat(content, axis=0).reset_index(drop=True) + await vdb.bulk_insert(points) + log.info(f"Batch {batch_count}/{total_batches} completed successfully") - points = await self.vectorize(content) + except Exception as e: + log.error(f"Failed to process batch {batch_count}: {e}") + raise RuntimeError(f"Batch processing failed at batch {batch_count}: {e}") from e - vdb.bulk_insert(points) + log.info("Semantic search initialization completed successfully") diff --git a/src/intugle/core/settings.py b/src/intugle/core/settings.py index 60cea25..49125ae 100644 --- a/src/intugle/core/settings.py 
+++ b/src/intugle/core/settings.py @@ -115,6 +115,7 @@ def set_project_base(self, project_base: str): VECTOR_COLLECTION_NAME: str = os.getcwd().split("/")[-1] QDRANT_URL: str = "http://localhost:6333" QDRANT_API_KEY: Optional[str] = None + QDRANT_INSERT_BATCH_SIZE: int = 10 TAVILY_API_KEY: Optional[str] = None EMBEDDING_MODEL_NAME: str = "openai:ada" TOKENIZER_MODEL_NAME: str = "cl100k_base" diff --git a/uv.lock b/uv.lock index fee0f82..f52b738 100644 --- a/uv.lock +++ b/uv.lock @@ -2043,6 +2043,10 @@ mysql = [ { name = "pymysql" }, { name = "sqlglot" }, ] +oracle = [ + { name = "oracledb" }, + { name = "sqlglot" }, +] postgres = [ { name = "asyncpg" }, { name = "sqlglot" }, @@ -2114,6 +2118,7 @@ requires-dist = [ { name = "networkx", specifier = ">=3.4.2" }, { name = "nltk", specifier = ">=3.9.1" }, { name = "numpy", specifier = "<=2.3.0" }, + { name = "oracledb", marker = "extra == 'oracle'", specifier = ">=2.0.0" }, { name = "pandas", specifier = ">=2.2.2" }, { name = "plotly", marker = "extra == 'streamlit'" }, { name = "pyaml", specifier = ">=25.7.0" }, @@ -2132,6 +2137,7 @@ requires-dist = [ { name = "snowflake-snowpark-python", extras = ["pandas"], marker = "extra == 'snowflake'", specifier = ">=1.12.0" }, { name = "sqlglot", marker = "extra == 'databricks'", specifier = ">=27.20.0" }, { name = "sqlglot", marker = "extra == 'mysql'", specifier = ">=27.20.0" }, + { name = "sqlglot", marker = "extra == 'oracle'", specifier = ">=27.20.0" }, { name = "sqlglot", marker = "extra == 'postgres'", specifier = ">=27.20.0" }, { name = "sqlglot", marker = "extra == 'snowflake'", specifier = ">=27.20.0" }, { name = "sqlglot", marker = "extra == 'sqlserver'", specifier = ">=27.20.0" }, @@ -2142,7 +2148,7 @@ requires-dist = [ { name = "xgboost", specifier = ">=3.0.4" }, { name = "xlsxwriter", marker = "extra == 'streamlit'", specifier = "==3.2.9" }, ] -provides-extras = ["snowflake", "databricks", "postgres", "sqlserver", "mysql", "streamlit"] +provides-extras = 
["snowflake", "databricks", "postgres", "sqlserver", "mysql", "oracle", "streamlit"] [package.metadata.requires-dev] dev = [ @@ -3684,6 +3690,43 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/c0/da/977ded879c29cbd04de313843e76868e6e13408a94ed6b987245dc7c8506/openpyxl-3.1.5-py2.py3-none-any.whl", hash = "sha256:5282c12b107bffeef825f4617dc029afaf41d0ea60823bbb665ef3079dc79de2", size = 250910, upload-time = "2024-06-28T14:03:41.161Z" }, ] +[[package]] +name = "oracledb" +version = "3.4.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "cryptography" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/5e/9d/4e86cd410294ebbb1f90a609aaae61c5fa064a5c10e501de3f4c67664e6c/oracledb-3.4.1.tar.gz", hash = "sha256:f5920df5ac9446579e8409607bba31dc2d23a2286a5b0ea17cb0d78d419392a6", size = 852693, upload-time = "2025-11-12T03:21:36.157Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/4b/70/05645e72a67b45396a248a7949d89c91dc7a1ab5f7cedad110d9804e29d5/oracledb-3.4.1-cp310-cp310-macosx_10_9_universal2.whl", hash = "sha256:dfe18061f064d0455fad10d9301f6f92df9e32d18d75fb32802caf1ced4b304c", size = 4243226, upload-time = "2025-11-12T03:21:41.734Z" }, + { url = "https://files.pythonhosted.org/packages/7e/cc/f3a78ae31f87e41378c7bc60928fa5432d4eba80806cb0086edc11803a22/oracledb-3.4.1-cp310-cp310-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:84055d6fd093a4d7b8ed653f433531e4c4cc161f7261d78efd7f6a65a1f19444", size = 2426914, upload-time = "2025-11-12T03:21:43.641Z" }, + { url = "https://files.pythonhosted.org/packages/a6/a6/3d3dabbec2651851f13fdb7c318a3c50780090235d340d851f7cb8deeeec/oracledb-3.4.1-cp310-cp310-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:c9e20b6cd3245e84c30188874c524bb3c67c79b7a04fcb864e6ac39f55eae826", size = 2605903, upload-time = "2025-11-12T03:21:45.378Z" }, + { url = 
"https://files.pythonhosted.org/packages/ae/59/aa174fc8f5629b890424702edf582a8a635acaa0db1315b16160d703a887/oracledb-3.4.1-cp310-cp310-win32.whl", hash = "sha256:abedb0bf464bcf14d83e245eae000e03cad8ac68c945eb09cc46002d800fbf00", size = 1490352, upload-time = "2025-11-12T03:21:46.732Z" }, + { url = "https://files.pythonhosted.org/packages/8a/1c/9dded6efc747d8980667584c8464295d80d205f8a131e31cacfb274b6ed5/oracledb-3.4.1-cp310-cp310-win_amd64.whl", hash = "sha256:4ee604bb0f3acb5680782818f973445b8cd168e72a73b5ca2cd9807140afadee", size = 1837541, upload-time = "2025-11-12T03:21:48.571Z" }, + { url = "https://files.pythonhosted.org/packages/ed/9e/5901349b8797fabc7c6f78230376bfbd5541a847f1eb34be23bfb971add7/oracledb-3.4.1-cp311-cp311-macosx_10_9_universal2.whl", hash = "sha256:20b268be64994d0f636df9ff7613dcce420133f373d0d7fc84a31dd2f07322c0", size = 4226376, upload-time = "2025-11-12T03:21:49.959Z" }, + { url = "https://files.pythonhosted.org/packages/fc/c0/951d2ab8c04df9da309a82e211d19223a64dbbcfdd79f5f1aba6d8736408/oracledb-3.4.1-cp311-cp311-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d493946318d99a0f0e3f01d7c64c08ddae66f0aac735fa23c1eb94949d9db0f5", size = 2422323, upload-time = "2025-11-12T03:21:51.583Z" }, + { url = "https://files.pythonhosted.org/packages/a8/7c/82843dd7e55dec6331c0c7737e32523eb2f6156c6469055e2cb752e848f4/oracledb-3.4.1-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:4d64fda2fa5d3e82c58b2c5126ab5511bccb84f8b47eedfe9f17e9c100fe7683", size = 2601267, upload-time = "2025-11-12T03:21:52.978Z" }, + { url = "https://files.pythonhosted.org/packages/27/3f/67b50042f955574fca574a2234ba4af421e9268601bceb49efd9c43c6bc8/oracledb-3.4.1-cp311-cp311-win32.whl", hash = "sha256:cd80aa4c4dec7347c6d2909fbaf7e35a5253341ff2cb6f3782ab7ca712bf0405", size = 1488075, upload-time = "2025-11-12T03:21:54.704Z" }, + { url = 
"https://files.pythonhosted.org/packages/8d/14/bab071234d61e84c65712902dd0edec825d82b3198ffddc977c9ea9a91f3/oracledb-3.4.1-cp311-cp311-win_amd64.whl", hash = "sha256:5e01e8696009cec4ebcb9fe678b23b8223595dc186c065899660cac4c1fc189b", size = 1843449, upload-time = "2025-11-12T03:21:56.342Z" }, + { url = "https://files.pythonhosted.org/packages/f7/d9/98367ba2c358de366de70b505531f9717cdfa7e29eff0c9ad113eecfce96/oracledb-3.4.1-cp312-cp312-macosx_10_13_universal2.whl", hash = "sha256:1c3f92c023ef1983e0e7f9a1b4a31df8568974c28c06ab0a574b1126e45083a8", size = 4222133, upload-time = "2025-11-12T03:21:58.212Z" }, + { url = "https://files.pythonhosted.org/packages/36/52/48ad2f7dae6288a2ddf0ac536d46ce4883d2d10ec7e16afbbd48f1ec0ff3/oracledb-3.4.1-cp312-cp312-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:251211d64b90cc42d00ec2d2893873bc02ff4bc22125e9fc5a7f148a6208fd88", size = 2230374, upload-time = "2025-11-12T03:21:59.656Z" }, + { url = "https://files.pythonhosted.org/packages/8d/08/60d4301b4f72f099ed2252f8d0eb143e6fe9e5c8f4c2705c3163cea36808/oracledb-3.4.1-cp312-cp312-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ea529a5e6036fae3e2bc195fa76b6f48cd9c431e68c74ef78ee6a5e39c855c39", size = 2421755, upload-time = "2025-11-12T03:22:01.543Z" }, + { url = "https://files.pythonhosted.org/packages/48/35/412a90019a030f5dff0c031319733c6b8dd477832bafa88b733b4b3ec57b/oracledb-3.4.1-cp312-cp312-win32.whl", hash = "sha256:94e8e6d63b45fedd4e243147cb25dea1a0f6599d83852f3979fe725a8533e85a", size = 1449688, upload-time = "2025-11-12T03:22:03.422Z" }, + { url = "https://files.pythonhosted.org/packages/7b/01/ae9eca3055dc625923564ca653ca99ddd8eda95e44953ce55c18aba55066/oracledb-3.4.1-cp312-cp312-win_amd64.whl", hash = "sha256:84f15c483f9ec80dcded925df6ff473c69a293cd694d09b69abb911500659df4", size = 1794622, upload-time = "2025-11-12T03:22:04.941Z" }, + { url = 
"https://files.pythonhosted.org/packages/f0/4d/e32db901340dc6fc824d0d3b5e4660fe0199fba8adb0e81ac08b639c8ab9/oracledb-3.4.1-cp313-cp313-macosx_10_13_universal2.whl", hash = "sha256:ad817807b293e371c951af8ee67a56a5af88a5680a54fe79dfc7b9393ca128aa", size = 4206469, upload-time = "2025-11-12T03:22:06.881Z" }, + { url = "https://files.pythonhosted.org/packages/cf/68/1a038f29523eea19e42f4dd765bf523752408816b5ff21e8b998d8b25457/oracledb-3.4.1-cp313-cp313-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:34b9bc25eae217defa3f4b8289b4915cd1101aaeeec33c7bace74f927996d452", size = 2233055, upload-time = "2025-11-12T03:22:08.259Z" }, + { url = "https://files.pythonhosted.org/packages/b9/66/a51243553ac6b0e1bc2cfd4db8a2f3299b1b60c9231d7c9133ee1442d15b/oracledb-3.4.1-cp313-cp313-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:be6575759ba56ab3758f82bfbb74f75288ce69190e19c087793050cb012c0aa1", size = 2443312, upload-time = "2025-11-12T03:22:09.615Z" }, + { url = "https://files.pythonhosted.org/packages/f7/57/a6056d4432c07a959fd1032dd45bfaff69b91ac7e1204dbccf7bf7b4a91d/oracledb-3.4.1-cp313-cp313-win32.whl", hash = "sha256:635587e5f28be83ec0bf72e4bfb2f3a4544c0f8e303f2327f376d57116894541", size = 1453553, upload-time = "2025-11-12T03:22:11.045Z" }, + { url = "https://files.pythonhosted.org/packages/6a/57/dca415d8dd18a2a030a9402d49039493cdce6acfd37c8a038a4ede2328e6/oracledb-3.4.1-cp313-cp313-win_amd64.whl", hash = "sha256:354177708352e124c0f97ceccbe34be05e7f3ce7040a7dd3c2ebd857145ffe74", size = 1794005, upload-time = "2025-11-12T03:22:12.694Z" }, + { url = "https://files.pythonhosted.org/packages/59/07/dff7b9e6242b627d56f3fa6ad6639802003e1e5fbcc883d0ce27d82455ad/oracledb-3.4.1-cp314-cp314-macosx_10_15_universal2.whl", hash = "sha256:3ec1f9dd7310da7cbf219c2a05bb52df08da950c95ad2ace8a289854947bdc6b", size = 4247946, upload-time = "2025-11-12T03:22:14.473Z" }, + { url = 
"https://files.pythonhosted.org/packages/1f/95/739868c6f312683cc3afe9534644b4ce2d054fe137d8f7a1e7786df9f5aa/oracledb-3.4.1-cp314-cp314-manylinux2014_aarch64.manylinux_2_17_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:337a67d6c91015dfe7a2a1915f65c74adad26fcd428daaead296d91c92f09ad1", size = 2271628, upload-time = "2025-11-12T03:22:15.956Z" }, + { url = "https://files.pythonhosted.org/packages/fb/7c/307da513f5fb68e6454beb5bc1c715ec09a70d2af70a28b9fa6001c1b09b/oracledb-3.4.1-cp314-cp314-manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:9d5ffe4dd26e8012de433ec69f93be5737d81b04324072ec36dad37eb778fd9d", size = 2455603, upload-time = "2025-11-12T03:22:18.112Z" }, + { url = "https://files.pythonhosted.org/packages/c5/1a/af5bd7239cebfc33541432cfcba75893a3f2f44fa66648e6d8ce1fe96b0c/oracledb-3.4.1-cp314-cp314-win32.whl", hash = "sha256:693ef5f8c420545511096b3bc9a3861617222717321bc78c776afbbb6c16c5b9", size = 1474932, upload-time = "2025-11-12T03:22:19.574Z" }, + { url = "https://files.pythonhosted.org/packages/f1/ee/79d2ed18fd234bcbd407c1b36372dc898cf68de825ec650df7b1627acb51/oracledb-3.4.1-cp314-cp314-win_amd64.whl", hash = "sha256:6adb483d7120cdd056173b71c901f71dbe2265c5bd402f768b0b1ab27af519b1", size = 1837566, upload-time = "2025-11-12T03:22:20.959Z" }, +] + [[package]] name = "orjson" version = "3.11.4"