@@ -123,7 +123,8 @@ async def _run_pipeline(
123123 last_workflow = "<startup>"
124124
125125     try:
126-           await _dump_json(context)
126+           await _dump_stats_json(context)
127+           await _dump_context_json(context)
127128
128129         logger.info("Executing pipeline...")
129130         for name, workflow_function in pipeline.run():
@@ -138,13 +139,15 @@ async def _run_pipeline(
138139                 workflow=name, result=result.result, state=context.state, error=None
139140             )
140141             context.stats.workflows[name] = profiler.metrics
142+               await _dump_stats_json(context)
141143             if result.stop:
142144                 logger.info("Halting pipeline at workflow request")
143145                 break
144146
145147         context.stats.total_runtime = time.time() - start_time
146148         logger.info("Indexing pipeline complete.")
147-           await _dump_json(context)
149+           await _dump_stats_json(context)
150+           await _dump_context_json(context)
148151
149152     except Exception as e:
150153         logger.exception("error running workflow %s", last_workflow)
@@ -153,11 +156,15 @@ async def _run_pipeline(
153156         )
154157
155158
156-   async def _dump_json(context: PipelineRunContext) -> None:
157-       """Dump the stats and context state to the storage."""
159+   async def _dump_stats_json(context: PipelineRunContext) -> None:
160+       """Dump stats state to storage."""
158161     await context.output_storage.set(
159162         "stats.json", json.dumps(asdict(context.stats), indent=4, ensure_ascii=False)
160163     )
164+
165+
166+   async def _dump_context_json(context: PipelineRunContext) -> None:
167+       """Dump context state to storage."""
161168     # Dump context state, excluding additional_context
162169     temp_context = context.state.pop(
163170         "additional_context", None