from llama_index.core import Document, Settings, SimpleDirectoryReader, StorageContext, VectorStoreIndex
from llama_index.core.node_parser import SentenceSplitter, CodeSplitter, MarkdownNodeParser, JSONNodeParser
from llama_index.vector_stores.elasticsearch import ElasticsearchStore
from dotenv import load_dotenv
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.core.ingestion import IngestionPipeline
import tree_sitter_python as tspython
from tree_sitter_languages import get_parser, get_language
from tree_sitter import Parser, Language
import logging
import nest_asyncio
import elastic_transport
import sys
import subprocess
import shutil
import time
import glob
import os

#logging.basicConfig(stream=sys.stdout, level=logging.INFO)
#logging.getLogger().addHandler(logging.StreamHandler(stream=sys.stdout))
#logging.getLogger('elasticsearch').setLevel(logging.DEBUG)

nest_asyncio.apply()

load_dotenv('.env')
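
# A minimal sketch of the .env file this script expects. The variable names come
# from the os.getenv() calls below; the values are placeholders, and OPENAI_API_KEY
# is an assumption based on OpenAIEmbedding reading it from the environment by default.
#
#   OPENAI_API_KEY=<your-openai-api-key>
#   GITHUB_OWNER=<github-org-or-user>
#   GITHUB_REPO=<repository-name>
#   GITHUB_BRANCH=<branch-to-index>
#   BASE_PATH=/tmp                     # optional, defaults to /tmp
#   ELASTIC_CLOUD_ID=<cloud-id>
#   ELASTIC_USER=<username>
#   ELASTIC_PASSWORD=<password>
#   ELASTIC_INDEX=<target-index-name>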

Settings.embed_model = OpenAIEmbedding(model="text-embedding-3-large")
Settings.chunk_lines = 1024
Settings.chunk_size = 1024
Settings.chunk_lines_overlap = 20
Settings.max_chars = 1500


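# Clone https://github.com/<owner>/<repo> at the given branch into base_path,
# skipping the clone if the directory already exists and retrying a few times
# on transient git failures.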
def clone_repository(owner, repo, branch, base_path="/tmp"):
    branch = branch or os.getenv("GITHUB_BRANCH")
    if not branch:
        raise ValueError("Branch is not provided and GITHUB_BRANCH environment variable is not set.")

    local_repo_path = os.path.join(base_path, owner, repo)
    clone_url = f"https://github.com/{owner}/{repo}.git"

    if os.path.exists(local_repo_path):
        print(f"Repository already exists at {local_repo_path}. Skipping clone.")
        return local_repo_path

    attempts = 3

    for attempt in range(attempts):
        try:
            os.makedirs(local_repo_path, exist_ok=True)
            print(f"Attempting to clone repository... Attempt {attempt + 1}")
            subprocess.run(["git", "clone", "-b", branch, clone_url, local_repo_path], check=True)
            print(f"Repository cloned into {local_repo_path}.")
            return local_repo_path
        except subprocess.CalledProcessError:
            print(f"Attempt {attempt + 1} failed.")
            if attempt < attempts - 1:
                print("Retrying in 10 seconds...")
                time.sleep(10)
                continue
            raise Exception("Failed to clone repository after multiple attempts")

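# Debug helper: dump the full text of every loaded document and every parsed node.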
def print_docs_and_nodes(docs, nodes):
    print("\n=== Documents ===\n")
    for doc in docs:
        print(f"Document ID: {doc.doc_id}")
        print(f"Document Content:\n{doc.text}\n\n---\n")

    print("\n=== Nodes ===\n")
    for node in nodes:
        print(f"Node ID: {node.id_}")
        print(f"Node Content:\n{node.text}\n\n---\n")

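# Print the per-extension file counts gathered while parsing the repository.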
def collect_and_print_file_summary(file_summary):
    print("\n=== File Summary ===\n")
    for summary in file_summary:
        print(summary)

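# Clone the configured repository, choose a node parser per file extension, and
# return the parsed nodes ready for embedding and indexing.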
def parse_documents():
    owner = os.getenv('GITHUB_OWNER')
    repo = os.getenv('GITHUB_REPO')
    branch = os.getenv('GITHUB_BRANCH')
    base_path = os.getenv('BASE_PATH', "/tmp")

    if not owner or not repo:
        raise ValueError("GITHUB_OWNER and GITHUB_REPO environment variables must be set.")

    local_repo_path = clone_repository(owner, repo, branch, base_path)

    nodes = []
    file_summary = []

    ts_parser = get_parser('typescript')
    py_parser = get_parser('python')
    go_parser = get_parser('go')
    js_parser = get_parser('javascript')
    bash_parser = get_parser('bash')
    yaml_parser = get_parser('yaml')

    parsers_and_extensions = [
        (SentenceSplitter(), [".md"]),
        (CodeSplitter(language='python', parser=py_parser), [".py", ".ipynb"]),
        (CodeSplitter(language='typescript', parser=ts_parser), [".ts"]),
        (CodeSplitter(language='go', parser=go_parser), [".go"]),
        (CodeSplitter(language='javascript', parser=js_parser), [".js"]),
        (CodeSplitter(language='bash', parser=bash_parser), [".bash", ".sh"]),
        (CodeSplitter(language='yaml', parser=yaml_parser), [".yaml", ".yml"]),
        (JSONNodeParser(), [".json"]),
    ]

    for parser, extensions in parsers_and_extensions:
        matching_files = []
        for ext in extensions:
            matching_files.extend(glob.glob(f"{local_repo_path}/**/*{ext}", recursive=True))

        if len(matching_files) > 0:
            file_summary.append(f"Found {len(matching_files)} {', '.join(extensions)} files in the repository.")
            loader = SimpleDirectoryReader(input_dir=local_repo_path, required_exts=extensions, recursive=True)
            docs = loader.load_data()
            parsed_nodes = parser.get_nodes_from_documents(docs)

            print_docs_and_nodes(docs, parsed_nodes)

            nodes.extend(parsed_nodes)
        else:
            file_summary.append(f"No {', '.join(extensions)} files found in the repository.")

    collect_and_print_file_summary(file_summary)
    print("\n")
    return nodes

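# Build the ElasticsearchStore vector store from the ELASTIC_* settings, retrying
# while the cluster is still becoming reachable.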
def get_es_vector_store():
    print("Initializing Elasticsearch store...")
    es_cloud_id = os.getenv("ELASTIC_CLOUD_ID")
    es_user = os.getenv("ELASTIC_USER")
    es_password = os.getenv("ELASTIC_PASSWORD")
    index_name = os.getenv("ELASTIC_INDEX")
    retries = 20
    for attempt in range(retries):
        try:
            es_vector_store = ElasticsearchStore(
                index_name=index_name,
                es_cloud_id=es_cloud_id,
                es_user=es_user,
                es_password=es_password,
                batch_size=100
            )
            print("Elasticsearch store initialized.")
            return es_vector_store
        except elastic_transport.ConnectionTimeout:
            print(f"Connection attempt {attempt + 1}/{retries} timed out. Retrying...")
            time.sleep(10)
    raise Exception("Failed to initialize Elasticsearch store after multiple attempts")

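# Parse the repository into nodes and index them into Elasticsearch via an
# IngestionPipeline backed by the vector store.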
def main():
    nodes = parse_documents()
    es_vector_store = get_es_vector_store()

    try:
        pipeline = IngestionPipeline(
            vector_store=es_vector_store,
        )

        pipeline.run(nodes=nodes, show_progress=True)
    finally:
        if hasattr(es_vector_store, "close"):
            es_vector_store.close()
            print("Elasticsearch connection closed.")


if __name__ == "__main__":
    main()