44import asyncio
55import inspect
66import os
7+ import traceback
78from typing import cast
89
910import yaml
@@ -98,7 +99,7 @@ async def setup_indexing_pipeline(
9899 if not _blob_service_client .get_container_client (sanitized_storage_name ).exists ():
99100 raise HTTPException (
100101 status_code = 500 ,
101- detail = f"Data container ' { storage_name } ' does not exist. " ,
102+ detail = f"Storage blob container { storage_name } does not exist" ,
102103 )
103104
104105 # check for prompts
@@ -202,7 +203,7 @@ async def setup_indexing_pipeline(
202203 status_code = 500 ,
203204 detail = "exception when calling BatchV1Api->create_namespaced_job" ,
204205 )
205- return BaseResponse (status = "indexing operation has been scheduled. " )
206+ return BaseResponse (status = "Indexing operation scheduled" )
206207 except Exception :
207208 reporter = ReporterSingleton ().get_instance ()
208209 job_details = {
@@ -215,7 +216,7 @@ async def setup_indexing_pipeline(
215216 )
216217 raise HTTPException (
217218 status_code = 500 ,
218- detail = f"Error occurred during setup of indexing job for ' { index_name } '. " ,
219+ detail = f"Error occurred during setup of indexing job for index { index_name } " ,
219220 )
220221
221222
@@ -232,19 +233,6 @@ async def _start_indexing_pipeline(index_name: str):
232233 # download nltk dependencies
233234 bootstrap ()
234235
235- # create new reporters/callbacks just for this job
236- reporters = []
237- reporter_names = os .getenv ("REPORTERS" , Reporters .CONSOLE .name .upper ()).split ("," )
238- for reporter_name in reporter_names :
239- try :
240- reporters .append (Reporters [reporter_name .upper ()])
241- except KeyError :
242- raise ValueError (f"Found unknown reporter: { reporter_name } " )
243-
244- workflow_callbacks = load_pipeline_reporter (
245- reporting_dir = sanitized_index_name , reporters = reporters
246- )
247-
248236 # load custom pipeline settings
249237 this_directory = os .path .dirname (
250238 os .path .abspath (inspect .getfile (inspect .currentframe ()))
@@ -295,7 +283,22 @@ async def _start_indexing_pipeline(index_name: str):
295283 for workflow in pipeline_config .workflows :
296284 pipeline_job .all_workflows .append (workflow .name )
297285
298- # add pipeline_job callback to the callback manager
286+ # create new reporters/callbacks just for this job
287+ reporters = []
288+ reporter_names = os .getenv ("REPORTERS" , Reporters .CONSOLE .name .upper ()).split ("," )
289+ for reporter_name in reporter_names :
290+ try :
291+ reporters .append (Reporters [reporter_name .upper ()])
292+ except KeyError :
293+ raise ValueError (f"Unknown reporter type: { reporter_name } " )
294+ workflow_callbacks = load_pipeline_reporter (
295+ index_name = index_name ,
296+ num_workflow_steps = len (pipeline_job .all_workflows ),
297+ reporting_dir = sanitized_index_name ,
298+ reporters = reporters ,
299+ )
300+
301+ # add pipeline job callback to the callback manager
299302 cast (WorkflowCallbacksManager , workflow_callbacks ).register (
300303 PipelineJobWorkflowCallbacks (pipeline_job )
301304 )
@@ -312,6 +315,7 @@ async def _start_indexing_pipeline(index_name: str):
312315 # if the workflow failed, record the failure
313316 pipeline_job .failed_workflows .append (workflow_result .workflow )
314317 pipeline_job .update_db ()
318+ # TODO: exit early if a workflow fails and add more detailed error logging
315319
316320 # if job is done, check if any workflow steps failed
317321 if len (pipeline_job .failed_workflows ) > 0 :
@@ -327,38 +331,43 @@ async def _start_indexing_pipeline(index_name: str):
327331 )
328332
329333 workflow_callbacks .on_log (
330- f"Index Name: { index_name } , Container Name: { storage_name } \n " ,
331- details = {"status_message" : "Indexing pipeline complete." },
334+ message = f"Indexing pipeline complete for index { index_name } ." ,
335+ details = {
336+ "index" : index_name ,
337+ "storage_name" : storage_name ,
338+ "status_message" : "indexing pipeline complete" ,
339+ },
332340 )
333341
334342 del workflow_callbacks # garbage collect
335343 if pipeline_job .status == PipelineJobState .FAILED :
336344 exit (1 ) # signal to AKS that indexing job failed
337345
338- except Exception :
346+ except Exception as e :
339347 pipeline_job .status = PipelineJobState .FAILED
340348
341349 # update failed state in cosmos db
342350 error_details = {
343- "error_message" : "Indexing pipeline failed." ,
351+ "index" : index_name ,
352+ "storage_name" : storage_name ,
344353 }
345354 # log error in local index directory logs
346355 workflow_callbacks .on_error (
347- message = f"Index Name: { index_name } , Container Name: { storage_name } \n " ,
348- cause = None ,
349- stack = None ,
356+ message = f"Indexing pipeline failed for index { index_name } . " ,
357+ cause = e ,
358+ stack = traceback . format_exc () ,
350359 details = error_details ,
351360 )
352361 # log error in global index directory logs
353362 reporter .on_error (
354- f"Index Name: { index_name } , Container Name: { storage_name } \n { str ( e ) } \n " ,
355- cause = str ( e ) ,
356- stack = None ,
363+ message = f"Indexing pipeline failed for index { index_name } . " ,
364+ cause = e ,
365+ stack = traceback . format_exc () ,
357366 details = error_details ,
358367 )
359368 raise HTTPException (
360369 status_code = 500 ,
361- detail = f"Error occurred during indexing job for index ' { index_name } ' ." ,
370+ detail = f"Error encountered during indexing job for index { index_name } ." ,
362371 )
363372
364373
@@ -437,8 +446,8 @@ def _delete_k8s_job(job_name: str, namespace: str) -> None:
437446 batch_v1 .delete_namespaced_job (name = job_name , namespace = namespace )
438447 except Exception :
439448 reporter .on_error (
440- f"Error deleting k8s job { job_name } ." ,
441- details = {"Container " : job_name },
449+ message = f"Error deleting k8s job { job_name } ." ,
450+ details = {"container " : job_name },
442451 )
443452 pass
444453 try :
@@ -448,8 +457,8 @@ def _delete_k8s_job(job_name: str, namespace: str) -> None:
448457 core_v1 .delete_namespaced_pod (job_pod , namespace = namespace )
449458 except Exception :
450459 reporter .on_error (
451- f"Error deleting k8s pod for job { job_name } ." ,
452- details = {"Container " : job_name },
460+ message = f"Error deleting k8s pod for job { job_name } ." ,
461+ details = {"container " : job_name },
453462 )
454463 pass
455464
0 commit comments