@@ -94,25 +94,29 @@ def main():
9494
9595 # Vector embedding using Amazon Bedrock Titan text embedding
9696 all_json_records = []
97- logging .info (f"Creating embeddings for { len ( all_records ) } records" )
97+ logging .info (f"Creating embeddings for records" )
9898
9999 # using the arg --early-stop
100100 i = 0
101101 for record in all_records :
102102 i += 1
103103 if args .early_stop :
104104 if i > early_stop_record_count :
105+ # Bulk put all records to OpenSearch
106+ success , failed = opensearch .put_bulk_in_opensearch (all_json_records , opensearch_client )
107+ logging .info (f"Documents saved { success } , documents failed to save { failed } " )
105108 break
106109 records_with_embedding = create_vector_embedding_with_bedrock (record , name , bedrock_client )
107110 logging .info (f"Embedding for record { i } created" )
108111 all_json_records .append (records_with_embedding )
109-
112+ if i % 500 == 0 or i == len (all_records )- 1 :
113+ # Bulk put all records to OpenSearch
114+ success , failed = opensearch .put_bulk_in_opensearch (all_json_records , opensearch_client )
115+ all_json_records = []
116+ logging .info (f"Documents saved { success } , documents failed to save { failed } " )
117+
110118 logging .info ("Finished creating records using Amazon Bedrock Titan text embedding" )
111119
112- # Bulk put all records to OpenSearch
113- success , failed = opensearch .put_bulk_in_opensearch (all_json_records , opensearch_client )
114- logging .info (f"Documents saved { success } , documents failed to save { failed } " )
115-
116120 logging .info ("Cleaning up" )
117121 dataset .delete_file (compressed_file_path )
118122 dataset .delete_file (file_path )
0 commit comments