2929def update_exception_db (graph_obj ,file_name ,exp_msg ):
3030 try :
3131 job_status = "Failed"
32- source_node = "fileName: '{}'"
33- update_node_prop = 'SET d.status = "{}", d.errorMessage = "{}"'
34- graph_obj .query ('MERGE(d:Document {' + source_node .format (file_name )+ '}) ' + update_node_prop .format (job_status ,exp_msg ))
32+ graph_obj .query ("""MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""" ,
33+ {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg })
3534 except Exception as e :
3635 error_message = str (e )
3736 logging .error (f"Error in updating document node status as failed: { error_message } " )
@@ -41,10 +40,15 @@ def create_source_node(graph_obj,file_name,file_size,file_type,source,model,url=
4140 try :
4241 current_time = datetime .now ()
4342 job_status = "New"
44- source_node = "fileName: '{}'"
45- update_node_prop = "SET d.fileSize = '{}', d.fileType = '{}' ,d.status = '{}',d.url='{}',d.awsAccessKeyId='{}',d.fileSource='{}', d.createdAt ='{}', d.updatedAt = '{}', d.processingTime = '{}', d.errorMessage = '{}', d.nodeCount= {}, d.relationshipCount = {}, d.model= '{}'"
4643 logging .info ("create source node as file name if not exist" )
47- graph_obj .query ('MERGE(d:Document {' + source_node .format (file_name )+ '}) ' + update_node_prop .format (file_size ,file_type ,job_status ,url ,aws_access_key_id ,source ,current_time ,current_time ,0 ,'' ,0 ,0 ,model ))
44+ graph_obj .query ("""MERGE(d:Document {fileName :$fn}) SET d.fileSize = $fs, d.fileType = $ft ,
45+ d.status = $st, d.url = $url, d.awsAccessKeyId = $awsacc_key_id,
46+ d.fileSource = $f_source, d.createdAt = $c_at, d.updatedAt = $u_at,
47+ d.processingTime = $pt, d.errorMessage = $e_message, d.nodeCount= $n_count,
48+ d.relationshipCount = $r_count, d.model= $model""" ,
49+ {"fn" :file_name , "fs" :file_size , "ft" :file_type , "st" :job_status , "url" :url ,
50+ "awsacc_key_id" :aws_access_key_id , "f_source" :source , "c_at" :current_time ,
51+ "u_at" :current_time , "pt" :0 , "e_message" :'' , "n_count" :0 , "r_count" :0 , "model" :model })
4852 except Exception as e :
4953 error_message = str (e )
5054 update_exception_db (graph_obj ,file_name ,error_message )
@@ -69,10 +73,6 @@ def create_source_node_graph_local_file(uri, userName, password, file, model, db
6973 file_size = file .size
7074 file_name = file .filename
7175 source = 'local file'
72- # if db_name is not None:
73- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
74- # else:
75- # graph = Neo4jGraph(url=uri, username=userName, password=password)
7676 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
7777 create_source_node (graph ,file_name ,file_size ,file_type ,source ,model )
7878 return create_api_response ("Success" ,message = "Source Node created successfully" ,file_source = source )
@@ -173,10 +173,6 @@ def create_source_node_graph_url(uri, userName, password, source_url ,model, db_
173173 """
174174 try :
175175 source_type ,youtube_url = check_url_source (source_url )
176- # if db_name is not None:
177- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
178- # else:
179- # graph = Neo4jGraph(url=uri, username=userName, password=password)
180176 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
181177 logging .info (f"source type URL:{ source_type } " )
182178 if source_type == "s3 bucket" :
@@ -212,7 +208,6 @@ def create_source_node_graph_url(uri, userName, password, source_url ,model, db_
212208 return create_api_response ("Success" ,message = "Source Node created successfully" ,success_count = success_count ,Failed_count = Failed_count ,file_source = 's3 bucket' ,file_name = lst_s3_file_name )
213209 elif source_type == 'youtube' :
214210 source_url = youtube_url
215- # match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})", source_url)
216211 match = re .search (r'(?:v=)([0-9A-Za-z_-]{11})\s*' ,source_url )
217212 logging .info (f"match value{ match } " )
218213 youtube_id = match .group (1 )
@@ -246,8 +241,6 @@ def file_into_chunks(pages: List[Document]):
246241 logging .info ("Split file into smaller chunks" )
247242 text_splitter = TokenTextSplitter (chunk_size = 200 , chunk_overlap = 20 )
248243 chunks = text_splitter .split_documents (pages )
249- # print('Before chunks',len(chunks))
250- #chunks=chunks[:10]
251244 return chunks
252245
253246def get_s3_pdf_content (s3_url ,aws_access_key_id = None ,aws_secret_access_key = None ):
@@ -304,14 +297,9 @@ def extract_graph_from_file(uri, userName, password, model, db_name=None, file=N
304297 Json response to API with fileName, nodeCount, relationshipCount, processingTime,
305298 status and model as attributes.
306299 """
307- # logging.info(f"extract_graph_from_file called for file:{file.filename}")
308300 try :
309301 start_time = datetime .now ()
310302 file_name = ''
311- # if db_name is not None:
312- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
313- # else:
314- # graph = Neo4jGraph(url=uri, username=userName, password=password)
315303 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
316304 source_node = "fileName: '{}'"
317305
@@ -398,7 +386,15 @@ def extract_graph_from_file(uri, userName, password, model, db_name=None, file=N
398386 job_status = "Completed"
399387 error_message = ""
400388 logging .info ("Update source node properties" )
401- graph .query ('MERGE(d:Document {' + source_node .format (file_key .split ('/' )[- 1 ])+ '}) ' + update_node_prop .format (start_time ,end_time ,round (processed_time .total_seconds (),2 ),job_status ,error_message ,nodes_created ,relationships_created ,model ))
389+ graph .query ("""MERGE(d:Document {fileName :$fn}) SET d.status = $st, d.createdAt = $c_at,
390+ d.updatedAt = $u_at, d.processingTime = $pt, d.nodeCount= $n_count,
391+ d.relationshipCount = $r_count, d.model= $model
392+ """ ,
393+ {"fn" :file_key .split ('/' )[- 1 ], "st" :job_status , "c_at" :start_time ,
394+ "u_at" :end_time , "pt" :round (processed_time .total_seconds (),2 ), "e_message" :'' ,
395+ "n_count" :nodes_created , "r_count" :relationships_created , "model" :model
396+ }
397+ )
402398
403399 output = {
404400 "fileName" : file_name ,
@@ -481,12 +477,6 @@ def get_source_list_from_graph(uri,userName,password,db_name=None):
481477 """
482478 logging .info ("Get existing files list from graph" )
483479 try :
484- # if len(db_name)!=0:
485- # logging.info(f"Fetching source list from, database = {db_name}")
486- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
487- # else:
488- # logging.info(f"Fetching source list from default database (neo4j)")
489- # graph = Neo4jGraph(url=uri, username=userName, password=password)
490480 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
491481 query = "MATCH(d:Document) RETURN d ORDER BY d.updatedAt DESC"
492482 result = graph .query (query )
@@ -505,12 +495,14 @@ def update_graph(uri,userName,password,db_name):
505495 """
506496 try :
507497 knn_min_score = os .environ .get ('KNN_MIN_SCORE' )
508-
509- query = "WHERE node <> c and score >= {} MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score"
510498 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
511499 result = graph .query ("""MATCH (c:Chunk)
512- WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
513- CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score """ + query .format (knn_min_score ))
500+ WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
501+ CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score
502+ WHERE node <> c and score >= $score MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score
503+ """ ,
504+ {"score" :knn_min_score }
505+ )
514506 logging .info (f"result : { result } " )
515507 except Exception as e :
516508 error_message = str (e )
0 commit comments