@@ -102,6 +102,23 @@ async def setup_indexing_pipeline(
             detail=f"Data container '{storage_name}' does not exist.",
         )
 
+    # check for prompts
+    entity_extraction_prompt_content = (
+        entity_extraction_prompt.file.read().decode("utf-8")
+        if entity_extraction_prompt
+        else None
+    )
+    community_report_prompt_content = (
+        community_report_prompt.file.read().decode("utf-8")
+        if community_report_prompt
+        else None
+    )
+    summarize_descriptions_prompt_content = (
+        summarize_descriptions_prompt.file.read().decode("utf-8")
+        if summarize_descriptions_prompt
+        else None
+    )
+
     # check for existing index job
     # it is okay if job doesn't exist, but if it does,
     # it must not be scheduled or running
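The prompt uploads are now read once, up front, so both branches of the job check below (resetting an existing job and creating a new one) can reuse the decoded content. A minimal sketch of the read pattern in isolation, assuming the prompts arrive as optional FastAPI UploadFile form fields (the endpoint signature sits outside this hunk):

    from typing import Optional

    from fastapi import FastAPI, UploadFile

    app = FastAPI()

    @app.post("/index")
    async def setup_indexing_pipeline(
        entity_extraction_prompt: Optional[UploadFile] = None,
    ):
        # UploadFile.file is a file-like object whose read() returns bytes,
        # hence the explicit UTF-8 decode; None means "use the default prompt"
        entity_extraction_prompt_content = (
            entity_extraction_prompt.file.read().decode("utf-8")
            if entity_extraction_prompt
            else None
        )
        return {"custom_prompt": entity_extraction_prompt_content is not None}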
@@ -117,46 +134,29 @@ async def setup_indexing_pipeline(
         # if indexing job is in a failed state, delete the associated K8s job and pod to allow for a new job to be scheduled
         if PipelineJobState(existing_job.status) == PipelineJobState.FAILED:
             _delete_k8s_job(f"indexing-job-{sanitized_index_name}", "graphrag")
-
-        # reset the job to scheduled state
-        existing_job.status = PipelineJobState.SCHEDULED
-        existing_job.percent_complete = 0
-        existing_job.progress = ""
-        existing_job.all_workflows = existing_job.completed_workflows = (
-            existing_job.failed_workflows
+        # reset the pipeline job details
+        existing_job._status = PipelineJobState.SCHEDULED
+        existing_job._percent_complete = 0
+        existing_job._progress = ""
+        existing_job._all_workflows = existing_job._completed_workflows = (
+            existing_job._failed_workflows
         ) = []
-        existing_job.entity_extraction_prompt = None
-        existing_job.community_report_prompt = None
-        existing_job.summarize_descriptions_prompt = None
-
-    # create or update state in cosmos db
-    entity_extraction_prompt_content = (
-        entity_extraction_prompt.file.read().decode("utf-8")
-        if entity_extraction_prompt
-        else None
-    )
-    community_report_prompt_content = (
-        community_report_prompt.file.read().decode("utf-8")
-        if community_report_prompt
-        else None
-    )
-    summarize_descriptions_prompt_content = (
-        summarize_descriptions_prompt.file.read().decode("utf-8")
-        if summarize_descriptions_prompt
-        else None
-    )
-    print(f"ENTITY EXTRACTION PROMPT:\n{entity_extraction_prompt_content}")
-    print(f"COMMUNITY REPORT PROMPT:\n{community_report_prompt_content}")
-    print(f"SUMMARIZE DESCRIPTIONS PROMPT:\n{summarize_descriptions_prompt_content}")
-    pipelinejob.create_item(
-        id=sanitized_index_name,
-        index_name=sanitized_index_name,
-        storage_name=sanitized_storage_name,
-        entity_extraction_prompt=entity_extraction_prompt_content,
-        community_report_prompt=community_report_prompt_content,
-        summarize_descriptions_prompt=summarize_descriptions_prompt_content,
-        status=PipelineJobState.SCHEDULED,
-    )
+        existing_job._entity_extraction_prompt = entity_extraction_prompt_content
+        existing_job._community_report_prompt = community_report_prompt_content
+        existing_job._summarize_descriptions_prompt = (
+            summarize_descriptions_prompt_content
+        )
+        existing_job.update_db()
+    else:
+        pipelinejob.create_item(
+            id=sanitized_index_name,
+            index_name=sanitized_index_name,
+            storage_name=sanitized_storage_name,
+            entity_extraction_prompt=entity_extraction_prompt_content,
+            community_report_prompt=community_report_prompt_content,
+            summarize_descriptions_prompt=summarize_descriptions_prompt_content,
+            status=PipelineJobState.SCHEDULED,
+        )
 
     """
     At this point, we know:
@@ -167,7 +167,6 @@ async def setup_indexing_pipeline(
     # update or create new item in container-store in cosmosDB
     if not _blob_service_client.get_container_client(sanitized_index_name).exists():
         _blob_service_client.create_container(sanitized_index_name)
-
     container_store_client = get_database_container_client(
         database_name="graphrag", container_name="container-store"
     )
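The upsert into container-store happens just after this hunk. For orientation, a hedged sketch of what get_database_container_client and that upsert could look like with the azure-cosmos SDK; the authentication method and the item fields here are assumptions, not this repo's actual helper:

    import os

    from azure.cosmos import CosmosClient

    def get_database_container_client(database_name: str, container_name: str):
        # assumption: connection-string auth; the real helper may use
        # managed identity or a shared client instance instead
        client = CosmosClient.from_connection_string(
            os.environ["COSMOS_CONNECTION_STRING"]
        )
        return client.get_database_client(database_name).get_container_client(
            container_name
        )

    container_store_client = get_database_container_client(
        database_name="graphrag", container_name="container-store"
    )
    # hypothetical item shape for the index entry
    container_store_client.upsert_item({
        "id": "sanitized-index-name",
        "human_readable_name": "index-name",
        "type": "index",
    })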
@@ -221,9 +220,7 @@ async def setup_indexing_pipeline(
     )
 
 
-async def _start_indexing_pipeline(
-    index_name: str
-):
+async def _start_indexing_pipeline(index_name: str):
     # get sanitized name
     sanitized_index_name = sanitize_name(index_name)
 
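sanitize_name is defined elsewhere in the repo; its output is used both as an Azure Blob container name and as a Cosmos DB item id, so it must satisfy blob container naming rules (3-63 characters, lowercase letters, digits, and hyphens). A hypothetical implementation under that assumption, not the repo's actual helper:

    import hashlib
    import re

    def sanitize_name(name: str) -> str:
        # map anything outside [a-z0-9-] to a hyphen, then trim stray hyphens
        cleaned = re.sub(r"[^a-z0-9-]+", "-", name.lower()).strip("-")
        if len(cleaned) < 3:
            # pad deterministically so short names stay unique per input
            suffix = hashlib.sha256(name.encode()).hexdigest()[:8]
            cleaned = f"{cleaned}-{suffix}".strip("-")
        return cleaned[:63]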
@@ -265,41 +262,29 @@ async def _start_indexing_pipeline(
     )
 
     # set prompts for entity extraction, community report, and summarize descriptions.
-    # an environment variable is set to the file path of the prompt
     if pipeline_job.entity_extraction_prompt:
         fname = "entity-extraction-prompt.txt"
         with open(fname, "w") as outfile:
             outfile.write(pipeline_job.entity_extraction_prompt)
-        os.environ["GRAPHRAG_ENTITY_EXTRACTION_PROMPT_FILE"] = fname
-        # data["entity_extraction"]["prompt"] = fname
-    # else:
-    #     data["entity_extraction"]["prompt"] = None
+        data["entity_extraction"]["prompt"] = fname
+    else:
+        data.pop("entity_extraction")
     if pipeline_job.community_report_prompt:
         fname = "community-report-prompt.txt"
         with open(fname, "w") as outfile:
             outfile.write(pipeline_job.community_report_prompt)
-        os.environ["GRAPHRAG_COMMUNITY_REPORT_PROMPT_FILE"] = fname
-        # data["community_reports"]["prompt"] = fname
-    # else:
-    #     data["community_reports"]["prompt"] = None
+        data["community_reports"]["prompt"] = fname
+    else:
+        data.pop("community_reports")
     if pipeline_job.summarize_descriptions_prompt:
         fname = "summarize-descriptions-prompt.txt"
         with open(fname, "w") as outfile:
             outfile.write(pipeline_job.summarize_descriptions_prompt)
-        os.environ["GRAPHRAG_SUMMARIZE_DESCRIPTIONS_PROMPT_FILE"] = fname
-        # data["summarize_descriptions"]["prompt"] = fname
-    # else:
-    #     data["summarize_descriptions"]["prompt"] = None
-
-    # set placeholder values to None if they have not been set
-    # if data["entity_extraction"]["prompt"] == "PLACEHOLDER":
-    #     data["entity_extraction"]["prompt"] = None
-    # if data["community_reports"]["prompt"] == "PLACEHOLDER":
-    #     data["community_reports"]["prompt"] = None
-    # if data["summarize_descriptions"]["prompt"] == "PLACEHOLDER":
-    #     data["summarize_descriptions"]["prompt"] = None
-
-    # generate the default pipeline from default parameters and override with custom settings
+        data["summarize_descriptions"]["prompt"] = fname
+    else:
+        data.pop("summarize_descriptions")
+
+    # generate the default pipeline and override with custom settings
     parameters = create_graphrag_config(data, ".")
     pipeline_config = create_pipeline_config(parameters, True)
 
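Note the design choice above: when no custom prompt was uploaded, the whole section is popped from data rather than having its prompt set to None, so create_graphrag_config falls back to its built-in default for that step. Assuming data is the settings mapping loaded earlier in this function (not shown in the hunk), the patching looks roughly like:

    # hypothetical shape of the settings mapping patched above
    data = {
        "entity_extraction": {"prompt": "PLACEHOLDER"},
        "community_reports": {"prompt": "PLACEHOLDER"},
        "summarize_descriptions": {"prompt": "PLACEHOLDER"},
    }

    # custom prompt uploaded: point the section at the file written to disk
    data["entity_extraction"]["prompt"] = "entity-extraction-prompt.txt"

    # no custom prompt: drop the section so create_graphrag_config applies
    # its own defaults instead of reading the placeholder
    data.pop("community_reports")

One caveat: a bare pop raises KeyError if the section is ever absent from the settings file; data.pop(key, None) would make the fallback unconditional.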
@@ -316,11 +301,6 @@ async def _start_indexing_pipeline(
         PipelineJobWorkflowCallbacks(pipeline_job)
     )
 
-    # print("#################### PIPELINE JOB:")
-    # pprint(pipeline_job.dump_model())
-    print("#################### PIPELINE CONFIG:")
-    print(pipeline_config)
-
     # run the pipeline
     try:
         async for workflow_result in run_pipeline_with_config(
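The loop body is cut off by the hunk boundary. In graphrag, run_pipeline_with_config yields one PipelineRunResult per workflow, so the body presumably inspects each result; a hedged sketch of that consumption pattern (the raise is an assumption about this repo's error handling, not its actual code):

    # sketch of consuming the async generator above
    async for workflow_result in run_pipeline_with_config(pipeline_config):
        if workflow_result.errors:
            # surface the first failing workflow; the actual handler
            # likely marks the pipeline job as FAILED instead
            raise RuntimeError(
                f"workflow '{workflow_result.workflow}' failed: "
                f"{workflow_result.errors[0]}"
            )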