3333def update_exception_db (graph_obj ,file_name ,exp_msg ):
3434 try :
3535 job_status = "Failed"
36- source_node = "fileName: '{}'"
37- update_node_prop = 'SET d.status = "{}", d.errorMessage = "{}"'
38- graph_obj .query ('MERGE(d:Document {' + source_node .format (file_name )+ '}) ' + update_node_prop .format (job_status ,exp_msg ))
36+ graph_obj .query ("""MERGE(d:Document {fileName :$fName}) SET d.status = $status, d.errorMessage = $error_msg""" ,
37+ {"fName" :file_name , "status" :job_status , "error_msg" :exp_msg })
3938 except Exception as e :
4039 error_message = str (e )
4140 logging .error (f"Error in updating document node status as failed: { error_message } " )
@@ -45,10 +44,15 @@ def create_source_node(graph_obj,file_name,file_size,file_type,source,model,url=
4544 try :
4645 current_time = datetime .now ()
4746 job_status = "New"
48- source_node = "fileName: '{}'"
49- update_node_prop = "SET d.fileSize = '{}', d.fileType = '{}' ,d.status = '{}',d.url='{}',d.awsAccessKeyId='{}',d.fileSource='{}', d.createdAt ='{}', d.updatedAt = '{}', d.processingTime = '{}', d.errorMessage = '{}', d.nodeCount= {}, d.relationshipCount = {}, d.model= '{}'"
5047 logging .info ("create source node as file name if not exist" )
51- graph_obj .query ('MERGE(d:Document {' + source_node .format (file_name )+ '}) ' + update_node_prop .format (file_size ,file_type ,job_status ,url ,aws_access_key_id ,source ,current_time ,current_time ,0 ,'' ,0 ,0 ,model ))
48+ graph_obj .query ("""MERGE(d:Document {fileName :$fn}) SET d.fileSize = $fs, d.fileType = $ft ,
49+ d.status = $st, d.url = $url, d.awsAccessKeyId = $awsacc_key_id,
50+ d.fileSource = $f_source, d.createdAt = $c_at, d.updatedAt = $u_at,
51+ d.processingTime = $pt, d.errorMessage = $e_message, d.nodeCount= $n_count,
52+ d.relationshipCount = $r_count, d.model= $model""" ,
53+ {"fn" :file_name , "fs" :file_size , "ft" :file_type , "st" :job_status , "url" :url ,
54+ "awsacc_key_id" :aws_access_key_id , "f_source" :source , "c_at" :current_time ,
55+ "u_at" :current_time , "pt" :0 , "e_message" :'' , "n_count" :0 , "r_count" :0 , "model" :model })
5256 except Exception as e :
5357 error_message = str (e )
5458 update_exception_db (graph_obj ,file_name ,error_message )
@@ -73,10 +77,6 @@ def create_source_node_graph_local_file(uri, userName, password, file, model, db
7377 file_size = file .size
7478 file_name = file .filename
7579 source = 'local file'
76- # if db_name is not None:
77- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
78- # else:
79- # graph = Neo4jGraph(url=uri, username=userName, password=password)
8080 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
8181 create_source_node (graph ,file_name ,file_size ,file_type ,source ,model )
8282 return create_api_response ("Success" ,message = "Source Node created successfully" ,file_source = source )
@@ -177,10 +177,6 @@ def create_source_node_graph_url(uri, userName, password, source_url ,model, db_
177177 """
178178 try :
179179 source_type ,youtube_url = check_url_source (source_url )
180- # if db_name is not None:
181- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
182- # else:
183- # graph = Neo4jGraph(url=uri, username=userName, password=password)
184180 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
185181 logging .info (f"source type URL:{ source_type } " )
186182 if source_type == "s3 bucket" :
@@ -216,7 +212,6 @@ def create_source_node_graph_url(uri, userName, password, source_url ,model, db_
216212 return create_api_response ("Success" ,message = "Source Node created successfully" ,success_count = success_count ,Failed_count = Failed_count ,file_source = 's3 bucket' ,file_name = lst_s3_file_name )
217213 elif source_type == 'youtube' :
218214 source_url = youtube_url
219- # match = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11})", source_url)
220215 match = re .search (r'(?:v=)([0-9A-Za-z_-]{11})\s*' ,source_url )
221216 logging .info (f"match value{ match } " )
222217 file_name = YouTube (source_url ).title
@@ -266,8 +261,6 @@ def file_into_chunks(pages: List[Document]):
266261 logging .info ("Split file into smaller chunks" )
267262 text_splitter = TokenTextSplitter (chunk_size = 200 , chunk_overlap = 20 )
268263 chunks = text_splitter .split_documents (pages )
269- # print('Before chunks',len(chunks))
270- #chunks=chunks[:10]
271264 return chunks
272265
273266def get_s3_pdf_content (s3_url ,aws_access_key_id = None ,aws_secret_access_key = None ):
@@ -324,14 +317,9 @@ def extract_graph_from_file(uri, userName, password, model, db_name=None, file=N
324317 Json response to API with fileName, nodeCount, relationshipCount, processingTime,
325318 status and model as attributes.
326319 """
327- # logging.info(f"extract_graph_from_file called for file:{file.filename}")
328320 try :
329321 start_time = datetime .now ()
330322 file_name = ''
331- # if db_name is not None:
332- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
333- # else:
334- # graph = Neo4jGraph(url=uri, username=userName, password=password)
335323 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
336324 source_node = "fileName: '{}'"
337325
@@ -418,7 +406,15 @@ def extract_graph_from_file(uri, userName, password, model, db_name=None, file=N
418406 job_status = "Completed"
419407 error_message = ""
420408 logging .info ("Update source node properties" )
421- graph .query ('MERGE(d:Document {' + source_node .format (file_key .split ('/' )[- 1 ])+ '}) ' + update_node_prop .format (start_time ,end_time ,round (processed_time .total_seconds (),2 ),job_status ,error_message ,nodes_created ,relationships_created ,model ))
409+ graph .query ("""MERGE(d:Document {fileName :$fn}) SET d.status = $st, d.createdAt = $c_at,
410+ d.updatedAt = $u_at, d.processingTime = $pt, d.nodeCount= $n_count,
411+ d.relationshipCount = $r_count, d.model= $model
412+ """ ,
413+ {"fn" :file_key .split ('/' )[- 1 ], "st" :job_status , "c_at" :start_time ,
414+ "u_at" :end_time , "pt" :round (processed_time .total_seconds (),2 ), "e_message" :'' ,
415+ "n_count" :nodes_created , "r_count" :relationships_created , "model" :model
416+ }
417+ )
422418
423419 output = {
424420 "fileName" : file_name ,
@@ -502,12 +498,6 @@ def get_source_list_from_graph(uri,userName,password,db_name=None):
502498 """
503499 logging .info ("Get existing files list from graph" )
504500 try :
505- # if len(db_name)!=0:
506- # logging.info(f"Fetching source list from, database = {db_name}")
507- # graph = Neo4jGraph(url=uri, database=db_name, username=userName, password=password)
508- # else:
509- # logging.info(f"Fetching source list from default database (neo4j)")
510- # graph = Neo4jGraph(url=uri, username=userName, password=password)
511501 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
512502 query = "MATCH(d:Document) RETURN d ORDER BY d.updatedAt DESC"
513503 result = graph .query (query )
@@ -526,12 +516,14 @@ def update_graph(uri,userName,password,db_name):
526516 """
527517 try :
528518 knn_min_score = os .environ .get ('KNN_MIN_SCORE' )
529-
530- query = "WHERE node <> c and score >= {} MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score"
531519 graph = Neo4jGraph (url = uri , database = db_name , username = userName , password = password )
532520 result = graph .query ("""MATCH (c:Chunk)
533- WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
534- CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score """ + query .format (knn_min_score ))
521+ WHERE c.embedding IS NOT NULL AND count { (c)-[:SIMILAR]-() } < 5
522+ CALL db.index.vector.queryNodes('vector', 6, c.embedding) yield node, score
523+ WHERE node <> c and score >= $score MERGE (c)-[rel:SIMILAR]-(node) SET rel.score = score
524+ """ ,
525+ {"score" :knn_min_score }
526+ )
535527 logging .info (f"result : { result } " )
536528 except Exception as e :
537529 error_message = str (e )
0 commit comments