# Copyright (c) Microsoft Corporation.
# Licensed under the MIT License.

-import asyncio
import inspect
import os
import traceback
from time import time
-from typing import cast

+import graphrag.api as api
import yaml
from azure.identity import DefaultAzureCredential
from azure.search.documents.indexes import SearchIndexClient
-from datashaper import WorkflowCallbacksManager
from fastapi import (
    APIRouter,
    HTTPException,
@@ -193,21 +191,16 @@ async def _start_indexing_pipeline(index_name: str):
        f"{sanitized_index_name}_description_embedding"
    )

-    # set prompts for entity extraction, community report, and summarize descriptions.
+    # set prompt for entity extraction
    if pipeline_job.entity_extraction_prompt:
        fname = "entity-extraction-prompt.txt"
        with open(fname, "w") as outfile:
            outfile.write(pipeline_job.entity_extraction_prompt)
        data["entity_extraction"]["prompt"] = fname
    else:
        data.pop("entity_extraction")
-    if pipeline_job.community_report_prompt:
-        fname = "community-report-prompt.txt"
-        with open(fname, "w") as outfile:
-            outfile.write(pipeline_job.community_report_prompt)
-        data["community_reports"]["prompt"] = fname
-    else:
-        data.pop("community_reports")
+
+    # set prompt for summarize descriptions
    if pipeline_job.summarize_descriptions_prompt:
        fname = "summarize-descriptions-prompt.txt"
        with open(fname, "w") as outfile:
@@ -216,15 +209,24 @@ async def _start_indexing_pipeline(index_name: str):
    else:
        data.pop("summarize_descriptions")

-    # generate the default pipeline and override with custom settings
+    # set prompt for community report
+    if pipeline_job.community_report_prompt:
+        fname = "community-report-prompt.txt"
+        with open(fname, "w") as outfile:
+            outfile.write(pipeline_job.community_report_prompt)
+        data["community_reports"]["prompt"] = fname
+    else:
+        data.pop("community_reports")
+
+    # generate a default GraphRagConfig and override with custom settings
    parameters = create_graphrag_config(data, ".")
-    pipeline_config = create_pipeline_config(parameters, True)

    # reset pipeline job details
    pipeline_job.status = PipelineJobState.RUNNING
    pipeline_job.all_workflows = []
    pipeline_job.completed_workflows = []
    pipeline_job.failed_workflows = []
+    pipeline_config = create_pipeline_config(parameters)
    for workflow in pipeline_config.workflows:
        pipeline_job.all_workflows.append(workflow.name)

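Note: the three prompt blocks above all follow the same override-or-default pattern. A minimal sketch of that pattern, assuming only what the diff shows (`apply_prompt_override` is a hypothetical helper, not part of this PR; `data` is the settings dict handed to `create_graphrag_config`):

```python
# Hypothetical helper illustrating the pattern used for entity_extraction,
# summarize_descriptions, and community_reports above: a custom prompt is
# written to a local file and referenced by path in the settings dict;
# otherwise the section is dropped so graphrag falls back to its default.
def apply_prompt_override(data: dict, section: str, prompt: str | None, fname: str) -> None:
    if prompt:
        with open(fname, "w") as outfile:
            outfile.write(prompt)
        data[section]["prompt"] = fname
    else:
        data.pop(section)

# e.g. apply_prompt_override(
#     data, "entity_extraction",
#     pipeline_job.entity_extraction_prompt, "entity-extraction-prompt.txt",
# )
```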
@@ -243,49 +245,44 @@ async def _start_indexing_pipeline(index_name: str):
        reporters=loggers,
    )

-    # add pipeline job callback to the callback manager
-    cast(WorkflowCallbacksManager, workflow_callbacks).register(
-        PipelineJobWorkflowCallbacks(pipeline_job)
-    )
+    # add pipeline job callback to monitor job progress
+    pipeline_job_callback = PipelineJobWorkflowCallbacks(pipeline_job)

    # run the pipeline
    try:
-        # TODO refactor to use the new replacement for run_pipeline_with_config
-        from graphrag.index.run import run_pipeline_with_config
-        async for workflow_result in run_pipeline_with_config(
-            config_or_path=pipeline_config,
-            callbacks=workflow_callbacks,
-            progress_reporter=None,
-        ):
-            await asyncio.sleep(0)
-            if len(workflow_result.errors or []) > 0:
-                # if the workflow failed, record the failure
-                pipeline_job.failed_workflows.append(workflow_result.workflow)
-                pipeline_job.update_db()
-        # TODO: exit early if a workflow fails and add more detailed error logging
-
+        await api.build_index(
+            config=parameters,
+            callbacks=[workflow_callbacks, pipeline_job_callback],
+        )
        # if job is done, check if any workflow steps failed
        if len(pipeline_job.failed_workflows) > 0:
            pipeline_job.status = PipelineJobState.FAILED
+            workflow_callbacks.on_log(
+                message=f"Indexing pipeline encountered error for index '{index_name}'.",
+                details={
+                    "index": index_name,
+                    "storage_name": storage_name,
+                    "status_message": "indexing pipeline encountered error",
+                },
+            )
        else:
            # record the workflow completion
            pipeline_job.status = PipelineJobState.COMPLETE
            pipeline_job.percent_complete = 100
+            workflow_callbacks.on_log(
+                message=f"Indexing pipeline complete for index '{index_name}'.",
+                details={
+                    "index": index_name,
+                    "storage_name": storage_name,
+                    "status_message": "indexing pipeline complete",
+                },
+            )

        pipeline_job.progress = (
            f"{len(pipeline_job.completed_workflows)} out of "
            f"{len(pipeline_job.all_workflows)} workflows completed successfully."
        )

-        workflow_callbacks.on_log(
-            message=f"Indexing pipeline complete for index'{index_name}'.",
-            details={
-                "index": index_name,
-                "storage_name": storage_name,
-                "status_message": "indexing pipeline complete",
-            },
-        )
-
        del workflow_callbacks  # garbage collect
        if pipeline_job.status == PipelineJobState.FAILED:
            exit(1)  # signal to AKS that indexing job failed
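For reference, a minimal sketch of the new entry point in isolation, assuming only what this diff shows: `api.build_index` accepts the `GraphRagConfig` produced by `create_graphrag_config` plus a list of workflow-callback objects (the `run_index` wrapper below is hypothetical):

```python
import asyncio

import graphrag.api as api

async def run_index(parameters, workflow_callbacks, pipeline_job_callback):
    # build_index drives every workflow to completion; per this diff,
    # progress and failures surface through the registered callbacks,
    # which is why the old `async for workflow_result in
    # run_pipeline_with_config(...)` loop and its per-result error
    # bookkeeping could be deleted.
    await api.build_index(
        config=parameters,
        callbacks=[workflow_callbacks, pipeline_job_callback],
    )

# e.g. asyncio.run(run_index(parameters, workflow_callbacks, pipeline_job_callback))
```

One consequence of the change: the callbacks are passed as a plain list instead of being registered on a `WorkflowCallbacksManager`, which is why the `cast` and the `datashaper` import could both be dropped.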