Skip to content

Commit b6ad90b

Browse files
committed
Added comments to understand code
Signed-off-by: hmumtazz <[email protected]>
1 parent 08ed40e commit b6ad90b

File tree

7 files changed

+112
-14
lines changed

7 files changed

+112
-14
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
[DEFAULT]
2+
region = us-west-2
3+
iam_principal = arn:aws:iam::615299771255:user/hmumtazz
4+
index_name = drpepper
5+
collection_name =
6+
is_serverless = False
7+
opensearch_endpoint = https://search-hashim-test5-eivrlyacr3n653fnkkrg2yab7u.aos.us-west-2.on.aws
8+
opensearch_username = admin
9+
opensearch_password = MyPassword123!
10+
embedding_dimension = 768
11+
space_type = l2

opensearch_py_ml/ml_commons/rag_pipeline/rag/ingest.py

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,16 @@ class Ingest:
4646
EMBEDDING_MODEL_ID = 'amazon.titan-embed-text-v2:0'
4747

4848
def __init__(self, config):
    """Keep the pipeline settings and prepare the service handles.

    config: object exposing ``get``; the 'region' and 'index_name'
    entries are read here and the whole object is passed on to
    OpenSearchConnector.
    """
    self.config = config
    self.aws_region = config.get('region')
    self.index_name = config.get('index_name')
    # The OpenSearch wrapper is built eagerly; the Bedrock client is left
    # unset until initialize_clients() creates it.
    self.opensearch = OpenSearchConnector(config)
    self.bedrock_client = None
5455

5556
def initialize_clients(self):
57+
# Initialize AWS Bedrock and OpenSearch clients
58+
# Returns True if successful, False otherwise
5659
try:
5760
self.bedrock_client = boto3.client('bedrock-runtime', region_name=self.aws_region)
5861
if self.opensearch.initialize_opensearch_client():
@@ -66,6 +69,9 @@ def initialize_clients(self):
6669
return False
6770

6871
def process_file(self, file_path: str) -> List[Dict[str, str]]:
72+
# Process a file based on its extension
73+
# Supports CSV, TXT, and PDF files
74+
# Returns a list of dictionaries containing extracted text
6975
_, file_extension = os.path.splitext(file_path)
7076

7177
if file_extension.lower() == '.csv':
@@ -79,6 +85,9 @@ def process_file(self, file_path: str) -> List[Dict[str, str]]:
7985
return []
8086

8187
def process_csv(self, file_path: str) -> List[Dict[str, str]]:
88+
# Process a CSV file
89+
# Extracts information and formats it into a sentence
90+
# Returns a list of dictionaries with the formatted text
8291
documents = []
8392
with open(file_path, 'r') as csvfile:
8493
reader = csv.DictReader(csvfile)
@@ -90,11 +99,17 @@ def process_csv(self, file_path: str) -> List[Dict[str, str]]:
9099
return documents
91100

92101
def process_txt(self, file_path: str) -> List[Dict[str, str]]:
    """Load a plain-text file as one document.

    The whole file is read in a single call (no chunking) and returned as
    a one-element list of the form ``[{"text": <file content>}]``.
    """
    with open(file_path, 'r') as handle:
        return [{"text": handle.read()}]
96108

97109
def process_pdf(self, file_path: str) -> List[Dict[str, str]]:
110+
# Process a PDF file
111+
# Extracts text from each page of the PDF
112+
# Returns a list of dictionaries, each containing text from a page
98113
documents = []
99114
with open(file_path, 'rb') as pdffile:
100115
pdf_reader = PyPDF2.PdfReader(pdffile)
@@ -105,6 +120,9 @@ def process_pdf(self, file_path: str) -> List[Dict[str, str]]:
105120
return documents
106121

107122
def text_embedding(self, text, max_retries=5, initial_delay=1, backoff_factor=2):
123+
# Generate text embedding using AWS Bedrock
124+
# Implements exponential backoff for retries in case of failures
125+
# Returns the embedding if successful, None otherwise
108126
if self.bedrock_client is None:
109127
print("Bedrock client is not initialized. Please run setup first.")
110128
return None
@@ -139,6 +157,9 @@ def text_embedding(self, text, max_retries=5, initial_delay=1, backoff_factor=2)
139157
return None
140158

141159
def process_and_ingest_data(self, file_paths: List[str]):
160+
# Process and ingest data from multiple files
161+
# Generates embeddings for each document and ingests into OpenSearch
162+
# Displays progress and results of the ingestion process
142163
if not self.initialize_clients():
143164
print("Failed to initialize clients. Aborting ingestion.")
144165
return
@@ -197,6 +218,8 @@ def process_and_ingest_data(self, file_paths: List[str]):
197218
print(f"{Fore.RED}Failed to ingest {failed} documents.{Style.RESET_ALL}")
198219

199220
def ingest_command(self, paths: List[str]):
221+
# Main ingestion command
222+
# Processes all valid files in the given paths and initiates ingestion
200223
all_files = []
201224
for path in paths:
202225
if os.path.isfile(path):
@@ -215,4 +238,4 @@ def ingest_command(self, paths: List[str]):
215238

216239
print(f"{Fore.GREEN}Found {len(valid_files)} valid files for ingestion.{Style.RESET_ALL}")
217240

218-
self.process_and_ingest_data(valid_files)
241+
self.process_and_ingest_data(valid_files)

opensearch_py_ml/ml_commons/rag_pipeline/rag/opensearch_connector.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
class OpenSearchConnector:
3232
def __init__(self, config):
33+
# Initialize the OpenSearchConnector with configuration
3334
self.config = config
3435
self.opensearch_client = None
3536
self.aws_region = config.get('region')
@@ -40,6 +41,9 @@ def __init__(self, config):
4041
self.opensearch_password = config.get('opensearch_password')
4142

4243
def initialize_opensearch_client(self):
44+
# Initialize the OpenSearch client
45+
# Handles both serverless and non-serverless configurations
46+
# Returns True if successful, False otherwise
4347
if not self.opensearch_endpoint:
4448
print("OpenSearch endpoint not set. Please run setup first.")
4549
return False
@@ -73,6 +77,8 @@ def initialize_opensearch_client(self):
7377
return False
7478

7579
def create_index(self, embedding_dimension, space_type):
80+
# Create a new KNN index in OpenSearch
81+
# Sets up the mapping for nominee_text and nominee_vector fields
7682
index_body = {
7783
"mappings": {
7884
"properties": {
@@ -107,6 +113,8 @@ def create_index(self, embedding_dimension, space_type):
107113
print(f"Error creating index '{self.index_name}': {e}")
108114

109115
def verify_and_create_index(self, embedding_dimension, space_type):
116+
# Check if the index exists, create it if it doesn't
117+
# Returns True if the index exists or was successfully created, False otherwise
110118
try:
111119
index_exists = self.opensearch_client.indices.exists(index=self.index_name)
112120
if index_exists:
@@ -119,6 +127,8 @@ def verify_and_create_index(self, embedding_dimension, space_type):
119127
return False
120128

121129
def bulk_index(self, actions):
130+
# Perform bulk indexing of documents
131+
# Returns the number of successfully indexed documents and the number of failures
122132
try:
123133
success_count, error_info = opensearch_helpers.bulk(self.opensearch_client, actions)
124134
error_count = len(error_info)
@@ -129,6 +139,8 @@ def bulk_index(self, actions):
129139
return 0, len(actions)
130140

131141
def search(self, vector, k=5):
142+
# Perform a KNN search using the provided vector
143+
# Returns the top k matching documents
132144
try:
133145
response = self.opensearch_client.search(
134146
index=self.index_name,

opensearch_py_ml/ml_commons/rag_pipeline/rag/query.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,13 +40,16 @@ class Query:
4040
LLM_MODEL_ID = 'amazon.titan-text-express-v1'
4141

4242
def __init__(self, config):
    """Keep the query settings and prepare the service handles.

    config: object exposing ``get``; the 'region' and 'index_name'
    entries are read here and the whole object is passed on to
    OpenSearchConnector.
    """
    self.config = config
    self.aws_region = config.get('region')
    self.index_name = config.get('index_name')
    # The OpenSearch wrapper is built eagerly; the Bedrock client is left
    # unset until initialize_clients() creates it.
    self.opensearch = OpenSearchConnector(config)
    self.bedrock_client = None
4849

4950
def initialize_clients(self):
51+
# Initialize AWS Bedrock and OpenSearch clients
52+
# Returns True if successful, False otherwise
5053
try:
5154
self.bedrock_client = boto3.client('bedrock-runtime', region_name=self.aws_region)
5255
if self.opensearch.initialize_opensearch_client():
@@ -60,6 +63,9 @@ def initialize_clients(self):
6063
return False
6164

6265
def text_embedding(self, text, max_retries=5, initial_delay=1, backoff_factor=2):
66+
# Generate text embedding using AWS Bedrock
67+
# Implements exponential backoff for retries in case of failures
68+
# Returns the embedding if successful, None otherwise
6369
if self.bedrock_client is None:
6470
print("Bedrock client is not initialized. Please run setup first.")
6571
return None
@@ -94,6 +100,9 @@ def text_embedding(self, text, max_retries=5, initial_delay=1, backoff_factor=2)
94100
return None
95101

96102
def bulk_query(self, queries, k=5):
103+
# Perform bulk semantic search for multiple queries
104+
# Generates embeddings for queries and searches OpenSearch index
105+
# Returns a list of results containing query, context, and number of results
97106
print("Generating embeddings for queries...")
98107
query_vectors = []
99108
for query in queries:
@@ -133,6 +142,9 @@ def bulk_query(self, queries, k=5):
133142
return results
134143

135144
def generate_answer(self, prompt, config):
145+
# Generate an answer using the LLM model
146+
# Handles token limit and configures LLM parameters
147+
# Returns the generated answer or None if an error occurs
136148
try:
137149
max_input_tokens = 8192 # Max tokens for the model
138150
expected_output_tokens = config.get('maxTokenCount', 1000)
@@ -172,6 +184,9 @@ def generate_answer(self, prompt, config):
172184
return None
173185

174186
def query_command(self, queries: List[str], num_results=5):
187+
# Main query command to process multiple queries
188+
# Performs semantic search and generates answers using LLM
189+
# Prints results for each query
175190
if not self.initialize_clients():
176191
print("Failed to initialize clients. Aborting query.")
177192
return

opensearch_py_ml/ml_commons/rag_pipeline/rag/rag.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,20 @@
3838
CONFIG_FILE = 'config.ini'
3939

4040
def load_config():
    """Return the ``[DEFAULT]`` section parsed from CONFIG_FILE.

    ``ConfigParser.read`` skips files it cannot open, so a missing
    CONFIG_FILE yields an empty section rather than an exception.
    """
    parser = configparser.ConfigParser()
    parser.read(CONFIG_FILE)
    return parser['DEFAULT']
4445

4546
def save_config(config):
    """Write *config* to CONFIG_FILE as its ``[DEFAULT]`` section.

    The file is opened in 'w' mode, so any previous content is replaced.
    """
    # NOTE(review): every value is written in plaintext; if *config* carries
    # credentials (e.g. an OpenSearch password) they end up on disk readable.
    writer = configparser.ConfigParser()
    writer['DEFAULT'] = config
    with open(CONFIG_FILE, 'w') as handle:
        writer.write(handle)
5052

5153
def main():
54+
# Set up argument parser for CLI
5255
parser = argparse.ArgumentParser(description="RAG Pipeline CLI")
5356
parser.add_argument('command', choices=['setup', 'ingest', 'query'], help='Command to run')
5457
parser.add_argument('--paths', nargs='+', help='Paths to files or directories for ingestion')
@@ -57,14 +60,18 @@ def main():
5760

5861
args = parser.parse_args()
5962

63+
# Load existing configuration
6064
config = load_config()
6165

6266
if args.command == 'setup':
67+
# Run setup process
6368
setup = Setup()
6469
setup.setup_command()
6570
save_config(setup.config)
6671
elif args.command == 'ingest':
72+
# Handle ingestion command
6773
if not args.paths:
74+
# If no paths provided as arguments, prompt user for input
6875
paths = []
6976
while True:
7077
path = input("Enter a file or directory path (or press Enter to finish): ")
@@ -76,7 +83,9 @@ def main():
7683
ingest = Ingest(config)
7784
ingest.ingest_command(paths)
7885
elif args.command == 'query':
86+
# Handle query command
7987
if not args.queries:
88+
# If no queries provided as arguments, prompt user for input
8089
queries = []
8190
while True:
8291
query = input("Enter a query (or press Enter to finish): ")
@@ -88,7 +97,8 @@ def main():
8897
query = Query(config)
8998
query.query_command(queries, num_results=args.num_results)
9099
else:
100+
# Defensive fallback: argparse's `choices` already rejects unknown commands, so this branch should not normally be reached
91101
parser.print_help()
92102

93103
if __name__ == "__main__":
94-
main()
104+
main()

0 commit comments

Comments
 (0)