diff --git a/.github/workflows/validate-new-plugin-metadata.yml b/.github/workflows/validate-new-plugin-metadata.yml index 81312154..3ff3bb1b 100644 --- a/.github/workflows/validate-new-plugin-metadata.yml +++ b/.github/workflows/validate-new-plugin-metadata.yml @@ -16,7 +16,7 @@ jobs: uses: actions/checkout@v4 with: fetch-depth: 0 - ref: ${{ github.event.pull_request.head.ref }} + ref: ${{ github.event.pull_request.head.sha }} - name: Identify New Plugin Directories id: find_new_plugins diff --git a/plugins/RAGPinecone/README.md b/plugins/RAGPinecone/README.md new file mode 100644 index 00000000..0ddcd5c0 --- /dev/null +++ b/plugins/RAGPinecone/README.md @@ -0,0 +1,219 @@ +# RAGPinecone Plugin for GAME SDK + +A Retrieval Augmented Generation (RAG) plugin using Pinecone as the vector database for the GAME SDK. + +## Features + +- Query a knowledge base for relevant context +- Advanced hybrid search (vector + BM25) for better retrieval +- AI-generated answers based on retrieved documents +- Add documents to the knowledge base +- Delete documents from the knowledge base +- Chunk documents for better retrieval +- Process documents from a folder automatically +- Integrate with Telegram bot for RAG-powered conversations + +## Installation + +### From Source + +1. Clone the repository or navigate to the plugin directory: +```bash +cd game-python/plugins/RAGPinecone +``` + +2. Install the plugin in development mode: +```bash +pip install -e . +``` + +This will install all required dependencies and make the plugin available in your environment. + +## Setup and Configuration + +1. Set the following environment variables: + - `PINECONE_API_KEY`: Your Pinecone API key + - `OPENAI_API_KEY`: Your OpenAI API key (for embeddings) + - `GAME_API_KEY`: Your GAME API key + - `TELEGRAM_BOT_TOKEN`: Your Telegram bot token (if using with Telegram) + +2. 
Import and initialize the plugin to use in your agent: + +```python +from rag_pinecone_gamesdk.rag_pinecone_plugin import RAGPineconePlugin +from rag_pinecone_gamesdk.rag_pinecone_game_functions import query_knowledge_fn, add_document_fn + +# Initialize the plugin +rag_plugin = RAGPineconePlugin( + pinecone_api_key="your-pinecone-api-key", + openai_api_key="your-openai-api-key", + index_name="your-index-name", + namespace="your-namespace" +) + +# Add the functions to your agent's action space +agent_action_space = [ + query_knowledge_fn(rag_plugin), + add_document_fn(rag_plugin), + # ... other functions +] +``` + +## Available Functions + +### Basic RAG Functions + +1. `query_knowledge(query: str, num_results: int = 3)` - Query the knowledge base for relevant context +2. `add_document(content: str, metadata: dict = None)` - Add a document to the knowledge base + +### Advanced RAG Functions + +1. `advanced_query_knowledge(query: str)` - Query the knowledge base using hybrid retrieval (vector + BM25) and get an AI-generated answer +2. `get_relevant_documents(query: str)` - Get relevant documents using hybrid retrieval without generating an answer + +Example usage of advanced functions: + +```python +from rag_pinecone_gamesdk.search_rag import RAGSearcher +from rag_pinecone_gamesdk.rag_pinecone_game_functions import advanced_query_knowledge_fn, get_relevant_documents_fn + +# Initialize the RAG searcher +rag_searcher = RAGSearcher( + pinecone_api_key="your-pinecone-api-key", + openai_api_key="your-openai-api-key", + index_name="your-index-name", + namespace="your-namespace" +) + +# Add the advanced functions to your agent's action space +agent_action_space = [ + advanced_query_knowledge_fn(rag_searcher), + get_relevant_documents_fn(rag_searcher), + # ... 
other functions +] +``` + +## Populating the Knowledge Base + +### Using the Documents Folder + +The provided script downloads every file from the Google Drive folder hard-coded in `examples/populate_knowledge_base.py` into a temporary directory and adds the downloaded files to the knowledge base: + +```bash +cd game-python/plugins/RAGPinecone +python examples/populate_knowledge_base.py +``` + +This will process all supported files in the downloaded folder and add them to the knowledge base. To ingest local files instead, place them in the plugin's `Documents` folder and call `RAGPopulator.process_documents_folder()` without passing `documents_folder` (see "Using the API" below). + +Supported file types: +- `.txt` - Text files +- `.pdf` - PDF documents +- `.docx` - Word documents +- `.doc` - Word documents +- `.csv` - CSV files +- `.md` - Markdown files +- `.html`, `.htm` - HTML files + +### Using the API + +You can also populate the knowledge base programmatically: + +```python +from rag_pinecone_gamesdk.populate_rag import RAGPopulator + +# Initialize the populator +populator = RAGPopulator( + pinecone_api_key="your-pinecone-api-key", + openai_api_key="your-openai-api-key", + index_name="your-index-name", + namespace="your-namespace" +) + +# Add a document +content = "Your document content here" +metadata = { + "title": "Document Title", + "author": "Author Name", + "source": "Source Name", +} + +status, message, results = populator.add_document(content, metadata) +print(f"Status: {status}") +print(f"Message: {message}") +print(f"Results: {results}") + +# Process all documents in a folder +status, message, results = populator.process_documents_folder() +print(f"Status: {status}") +print(f"Message: {message}") +print(f"Processed {results.get('total_files', 0)} files, {results.get('successful_files', 0)} successful") +``` + +## Testing the Advanced Search + +You can test the advanced search functionality using the provided example script: + +```bash +cd game-python/plugins/RAGPinecone +python examples/test_advanced_search.py +``` + +This will run a series of test queries using the advanced hybrid retrieval system.
+ +## Integration with Telegram + +See the `examples/test_rag_pinecone_telegram.py` file for an example of how to integrate the RAGPinecone plugin with a Telegram bot. + +To run the Telegram bot with advanced RAG capabilities: + +```bash +cd game-python/plugins/RAGPinecone +python examples/test_rag_pinecone_telegram.py +``` + +## Advanced Usage + +### Hybrid Retrieval + +The advanced search functionality uses a hybrid retrieval approach that combines: + +1. **Vector Search**: Uses embeddings to find semantically similar documents +2. **BM25 Search**: Uses keyword matching to find documents with relevant terms + +This hybrid approach often provides better results than either method alone, especially for complex queries. + +### Custom Document Processing + +You can customize how documents are processed by extending the `RAGPopulator` class: + +```python +from rag_pinecone_gamesdk.populate_rag import RAGPopulator + +class CustomRAGPopulator(RAGPopulator): + def chunk_document(self, content, metadata): + # Custom chunking logic + # ... 
+ return chunked_docs +``` + +### Custom Embedding Models + +You can use different embedding models by specifying the `embedding_model` parameter: + +```python +rag_plugin = RAGPineconePlugin( + embedding_model="sentence-transformers/all-mpnet-base-v2" +) +``` + +## Requirements + +- Python 3.9+ +- Pinecone account +- OpenAI API key +- GAME SDK +- langchain +- langchain_community +- langchain_pinecone +- langchain_openai \ No newline at end of file diff --git a/plugins/RAGPinecone/examples/populate_knowledge_base.py b/plugins/RAGPinecone/examples/populate_knowledge_base.py new file mode 100644 index 00000000..079fd1c6 --- /dev/null +++ b/plugins/RAGPinecone/examples/populate_knowledge_base.py @@ -0,0 +1,142 @@ +import os +import logging +import tempfile +import requests +import re +from dotenv import load_dotenv +import gdown + +from rag_pinecone_gamesdk.populate_rag import RAGPopulator +from rag_pinecone_gamesdk import DEFAULT_INDEX_NAME, DEFAULT_NAMESPACE + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +def download_from_google_drive(folder_url, download_folder): + """ + Download all files from a Google Drive folder + + Args: + folder_url: URL of the Google Drive folder + download_folder: Local folder to download files to + + Returns: + List of downloaded file paths + """ + logger.info(f"Downloading files from Google Drive folder: {folder_url}") + + # Extract folder ID from URL + folder_id_match = re.search(r'folders/([a-zA-Z0-9_-]+)', folder_url) + if not folder_id_match: + logger.error(f"Could not extract folder ID from URL: {folder_url}") + return [] + + folder_id = folder_id_match.group(1) + logger.info(f"Folder ID: {folder_id}") + + # Create download folder if it doesn't exist + os.makedirs(download_folder, exist_ok=True) + + # Download all files in the folder + try: + # Use gdown to download all files in the folder + 
downloaded_files = gdown.download_folder( + id=folder_id, + output=download_folder, + quiet=False, + use_cookies=False + ) + + if not downloaded_files: + logger.warning("No files were downloaded from Google Drive") + return [] + + logger.info(f"Downloaded {len(downloaded_files)} files from Google Drive") + return downloaded_files + + except Exception as e: + logger.error(f"Error downloading files from Google Drive: {str(e)}") + return [] + +def main(): + # Load environment variables + load_dotenv() + + # Check for required environment variables + pinecone_api_key = os.environ.get("PINECONE_API_KEY") + openai_api_key = os.environ.get("OPENAI_API_KEY") + + if not pinecone_api_key: + logger.error("PINECONE_API_KEY environment variable is not set") + return + + if not openai_api_key: + logger.error("OPENAI_API_KEY environment variable is not set") + return + + # Google Drive folder URL + google_drive_url = "https://drive.google.com/drive/folders/1dKYDQxenDkthF0MPr-KOsdPNqEmrAq1c?usp=sharing" + + # Create a temporary directory for downloaded files + with tempfile.TemporaryDirectory() as temp_dir: + logger.info(f"Created temporary directory for downloaded files: {temp_dir}") + + # Download files from Google Drive + downloaded_files = download_from_google_drive(google_drive_url, temp_dir) + + if not downloaded_files: + logger.error("No files were downloaded from Google Drive. 
Exiting.") + return + + # Get the Documents folder path for local processing + documents_folder = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "Documents" + ) + + # Ensure the Documents folder exists + if not os.path.exists(documents_folder): + os.makedirs(documents_folder) + logger.info(f"Created Documents folder at: {documents_folder}") + + # Initialize the RAGPopulator + logger.info("Initializing RAGPopulator...") + populator = RAGPopulator( + pinecone_api_key=pinecone_api_key, + openai_api_key=openai_api_key, + index_name=DEFAULT_INDEX_NAME, + namespace=DEFAULT_NAMESPACE, + documents_folder=temp_dir, # Use the temp directory with downloaded files + ) + + # Process all documents in the temporary folder + logger.info(f"Processing downloaded documents from: {temp_dir}") + status, message, results = populator.process_documents_folder() + + # Log the results + logger.info(f"Status: {status}") + logger.info(f"Message: {message}") + logger.info(f"Processed {results.get('total_files', 0)} files, {results.get('successful_files', 0)} successful") + + # Get all document IDs + ids = populator.fetch_all_ids() + logger.info(f"Total vectors in database: {len(ids)}") + + # Print detailed results for each file + if 'results' in results: + logger.info("\nDetailed results:") + for result in results['results']: + file_path = result.get('file_path', 'Unknown file') + status = result.get('status', 'Unknown status') + message = result.get('message', 'No message') + logger.info(f"File: {os.path.basename(file_path)}") + logger.info(f"Status: {status}") + logger.info(f"Message: {message}") + logger.info("---") + +if __name__ == "__main__": + main() diff --git a/plugins/RAGPinecone/examples/test_advanced_search.py b/plugins/RAGPinecone/examples/test_advanced_search.py new file mode 100644 index 00000000..3e427401 --- /dev/null +++ b/plugins/RAGPinecone/examples/test_advanced_search.py @@ -0,0 +1,74 @@ +import os +import logging +from dotenv import 
load_dotenv + +from rag_pinecone_gamesdk.search_rag import RAGSearcher +from rag_pinecone_gamesdk import DEFAULT_INDEX_NAME, DEFAULT_NAMESPACE + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", +) +logger = logging.getLogger(__name__) + +def main(): + # Load environment variables + load_dotenv() + + # Check for required environment variables + pinecone_api_key = os.environ.get("PINECONE_API_KEY") + openai_api_key = os.environ.get("OPENAI_API_KEY") + + if not pinecone_api_key: + logger.error("PINECONE_API_KEY environment variable is not set") + return + + if not openai_api_key: + logger.error("OPENAI_API_KEY environment variable is not set") + return + + # Initialize the RAG searcher + logger.info("Initializing RAG searcher...") + searcher = RAGSearcher( + pinecone_api_key=pinecone_api_key, + openai_api_key=openai_api_key, + index_name=DEFAULT_INDEX_NAME, + namespace=DEFAULT_NAMESPACE, + llm_model="gpt-4", # You can change this to "gpt-3.5-turbo" for faster, cheaper responses + temperature=0.0, + k=4 # Number of documents to retrieve + ) + + # Test queries + test_queries = [ + "How do I build a custom function?", + "How can I contribute plugins to the GAME SDK?", + "How do I deploy my AI application?", + ] + + # Run test queries + for query in test_queries: + logger.info(f"\n\n=== Testing query: '{query}' ===") + + # Get AI-generated answer with hybrid retrieval + logger.info("Getting AI-generated answer with hybrid retrieval...") + status, message, results = searcher.query(query) + logger.info(f"Status: {status}") + logger.info(f"Answer: {message}") + logger.info(f"Source documents: {len(results.get('source_documents', []))}") + + # Get relevant documents only + logger.info("\nGetting relevant documents only...") + status, message, results = searcher.get_relevant_documents(query) + logger.info(f"Status: {status}") + logger.info(f"Found {len(results.get('results', []))} relevant 
documents") + + # Print first document preview + if results.get('results'): + first_doc = results['results'][0] + content_preview = first_doc['content'][:100] + "..." if len(first_doc['content']) > 100 else first_doc['content'] + logger.info(f"First document preview: {content_preview}") + +if __name__ == "__main__": + main() diff --git a/plugins/RAGPinecone/examples/test_rag_pinecone_telegram.py b/plugins/RAGPinecone/examples/test_rag_pinecone_telegram.py new file mode 100644 index 00000000..d437dca6 --- /dev/null +++ b/plugins/RAGPinecone/examples/test_rag_pinecone_telegram.py @@ -0,0 +1,165 @@ +import os +from typing import TypedDict +import logging + +from telegram import Update +from telegram.ext import ContextTypes, filters, MessageHandler + +from game_sdk.game.chat_agent import Chat, ChatAgent +from telegram_plugin_gamesdk.telegram_plugin import TelegramPlugin +# Import RAG components +from rag_pinecone_gamesdk.rag_pinecone_plugin import RAGPineconePlugin +from rag_pinecone_gamesdk.rag_pinecone_game_functions import query_knowledge_fn, add_document_fn +from rag_pinecone_gamesdk.search_rag import RAGSearcher +from rag_pinecone_gamesdk.rag_pinecone_game_functions import ( + advanced_query_knowledge_fn, get_relevant_documents_fn +) +from rag_pinecone_gamesdk import DEFAULT_INDEX_NAME, DEFAULT_NAMESPACE + +import sys +sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '../../telegram/examples'))) +from test_telegram_game_functions import send_message_fn, send_media_fn, create_poll_fn, pin_message_fn, unpin_message_fn, delete_message_fn +from dotenv import load_dotenv + +load_dotenv() + + +game_api_key = os.environ.get("GAME_API_KEY") +telegram_bot_token = os.environ.get("TELEGRAM_BOT_TOKEN") +# Add RAG environment variables +pinecone_api_key = os.environ.get("PINECONE_API_KEY") +openai_api_key = os.environ.get("OPENAI_API_KEY") + +# Print environment variable status +print(f"GAME API Key: {'✓' if game_api_key else '✗'}") +print(f"Telegram Bot 
Token: {'✓' if telegram_bot_token else '✗'}") +print(f"Pinecone API Key: {'✓' if pinecone_api_key else '✗'}") +print(f"OpenAI API Key: {'✓' if openai_api_key else '✗'}") + +logging.basicConfig( + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + level=logging.INFO, +) + +logger = logging.getLogger(__name__) + +class ActiveUser(TypedDict): + chat_id: int + name: str + +# Update the prompt to include RAG capabilities +chat_agent = ChatAgent( + prompt="""You are VIN (Virtuals Digital Intern). You are a helpful assistant with access to a knowledge base through RAG (Retrieval-Augmented Generation) capabilities. + +""", + api_key=game_api_key, +) + +active_users: list[ActiveUser] = [] +active_chats: dict[int, Chat] = {} + +if __name__ == "__main__": + tg_plugin = TelegramPlugin(bot_token=telegram_bot_token) + + # Initialize RAG plugins + rag_plugin = RAGPineconePlugin( + pinecone_api_key=pinecone_api_key, + openai_api_key=openai_api_key, + index_name=DEFAULT_INDEX_NAME, + namespace=DEFAULT_NAMESPACE + ) + + # Initialize advanced RAG searcher + rag_searcher = RAGSearcher( + pinecone_api_key=pinecone_api_key, + openai_api_key=openai_api_key, + index_name=DEFAULT_INDEX_NAME, + namespace=DEFAULT_NAMESPACE, + llm_model="gpt-4o-mini", # You can change this to "gpt-3.5-turbo" for faster, cheaper responses + temperature=0.0, + k=4 # Number of documents to retrieve + ) + + # Add RAG functions to the action space + agent_action_space = [ + # Telegram functions + send_message_fn(tg_plugin), + send_media_fn(tg_plugin), + create_poll_fn(tg_plugin), + pin_message_fn(tg_plugin), + unpin_message_fn(tg_plugin), + delete_message_fn(tg_plugin), + + # RAG functions + query_knowledge_fn(rag_plugin), + add_document_fn(rag_plugin), + advanced_query_knowledge_fn(rag_searcher), + get_relevant_documents_fn(rag_searcher), + ] + + async def default_message_handler(update: Update, context: ContextTypes.DEFAULT_TYPE): + """Handles incoming messages but ignores messages from the bot 
itself unless it's mentioned in a group chat.""" + + # Ignore messages from the bot itself + if update.message.from_user.id == tg_plugin.bot.id: + logger.info("Ignoring bot's own message.") + return + + user = update.message.from_user + chat_id = update.message.chat.id + chat_type = update.message.chat.type # "private", "group", "supergroup", or "channel" + bot_username = f"@{tg_plugin.bot.username}" + + logger.info(f"Update received: {update}") + logger.info(f"Message received: {update.message.text}") + + name = f"{user.first_name} (Telegram's chat_id: {chat_id}, this is not part of the partner's name but important for the telegram's function arguments)" + + # Handle non-text messages + if update.message.text is None: + logger.info("Received a non-text message, skipping processing") + return + + # For group chats, only respond when the bot is mentioned or when it's a direct reply to the bot's message + if chat_type in ["group", "supergroup"]: + if (bot_username not in update.message.text and + (update.message.reply_to_message is None or + update.message.reply_to_message.from_user.id != tg_plugin.bot.id)): + logger.info("Ignoring group message not mentioning or replying to the bot") + return + + if not any(u["chat_id"] == chat_id for u in active_users): + active_users.append({"chat_id": chat_id, "name": name}) + logger.info(f"Active user added: {name}") + logger.info(f"Active users: {active_users}") + chat = chat_agent.create_chat( + partner_id=str(chat_id), + partner_name=name, + action_space=agent_action_space, + ) + active_chats[chat_id] = chat + + try: + response = active_chats[chat_id].next(update.message.text.replace(bot_username, "").strip()) # Remove bot mention + logger.info(f"Response: {response}") + + if response.message: + await update.message.reply_text(response.message) + + if response.is_finished: + active_chats.pop(chat_id) + active_users.remove({"chat_id": chat_id, "name": name}) + logger.info(f"Chat with {name} ended.") + logger.info(f"Active 
users: {active_users}") + except Exception as e: + logger.error(f"Error processing message: {e}", exc_info=True) + await update.message.reply_text("I'm sorry, something went wrong. Please try again later.") + + tg_plugin.add_handler(MessageHandler(filters.ALL, default_message_handler)) + + # Start polling + print("Starting Telegram bot with RAG capabilities...") + tg_plugin.start_polling() + + # Example of executing a function from Telegram Plugin to a chat without polling + #tg_plugin.send_message(chat_id=829856292, text="Hello! I am a helpful assistant. How can I assist you today?") diff --git a/plugins/RAGPinecone/plugin_metadata.yml b/plugins/RAGPinecone/plugin_metadata.yml new file mode 100644 index 00000000..8ad20dce --- /dev/null +++ b/plugins/RAGPinecone/plugin_metadata.yml @@ -0,0 +1,14 @@ +# General Information +plugin_name: "rag_pinecone_gamesdk" +author: "Michiel Voortman" +logo_url: "" +release_date: "2025-03" + +# Description +short_description: "RAG (Retrieval Augmented Generation) Plugin with Pinecone for GAME SDK" +detailed_description: "This plugin provides Retrieval Augmented Generation capabilities using Pinecone as the vector database for the GAME SDK. It allows agents to store, retrieve, and use contextual information to enhance their responses." 
+ +# Contact & Support +x_account_handle: "@VoortmanMichiel" +support_contact: "eve@eve-protocol.ai" +community_link: "https://t.me/eveprotocol" diff --git a/plugins/RAGPinecone/pyproject.toml b/plugins/RAGPinecone/pyproject.toml new file mode 100644 index 00000000..38804a53 --- /dev/null +++ b/plugins/RAGPinecone/pyproject.toml @@ -0,0 +1,32 @@ +[project] +name = "rag-pinecone-gamesdk" +version = "0.1.0" +description = "RAG (Retrieval Augmented Generation) Plugin with Pinecone for GAME SDK" +authors = [ + {name = "Your Name", email = "your.email@example.com"} +] +readme = "README.md" +requires-python = ">=3.9" +dependencies = [ + "pinecone-client>=2.2.1", + "langchain>=0.0.267", + "langchain-community>=0.0.1", + "langchain-pinecone>=0.0.1", + "langchain-openai>=0.0.2", + "openai>=1.1.1", + "python-dotenv>=1.0.0", + "unstructured>=0.10.0", + "pdf2image>=1.16.3", + "pytesseract>=0.3.10", + "docx2txt>=0.8", + "pandas>=2.0.0", + "beautifulsoup4>=4.12.0", + "markdown>=3.4.3", + "rank_bm25>=0.2.2", + "spacy>=3.0.0", + "gdown", +] + +[build-system] +requires = ["poetry-core>=2.0.0,<3.0.0"] +build-backend = "poetry.core.masonry.api" diff --git a/plugins/RAGPinecone/rag_pinecone_gamesdk/__init__.py b/plugins/RAGPinecone/rag_pinecone_gamesdk/__init__.py new file mode 100644 index 00000000..40b00ad2 --- /dev/null +++ b/plugins/RAGPinecone/rag_pinecone_gamesdk/__init__.py @@ -0,0 +1,10 @@ +""" +RAGPinecone plugin for GAME SDK - Retrieval Augmented Generation using Pinecone +""" + +__version__ = "0.1.0" + +# Default configuration values +DEFAULT_INDEX_NAME = "rag-pinecone-docs" +DEFAULT_NAMESPACE = "default" +DEFAULT_EMBEDDING_MODEL = "text-embedding-ada-002" diff --git a/plugins/RAGPinecone/rag_pinecone_gamesdk/populate_rag.py b/plugins/RAGPinecone/rag_pinecone_gamesdk/populate_rag.py new file mode 100644 index 00000000..ee1a174b --- /dev/null +++ b/plugins/RAGPinecone/rag_pinecone_gamesdk/populate_rag.py @@ -0,0 +1,570 @@ +import os +import logging +import asyncio +from 
typing import List, Dict, Any, Optional, Tuple +from datetime import datetime, timezone +import hashlib +import glob +import pathlib + +from pinecone import Pinecone, ServerlessSpec +from langchain_openai import OpenAIEmbeddings +from langchain_pinecone import PineconeVectorStore +from langchain.text_splitter import RecursiveCharacterTextSplitter +from langchain.schema import Document +from langchain_community.document_loaders import ( + TextLoader, + PyPDFLoader, + Docx2txtLoader, + CSVLoader, + UnstructuredMarkdownLoader, + UnstructuredHTMLLoader, +) + +from game_sdk.game.custom_types import Function, FunctionResultStatus, Argument +from rag_pinecone_gamesdk import DEFAULT_INDEX_NAME, DEFAULT_NAMESPACE, DEFAULT_EMBEDDING_MODEL + +logger = logging.getLogger(__name__) + +class RAGPopulator: + """ + Utility class for populating the RAG knowledge base with documents. + """ + + def __init__( + self, + pinecone_api_key: Optional[str] = os.environ.get("PINECONE_API_KEY"), + openai_api_key: Optional[str] = os.environ.get("OPENAI_API_KEY"), + index_name: str = DEFAULT_INDEX_NAME, + namespace: str = DEFAULT_NAMESPACE, + embedding_model: str = DEFAULT_EMBEDDING_MODEL, + documents_folder: Optional[str] = None, + ): + self.pinecone_api_key = pinecone_api_key + self.openai_api_key = openai_api_key + self.index_name = index_name + self.namespace = namespace + self.embedding_model = embedding_model + + # Set documents folder path + if documents_folder is None: + # Default to Documents folder in the plugin directory + self.documents_folder = os.path.join( + os.path.dirname(os.path.dirname(os.path.abspath(__file__))), + "Documents" + ) + else: + self.documents_folder = documents_folder + + # Ensure the documents folder exists + os.makedirs(self.documents_folder, exist_ok=True) + + # Initialize Pinecone client + self.pc = Pinecone(api_key=self.pinecone_api_key) + + # Create index if it doesn't exist + if self.index_name not in self.pc.list_indexes().names(): + 
logger.info(f"Creating index '{self.index_name}'...") + self.pc.create_index( + name=self.index_name, + dimension=1536, + metric="dotproduct", + spec=ServerlessSpec( + cloud="aws", + region="us-east-1" + ) + ) + logger.info("Index created!") + + # Initialize embeddings + self.embeddings = OpenAIEmbeddings( + model=self.embedding_model, + openai_api_key=self.openai_api_key + ) + + # Initialize vector store + self.vector_store = PineconeVectorStore.from_existing_index( + index_name=self.index_name, + embedding=self.embeddings, + namespace=self.namespace + ) + + # Initialize text splitter + self.text_splitter = RecursiveCharacterTextSplitter( + chunk_size=1000, + chunk_overlap=200, + length_function=len, + ) + + # File type to loader mapping + self.file_loaders = { + ".txt": TextLoader, + ".pdf": PyPDFLoader, + ".docx": Docx2txtLoader, + ".doc": Docx2txtLoader, + ".csv": CSVLoader, + ".md": UnstructuredMarkdownLoader, + ".html": UnstructuredHTMLLoader, + ".htm": UnstructuredHTMLLoader, + } + + def chunk_document(self, content: str, metadata: Dict[str, Any]) -> List[Document]: + """ + Split a document into chunks for embedding. 
+ + Args: + content: The document content + metadata: Metadata for the document + + Returns: + List of Document objects + """ + # Split text into chunks + texts = self.text_splitter.split_text(content) + + # Create documents with metadata + documents = [] + for i, chunk in enumerate(texts): + # Create a unique chunk ID + chunk_id = f"{metadata.get('doc_id', 'doc')}_{i}" + + # Create chunk metadata + chunk_metadata = metadata.copy() + chunk_metadata["chunk_id"] = chunk_id + chunk_metadata["chunk_index"] = i + chunk_metadata["total_chunks"] = len(texts) + chunk_metadata["last_updated"] = datetime.now(timezone.utc).isoformat() + + # Create document + doc = Document( + page_content=chunk, + metadata=chunk_metadata + ) + documents.append(doc) + + return documents + + def add_document(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Add a document to the knowledge base. + + Args: + content: The document content + metadata: Metadata for the document + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + # Initialize metadata if None + metadata = metadata or {} + + # Generate a document ID if not provided + if "doc_id" not in metadata: + doc_hash = hashlib.md5(content.encode()).hexdigest() + metadata["doc_id"] = f"doc_{doc_hash}" + + # Add timestamp if not provided + if "timestamp" not in metadata: + metadata["timestamp"] = datetime.now(timezone.utc).isoformat() + + # Chunk the document + chunked_docs = self.chunk_document(content, metadata) + + # Generate IDs for each chunk + chunk_ids = [doc.metadata["chunk_id"] for doc in chunked_docs] + + # Add documents to vector store + self.vector_store.add_documents(chunked_docs, ids=chunk_ids) + + return ( + FunctionResultStatus.DONE, + f"Document added successfully with {len(chunked_docs)} chunks", + { + "doc_id": metadata["doc_id"], + "num_chunks": len(chunked_docs), + "metadata": metadata + } + ) + except Exception as e: + 
logger.error(f"Error adding document: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error adding document: {str(e)}", + { + "content_preview": content[:100] + "..." if len(content) > 100 else content + } + ) + + def add_file(self, file_path: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Add a file to the knowledge base. + + Args: + file_path: Path to the file + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + # Get file extension + file_ext = os.path.splitext(file_path)[1].lower() + + # Check if file type is supported + if file_ext not in self.file_loaders: + return ( + FunctionResultStatus.FAILED, + f"Unsupported file type: {file_ext}", + {"file_path": file_path} + ) + + # Get appropriate loader + loader_class = self.file_loaders[file_ext] + + # Load the document + loader = loader_class(file_path) + documents = loader.load() + + # Get file metadata + file_stats = os.stat(file_path) + file_name = os.path.basename(file_path) + + # Process each document + total_chunks = 0 + doc_ids = [] + + for doc in documents: + # Create metadata + metadata = { + "source": file_path, + "filename": file_name, + "filetype": file_ext, + "file_size": file_stats.st_size, + "created_at": datetime.fromtimestamp(file_stats.st_ctime, tz=timezone.utc).isoformat(), + "modified_at": datetime.fromtimestamp(file_stats.st_mtime, tz=timezone.utc).isoformat(), + } + + # Add document + status, _, results = self.add_document(doc.page_content, metadata) + + if status == FunctionResultStatus.DONE: + total_chunks += results.get("num_chunks", 0) + doc_ids.append(results.get("doc_id")) + + return ( + FunctionResultStatus.DONE, + f"File '{file_name}' added successfully with {total_chunks} chunks", + { + "file_path": file_path, + "doc_ids": doc_ids, + "total_chunks": total_chunks + } + ) + except Exception as e: + logger.error(f"Error adding file: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error adding file: {str(e)}", + 
{"file_path": file_path} + ) + + def process_documents_folder(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Process all documents in the documents folder. + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + # Get all files in the documents folder + all_files = [] + for ext in self.file_loaders.keys(): + pattern = os.path.join(self.documents_folder, f"**/*{ext}") + all_files.extend(glob.glob(pattern, recursive=True)) + + if not all_files: + return ( + FunctionResultStatus.DONE, + f"No supported files found in {self.documents_folder}", + {"documents_folder": self.documents_folder} + ) + + # Process each file + results = [] + for file_path in all_files: + logger.info(f"Processing file: {file_path}") + status, message, result = self.add_file(file_path) + results.append({ + "file_path": file_path, + "status": status, + "message": message, + "result": result + }) + logger.info(message) + + # Count successful files + successful_files = sum(1 for r in results if r["status"] == FunctionResultStatus.DONE) + + return ( + FunctionResultStatus.DONE, + f"Processed {len(results)} files, {successful_files} successful", + { + "documents_folder": self.documents_folder, + "total_files": len(results), + "successful_files": successful_files, + "results": results + } + ) + except Exception as e: + logger.error(f"Error processing documents folder: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error processing documents folder: {str(e)}", + {"documents_folder": self.documents_folder} + ) + + def delete_document(self, doc_id: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Delete a document from the knowledge base. 
+ + Args: + doc_id: The document ID to delete + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + # Fetch documents with the given doc_id + filter_criteria = {"doc_id": doc_id} + matching_docs = self.vector_store.similarity_search( + query="", + k=1000, + filter=filter_criteria, + namespace=self.namespace + ) + + # Extract chunk IDs + chunk_ids = [doc.metadata["chunk_id"] for doc in matching_docs if "chunk_id" in doc.metadata] + + if not chunk_ids: + return ( + FunctionResultStatus.FAILED, + f"No chunks found for document ID: {doc_id}", + {"doc_id": doc_id} + ) + + # Delete chunks + self.vector_store.delete(ids=chunk_ids) + + return ( + FunctionResultStatus.DONE, + f"Document deleted successfully. Removed {len(chunk_ids)} chunks.", + { + "doc_id": doc_id, + "num_chunks_deleted": len(chunk_ids) + } + ) + except Exception as e: + logger.error(f"Error deleting document: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error deleting document: {str(e)}", + {"doc_id": doc_id} + ) + + def get_document_count(self) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Get the number of documents in the knowledge base. + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + # This is a simplified approach - in a real implementation, + # you would need to query Pinecone for the actual count + # For now, we'll just return a placeholder + return ( + FunctionResultStatus.DONE, + "Document count retrieved successfully", + { + "count": "Unknown - Pinecone doesn't provide a direct count API. Use fetch_all_ids to get IDs." + } + ) + except Exception as e: + logger.error(f"Error getting document count: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error getting document count: {str(e)}", + {} + ) + + def fetch_all_ids(self) -> List[str]: + """ + Fetch all document IDs from the knowledge base. 
+ + Returns: + List of document IDs + """ + index = self.pc.Index(self.index_name) + ids = [] + for id_batch in index.list(namespace=self.namespace): + ids.extend(id_batch) + return ids + + +# Function wrappers for GAME SDK integration + +def process_documents_folder_executable(populator: RAGPopulator) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the process_documents_folder function. + + Args: + populator: The RAGPopulator instance + + Returns: + Tuple containing status, message, and results dictionary + """ + return populator.process_documents_folder() + + +def process_documents_folder_fn(populator: RAGPopulator) -> Function: + """ + Create a GAME Function for processing the documents folder. + + Args: + populator: The RAGPopulator instance + + Returns: + Function object + """ + return Function( + fn_name="process_documents_folder", + fn_description="Process all documents in the documents folder and add them to the knowledge base", + args=[], + executable=lambda: process_documents_folder_executable(populator), + ) + + +def add_document_executable(populator: RAGPopulator, content: str, metadata: Optional[Dict[str, Any]] = None) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the add_document function. + + Args: + populator: The RAGPopulator instance + content: The document content + metadata: Metadata for the document + + Returns: + Tuple containing status, message, and results dictionary + """ + return populator.add_document(content, metadata) + + +def add_document_fn(populator: RAGPopulator) -> Function: + """ + Create a GAME Function for adding a document. 
+ + Args: + populator: The RAGPopulator instance + + Returns: + Function object + """ + return Function( + fn_name="add_document", + fn_description="Add a document to the RAG knowledge base", + args=[ + Argument(name="content", description="The document content", type="str"), + Argument(name="metadata", description="Metadata for the document", type="dict", optional=True), + ], + executable=lambda content, metadata=None: add_document_executable(populator, content, metadata), + ) + + +def add_file_executable(populator: RAGPopulator, file_path: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the add_file function. + + Args: + populator: The RAGPopulator instance + file_path: Path to the file + + Returns: + Tuple containing status, message, and results dictionary + """ + return populator.add_file(file_path) + + +def add_file_fn(populator: RAGPopulator) -> Function: + """ + Create a GAME Function for adding a file. + + Args: + populator: The RAGPopulator instance + + Returns: + Function object + """ + return Function( + fn_name="add_file", + fn_description="Add a file to the RAG knowledge base", + args=[ + Argument(name="file_path", description="Path to the file", type="str"), + ], + executable=lambda file_path: add_file_executable(populator, file_path), + ) + + +def delete_document_executable(populator: RAGPopulator, doc_id: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the delete_document function. + + Args: + populator: The RAGPopulator instance + doc_id: The document ID to delete + + Returns: + Tuple containing status, message, and results dictionary + """ + return populator.delete_document(doc_id) + + +def delete_document_fn(populator: RAGPopulator) -> Function: + """ + Create a GAME Function for deleting a document. 
def query_knowledge_executable(rag_plugin: RAGPineconePlugin, query: str, num_results: int = 3) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]:
    """
    Query the RAG plugin and format the hits as chat-friendly text.

    Args:
        rag_plugin: The RAGPineconePlugin instance
        query: The query to search for
        num_results: Number of relevant documents to retrieve

    Returns:
        Tuple containing status, message, and results dictionary. When the
        plugin itself reports a non-DONE status, its status, message and
        results are returned unchanged instead of being rewritten as a
        successful "Found 0 relevant documents" answer.
    """
    try:
        status, message, results = rag_plugin.query_knowledge(query, num_results)

        # Propagate plugin failures; only successful lookups are formatted.
        if status != FunctionResultStatus.DONE:
            return status, message, results

        # Bookkeeping keys that are left out of the displayed source line.
        hidden_keys = ("chunk_id", "file_fingerprint")

        formatted_results = []
        for position, result in enumerate(results.get("results", []), start=1):
            content = result.get("content", "")
            metadata = result.get("metadata", {})

            metadata_str = ", ".join(
                f"{k}: {v}" for k, v in metadata.items() if k not in hidden_keys
            )

            formatted_results.append(f"Document {position}:\n{content}\n\nSource: {metadata_str}\n")

        formatted_message = f"Found {len(formatted_results)} relevant documents for query: '{query}'\n\n"
        formatted_message += "\n---\n".join(formatted_results)

        return FunctionResultStatus.DONE, formatted_message, results
    except Exception as e:
        return FunctionResultStatus.FAILED, f"Error querying knowledge base: {str(e)}", {"query": query}
+ + Args: + rag_plugin: The RAGPineconePlugin instance + + Returns: + Function object for the query_knowledge function + """ + return Function( + fn_name="query_knowledge", + fn_description="Query the RAG knowledge base for relevant context", + args=[ + Argument(name="query", description="The query to find relevant context for", type="str"), + Argument(name="num_results", description="Number of relevant documents to retrieve (default: 3)", type="int", optional=True), + ], + executable=lambda query, num_results=3: query_knowledge_executable(rag_plugin, query, num_results), + ) + + +def add_document_executable(rag_plugin: RAGPineconePlugin, content: str, metadata: Optional[Dict[str, Any]] = None) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the add_document function from the RAG plugin. + + Args: + rag_plugin: The RAGPineconePlugin instance + content: The text content to add + metadata: Optional metadata about the document + + Returns: + Tuple containing status, message, and results dictionary + """ + try: + status, message, results = rag_plugin.add_document(content, metadata) + return status, message, results + except Exception as e: + return FunctionResultStatus.FAILED, f"Error adding document: {str(e)}", {"content": content[:100] + "..." if len(content) > 100 else content} + + +def add_document_fn(rag_plugin: RAGPineconePlugin) -> Function: + """ + Create a GAME Function for adding a document to the knowledge base. 
+ + Args: + rag_plugin: The RAGPineconePlugin instance + + Returns: + Function object for the add_document function + """ + return Function( + fn_name="add_document", + fn_description="Add a document to the RAG knowledge base", + args=[ + Argument(name="content", description="The text content to add to the knowledge base", type="str"), + Argument(name="metadata", description="Optional metadata about the document", type="dict", optional=True), + ], + executable=lambda content, metadata=None: add_document_executable(rag_plugin, content, metadata), + ) + + +# Advanced RAG search functions + +def advanced_query_knowledge_executable(searcher: RAGSearcher, query: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the advanced query_knowledge function using the hybrid retriever. + + Args: + searcher: The RAGSearcher instance + query: The query to search for + + Returns: + Tuple containing status, message, and results dictionary + """ + return searcher.query(query) + + +def advanced_query_knowledge_fn(searcher: RAGSearcher) -> Function: + """ + Create a GAME Function for advanced querying of the knowledge base. + + Args: + searcher: The RAGSearcher instance + + Returns: + Function object for the advanced_query_knowledge function + """ + return Function( + fn_name="advanced_query_knowledge", + fn_description="Query the RAG knowledge base using hybrid retrieval (vector + BM25) and get an AI-generated answer", + args=[ + Argument(name="query", description="The query to search for", type="str"), + ], + executable=lambda query: advanced_query_knowledge_executable(searcher, query), + ) + + +def get_relevant_documents_executable(searcher: RAGSearcher, query: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the get_relevant_documents function. 
class RAGPineconePlugin:
    """
    RAG (Retrieval Augmented Generation) plugin using Pinecone for vector storage.

    On construction the plugin connects to Pinecone, creates the index when
    it does not exist yet, and wraps it in a LangChain ``PineconeVectorStore``
    that embeds text with OpenAI embeddings.

    Requires:
    - Pinecone API key
    - OpenAI API key for embeddings

    Example:
        rag_plugin = RAGPineconePlugin(
            pinecone_api_key="your-pinecone-api-key",
            openai_api_key="your-openai-api-key",
            index_name="your-index-name",
        )

        query_knowledge_fn = rag_plugin.get_function("query_knowledge")
    """

    def __init__(
        self,
        pinecone_api_key: Optional[str] = None,
        openai_api_key: Optional[str] = None,
        index_name: str = DEFAULT_INDEX_NAME,
        namespace: str = DEFAULT_NAMESPACE,
        embedding_model: str = DEFAULT_EMBEDDING_MODEL,
        dimension: int = 1536,
    ):
        """
        Connect to Pinecone and prepare the vector store.

        Args:
            pinecone_api_key: Pinecone API key; when omitted, the
                PINECONE_API_KEY environment variable is read
            openai_api_key: OpenAI API key; when omitted, the OPENAI_API_KEY
                environment variable is read
            index_name: Pinecone index name (created if missing)
            namespace: Pinecone namespace used for reads and writes
            embedding_model: OpenAI embedding model name
            dimension: Vector dimension used only when the index has to be
                created; it has to match the output size of
                ``embedding_model``
        """
        # The environment is read here, at call time. Reading it in the
        # default-argument expressions would freeze the values at import
        # time, before a later load_dotenv() could populate them.
        if pinecone_api_key is None:
            pinecone_api_key = os.environ.get("PINECONE_API_KEY")
        if openai_api_key is None:
            openai_api_key = os.environ.get("OPENAI_API_KEY")

        self.pinecone_api_key = pinecone_api_key
        self.openai_api_key = openai_api_key
        self.index_name = index_name
        self.namespace = namespace
        self.embedding_model = embedding_model

        # Initialize Pinecone client
        self.pc = Pinecone(api_key=self.pinecone_api_key)

        # Create index if it doesn't exist
        if self.index_name not in self.pc.list_indexes().names():
            logger.info(f"Creating index '{self.index_name}'...")
            self.pc.create_index(
                name=self.index_name,
                dimension=dimension,
                metric="dotproduct",
                spec=ServerlessSpec(
                    cloud="aws",
                    region="us-east-1"
                )
            )
            logger.info("Index created!")

        # Initialize embeddings
        self.embeddings = OpenAIEmbeddings(
            model=self.embedding_model,
            openai_api_key=self.openai_api_key
        )

        # Initialize vector store
        self.vector_store = PineconeVectorStore.from_existing_index(
            index_name=self.index_name,
            embedding=self.embeddings,
            namespace=self.namespace
        )

        # Available client functions, keyed by function name
        self._functions: Dict[str, Function] = {
            "query_knowledge": Function(
                fn_name="query_knowledge",
                fn_description="Query the RAG knowledge base for relevant context",
                args=[
                    Argument(
                        name="query",
                        description="The query to find relevant context for",
                        type="string",
                    ),
                    Argument(
                        name="num_results",
                        description="Number of relevant documents to retrieve",
                        type="int",
                        optional=True
                    ),
                ],
                executable=self.query_knowledge,
            ),
            "add_document": Function(
                fn_name="add_document",
                fn_description="Add a document to the RAG knowledge base",
                args=[
                    Argument(
                        name="content",
                        description="The text content to add to the knowledge base",
                        type="string",
                    ),
                    Argument(
                        name="metadata",
                        description="Optional metadata about the document",
                        type="dict",
                        optional=True
                    ),
                ],
                executable=self.add_document,
            ),
        }

    @property
    def available_functions(self) -> List[str]:
        """Get list of available function names."""
        return list(self._functions.keys())

    def get_function(self, fn_name: str) -> Function:
        """
        Get a specific function by name.

        Args:
            fn_name: Name of the function to retrieve

        Raises:
            ValueError: If function name is not found

        Returns:
            Function object
        """
        if fn_name not in self._functions:
            raise ValueError(
                f"Function '{fn_name}' not found. Available functions: {', '.join(self.available_functions)}"
            )
        return self._functions[fn_name]

    def query_knowledge(self, query: str, num_results: int = 3) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]:
        """
        Query the knowledge base for relevant context.

        Args:
            query: The query to search for
            num_results: Number of relevant documents to retrieve; values
                that arrive as numeric strings are converted to int

        Returns:
            Tuple containing status, message, and results dictionary
        """
        try:
            # Perform similarity search. int() accepts both 3 and "3"; a
            # value that cannot be converted is reported as FAILED below.
            docs = self.vector_store.similarity_search(
                query=query,
                k=int(num_results),
                namespace=self.namespace
            )

            # Format results
            results = [
                {"content": doc.page_content, "metadata": doc.metadata}
                for doc in docs
            ]

            return (
                FunctionResultStatus.DONE,
                f"Found {len(results)} relevant documents",
                {
                    "query": query,
                    "results": results
                }
            )
        except Exception as e:
            logger.error(f"Error querying knowledge base: {str(e)}")
            return (
                FunctionResultStatus.FAILED,
                f"Error querying knowledge base: {str(e)}",
                {
                    "query": query
                }
            )

    def add_document(self, content: str, metadata: Optional[Dict[str, Any]] = None) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]:
        """
        Add a document to the knowledge base.

        The document ID is ``doc_<id>`` when the metadata carries an ``id``
        entry; otherwise it is derived from a SHA-256 digest of the content,
        so the same content maps to the same ID in every process.

        Args:
            content: The text content to add
            metadata: Optional metadata about the document

        Returns:
            Tuple containing status, message, and results dictionary
        """
        # Imported locally so the block does not depend on the file header.
        import hashlib

        try:
            # Create document
            metadata = metadata or {}
            doc = Document(
                page_content=content,
                metadata=metadata
            )

            # Generate a document ID. The built-in hash() is salted per
            # process for strings, so it cannot serve as a persistent ID.
            explicit_id = metadata.get("id")
            if explicit_id is not None:
                doc_id = f"doc_{explicit_id}"
            else:
                digest = hashlib.sha256(content.encode("utf-8")).hexdigest()
                doc_id = f"doc_{digest[:16]}"

            # Add document to vector store
            self.vector_store.add_documents([doc], ids=[doc_id])

            return (
                FunctionResultStatus.DONE,
                f"Document added successfully with ID: {doc_id}",
                {
                    "doc_id": doc_id,
                    "metadata": metadata
                }
            )
        except Exception as e:
            logger.error(f"Error adding document: {str(e)}")
            return (
                FunctionResultStatus.FAILED,
                f"Error adding document: {str(e)}",
                {
                    "content": content[:100] + "..." if len(content) > 100 else content
                }
            )
HybridRetriever(BaseRetriever): + """ + A hybrid retriever that combines vector search and BM25 for better results. + """ + vector_store: Any = Field(default=None) + bm25_retriever: BM25Retriever = Field(default=None) + k: int = Field(default=4) + + def _get_relevant_documents(self, query: str, run_manager: Any = None) -> List[Document]: + """ + Get relevant documents using both vector search and BM25. + + Args: + query: The search query + run_manager: Optional run manager + + Returns: + List of relevant documents + """ + # Get documents from vector store + vector_docs = self.vector_store.similarity_search(query, k=self.k) + + # Get documents from BM25 + bm25_docs = self.bm25_retriever.invoke(query)[:self.k] + + # Combine and deduplicate + all_docs = vector_docs + bm25_docs + seen = set() + unique_docs = [] + for doc in all_docs: + if doc.page_content not in seen: + seen.add(doc.page_content) + unique_docs.append(doc) + + return unique_docs[:self.k] + + def invoke(self, input: str, run_manager: Any = None, **kwargs) -> List[Document]: + """ + Invoke the retriever. + + Args: + input: The search query + run_manager: Optional run manager + + Returns: + List of relevant documents + """ + return self._get_relevant_documents(input, run_manager=run_manager) + + async def ainvoke(self, input: str, run_manager: Any = None, **kwargs) -> List[Document]: + """ + Asynchronously invoke the retriever. + + Args: + input: The search query + run_manager: Optional run manager + + Returns: + List of relevant documents + """ + return self._get_relevant_documents(input, run_manager=run_manager) + + +class RAGSearcher: + """ + Advanced RAG searcher with hybrid retrieval capabilities. 
+ """ + + def __init__( + self, + pinecone_api_key: Optional[str] = os.environ.get("PINECONE_API_KEY"), + openai_api_key: Optional[str] = os.environ.get("OPENAI_API_KEY"), + index_name: str = DEFAULT_INDEX_NAME, + namespace: str = DEFAULT_NAMESPACE, + embedding_model: str = DEFAULT_EMBEDDING_MODEL, + llm_model: str = "gpt-4", + temperature: float = 0.0, + k: int = 4, + ): + """ + Initialize the RAG searcher. + + Args: + pinecone_api_key: Pinecone API key + openai_api_key: OpenAI API key + index_name: Pinecone index name + namespace: Pinecone namespace + embedding_model: OpenAI embedding model + llm_model: LLM model to use for answering + temperature: Temperature for the LLM + k: Number of documents to retrieve + """ + self.pinecone_api_key = pinecone_api_key + self.openai_api_key = openai_api_key + self.index_name = index_name + self.namespace = namespace + self.embedding_model = embedding_model + self.llm_model = llm_model + self.temperature = temperature + self.k = k + + # These will be initialized when needed + self.llm = None + self.vector_store = None + self.bm25_retriever = None + self.hybrid_retriever = None + self.qa_chain = None + + # Initialize components if API keys are available + if self.pinecone_api_key and self.openai_api_key: + self.initialize_components() + + def initialize_components(self): + """ + Initialize the retrieval components. 
+ """ + try: + logger.info(f"Initializing RAG components for index: {self.index_name}, namespace: {self.namespace}") + + # Initialize LLM + self.llm = ChatOpenAI( + api_key=self.openai_api_key, + model_name=self.llm_model, + temperature=self.temperature + ) + + # Initialize embeddings + embeddings = OpenAIEmbeddings( + model=self.embedding_model, + openai_api_key=self.openai_api_key + ) + + # Initialize vector store + self.vector_store = PineconeVectorStore.from_existing_index( + index_name=self.index_name, + namespace=self.namespace, + embedding=embeddings + ) + + # Get all documents for BM25 + all_docs = self.vector_store.similarity_search("", k=1000) # Get a sample of documents + logger.info(f"Retrieved {len(all_docs)} documents from vector store for BM25 indexing") + + # Initialize text splitter for BM25 + # Suppress specific warnings from SpaCy + warnings.filterwarnings("ignore", message="\\[W108\\] The rule-based lemmatizer did not find POS annotation for one or more tokens.*") + + try: + text_splitter = SpacyTextSplitter( + pipeline="en_core_web_sm", + chunk_size=1500, + chunk_overlap=200 + ) + split_docs = text_splitter.split_documents(all_docs) + except Exception as e: + logger.warning(f"Error using SpacyTextSplitter: {str(e)}. 
def query(self, query: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]:
    """
    Answer a question with the QA chain and list the retrieved documents.

    Args:
        query: The query to search for

    Returns:
        Tuple containing status, message, and results dictionary. On success
        the message is the generated answer and the results hold the query
        plus the ``source_documents`` returned by the hybrid retriever.
    """
    try:
        # Build the pipeline on first use.
        if not self.qa_chain:
            self.initialize_components()

        # Still nothing to run the query against: give up.
        if not self.qa_chain:
            failure_details = {"query": query}
            return (
                FunctionResultStatus.FAILED,
                "RAG system is not properly initialized. Please check the setup.",
                failure_details
            )

        # The chain output is a dict carrying 'result' in the expected case;
        # anything else is stringified.
        chain_output = self.qa_chain.invoke(query)
        if isinstance(chain_output, dict) and 'result' in chain_output:
            answer = chain_output['result']
        else:
            answer = str(chain_output)

        # Run the retriever again to list the supporting documents.
        retrieved = self.hybrid_retriever.invoke(query) if self.hybrid_retriever else []
        source_docs = [
            {"content": item.page_content, "metadata": item.metadata}
            for item in retrieved
        ]

        payload = {
            "query": query,
            "source_documents": source_docs
        }
        return (FunctionResultStatus.DONE, answer, payload)
    except Exception as e:
        logger.error(f"Error querying knowledge base: {str(e)}")
        return (
            FunctionResultStatus.FAILED,
            f"Error querying knowledge base: {str(e)}",
            {"query": query}
        )
Please check the setup.", + {"query": query} + ) + + # Get relevant documents + docs = self.hybrid_retriever.invoke(query) + + # Format results + results = [] + for i, doc in enumerate(docs): + results.append({ + "content": doc.page_content, + "metadata": doc.metadata + }) + + # Format message + formatted_message = f"Found {len(results)} relevant documents for query: '{query}'\n\n" + for i, result in enumerate(results): + content = result["content"] + metadata = result["metadata"] + + # Format metadata for display + metadata_str = ", ".join([f"{k}: {v}" for k, v in metadata.items() + if k not in ["chunk_id", "file_fingerprint"]]) + + formatted_message += f"Document {i+1}:\n{content}\n\nSource: {metadata_str}\n\n---\n\n" + + return ( + FunctionResultStatus.DONE, + formatted_message, + { + "query": query, + "results": results + } + ) + except Exception as e: + logger.error(f"Error retrieving relevant documents: {str(e)}") + return ( + FunctionResultStatus.FAILED, + f"Error retrieving relevant documents: {str(e)}", + {"query": query} + ) + + +# Function wrappers for GAME SDK integration + +def query_knowledge_executable(searcher: RAGSearcher, query: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the query_knowledge function. + + Args: + searcher: The RAGSearcher instance + query: The query to search for + + Returns: + Tuple containing status, message, and results dictionary + """ + return searcher.query(query) + + +def query_knowledge_fn(searcher: RAGSearcher) -> Function: + """ + Create a GAME Function for querying the knowledge base. 
+ + Args: + searcher: The RAGSearcher instance + + Returns: + Function object + """ + return Function( + fn_name="query_knowledge", + fn_description="Query the RAG knowledge base for relevant information and get an AI-generated answer", + args=[ + Argument(name="query", description="The query to search for", type="str"), + ], + executable=lambda query: query_knowledge_executable(searcher, query), + ) + + +def get_relevant_documents_executable(searcher: RAGSearcher, query: str) -> Tuple[FunctionResultStatus, str, Dict[str, Any]]: + """ + Execute the get_relevant_documents function. + + Args: + searcher: The RAGSearcher instance + query: The query to search for + + Returns: + Tuple containing status, message, and results dictionary + """ + return searcher.get_relevant_documents(query) + + +def get_relevant_documents_fn(searcher: RAGSearcher) -> Function: + """ + Create a GAME Function for getting relevant documents. + + Args: + searcher: The RAGSearcher instance + + Returns: + Function object + """ + return Function( + fn_name="get_relevant_documents", + fn_description="Get relevant documents from the RAG knowledge base without generating an answer", + args=[ + Argument(name="query", description="The query to search for", type="str"), + ], + executable=lambda query: get_relevant_documents_executable(searcher, query), + ) + + +# Example usage +if __name__ == "__main__": + # Load environment variables + from dotenv import load_dotenv + load_dotenv() + + # Initialize searcher + searcher = RAGSearcher() + + # Test query + query = "What is RAG?" 
+ print(f"Query: {query}") + status, message, results = searcher.query(query) + print(f"Status: {status}") + print(f"Answer: {message}") + print(f"Source documents: {len(results.get('source_documents', []))}") + + # Test get relevant documents + print("\nGetting relevant documents...") + status, message, results = searcher.get_relevant_documents(query) + print(f"Status: {status}") + print(f"Found {len(results.get('results', []))} relevant documents") \ No newline at end of file diff --git a/plugins/RAGPinecone/setup.py b/plugins/RAGPinecone/setup.py new file mode 100644 index 00000000..7954059a --- /dev/null +++ b/plugins/RAGPinecone/setup.py @@ -0,0 +1,30 @@ +from setuptools import setup, find_packages + +setup( + name="rag-pinecone-gamesdk", + version="0.1.0", + packages=find_packages(), + install_requires=[ + "pinecone-client>=2.2.1", + "langchain>=0.0.267", + "langchain-community>=0.0.1", + "langchain-pinecone>=0.0.1", + "langchain-openai>=0.0.2", + "openai>=1.1.1", + "python-dotenv>=1.0.0", + "unstructured>=0.10.0", + "pdf2image>=1.16.3", + "pytesseract>=0.3.10", + "docx2txt>=0.8", + "pandas>=2.0.0", + "beautifulsoup4>=4.12.0", + "markdown>=3.4.3", + "rank_bm25>=0.2.2", + "spacy>=3.0.0", + "gdown", + ], + python_requires=">=3.9", +) + + +#python -m spacy download en_core_web_sm