Skip to content

Commit f414d24

Browse files
authored
Feat/iteration single run time (langgenius#10512)
1 parent 0c1307b commit f414d24

File tree

16 files changed

+101
-29
lines changed

16 files changed

+101
-29
lines changed

api/core/app/apps/workflow_app_runner.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -361,6 +361,7 @@ def _handle_event(self, workflow_entry: WorkflowEntry, event: GraphEngineEvent)
361361
node_run_index=workflow_entry.graph_engine.graph_runtime_state.node_run_steps,
362362
output=event.pre_iteration_output,
363363
parallel_mode_run_id=event.parallel_mode_run_id,
364+
duration=event.duration,
364365
)
365366
)
366367
elif isinstance(event, (IterationRunSucceededEvent | IterationRunFailedEvent)):

api/core/app/entities/queue_entities.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ class QueueIterationNextEvent(AppQueueEvent):
111111
"""iteratoin run in parallel mode run id"""
112112
node_run_index: int
113113
output: Optional[Any] = None # output for the current iteration
114+
duration: Optional[float] = None
114115

115116
@field_validator("output", mode="before")
116117
@classmethod
@@ -307,6 +308,8 @@ class QueueNodeSucceededEvent(AppQueueEvent):
307308
execution_metadata: Optional[dict[NodeRunMetadataKey, Any]] = None
308309

309310
error: Optional[str] = None
311+
"""single iteration duration map"""
312+
iteration_duration_map: Optional[dict[str, float]] = None
310313

311314

312315
class QueueNodeInIterationFailedEvent(AppQueueEvent):

api/core/app/entities/task_entities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -434,6 +434,7 @@ class Data(BaseModel):
434434
parallel_id: Optional[str] = None
435435
parallel_start_node_id: Optional[str] = None
436436
parallel_mode_run_id: Optional[str] = None
437+
duration: Optional[float] = None
437438

438439
event: StreamEvent = StreamEvent.ITERATION_NEXT
439440
workflow_run_id: str

api/core/app/task_pipeline/workflow_cycle_manage.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -624,6 +624,7 @@ def _workflow_iteration_next_to_stream_response(
624624
parallel_id=event.parallel_id,
625625
parallel_start_node_id=event.parallel_start_node_id,
626626
parallel_mode_run_id=event.parallel_mode_run_id,
627+
duration=event.duration,
627628
),
628629
)
629630

api/core/workflow/entities/node_entities.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ class NodeRunMetadataKey(str, Enum):
2424
PARENT_PARALLEL_ID = "parent_parallel_id"
2525
PARENT_PARALLEL_START_NODE_ID = "parent_parallel_start_node_id"
2626
PARALLEL_MODE_RUN_ID = "parallel_mode_run_id"
27+
ITERATION_DURATION_MAP = "iteration_duration_map" # single iteration duration if iteration node runs
2728

2829

2930
class NodeRunResult(BaseModel):

api/core/workflow/graph_engine/entities/event.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,7 @@ class IterationRunStartedEvent(BaseIterationEvent):
148148
class IterationRunNextEvent(BaseIterationEvent):
149149
index: int = Field(..., description="index")
150150
pre_iteration_output: Optional[Any] = Field(None, description="pre iteration output")
151+
duration: Optional[float] = Field(None, description="duration")
151152

152153

153154
class IterationRunSucceededEvent(BaseIterationEvent):
@@ -156,6 +157,7 @@ class IterationRunSucceededEvent(BaseIterationEvent):
156157
outputs: Optional[dict[str, Any]] = None
157158
metadata: Optional[dict[str, Any]] = None
158159
steps: int = 0
160+
iteration_duration_map: Optional[dict[str, float]] = None
159161

160162

161163
class IterationRunFailedEvent(BaseIterationEvent):

api/core/workflow/nodes/iteration/iteration_node.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
156156
index=0,
157157
pre_iteration_output=None,
158158
)
159+
iter_run_map: dict[str, float] = {}
159160
outputs: list[Any] = [None] * len(iterator_list_value)
160161
try:
161162
if self.node_data.is_parallel:
@@ -175,6 +176,7 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
175176
iteration_graph,
176177
index,
177178
item,
179+
iter_run_map,
178180
)
179181
future.add_done_callback(thread_pool.task_done_callback)
180182
futures.append(future)
@@ -213,6 +215,7 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
213215
start_at,
214216
graph_engine,
215217
iteration_graph,
218+
iter_run_map,
216219
)
217220
if self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
218221
outputs = [output for output in outputs if output is not None]
@@ -230,7 +233,9 @@ def _run(self) -> Generator[NodeEvent | InNodeEvent, None, None]:
230233

231234
yield RunCompletedEvent(
232235
run_result=NodeRunResult(
233-
status=WorkflowNodeExecutionStatus.SUCCEEDED, outputs={"output": jsonable_encoder(outputs)}
236+
status=WorkflowNodeExecutionStatus.SUCCEEDED,
237+
outputs={"output": jsonable_encoder(outputs)},
238+
metadata={NodeRunMetadataKey.ITERATION_DURATION_MAP: iter_run_map},
234239
)
235240
)
236241
except IterationNodeError as e:
@@ -356,15 +361,19 @@ def _run_single_iter(
356361
start_at: datetime,
357362
graph_engine: "GraphEngine",
358363
iteration_graph: Graph,
364+
iter_run_map: dict[str, float],
359365
parallel_mode_run_id: Optional[str] = None,
360366
) -> Generator[NodeEvent | InNodeEvent, None, None]:
361367
"""
362368
run single iteration
363369
"""
370+
iter_start_at = datetime.now(timezone.utc).replace(tzinfo=None)
371+
364372
try:
365373
rst = graph_engine.run()
366374
# get current iteration index
367375
current_index = variable_pool.get([self.node_id, "index"]).value
376+
iteration_run_id = parallel_mode_run_id if parallel_mode_run_id is not None else f"{current_index}"
368377
next_index = int(current_index) + 1
369378

370379
if current_index is None:
@@ -431,6 +440,8 @@ def _run_single_iter(
431440
variable_pool.add([self.node_id, "index"], next_index)
432441
if next_index < len(iterator_list_value):
433442
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
443+
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
444+
iter_run_map[iteration_run_id] = duration
434445
yield IterationRunNextEvent(
435446
iteration_id=self.id,
436447
iteration_node_id=self.node_id,
@@ -439,6 +450,7 @@ def _run_single_iter(
439450
index=next_index,
440451
parallel_mode_run_id=parallel_mode_run_id,
441452
pre_iteration_output=None,
453+
duration=duration,
442454
)
443455
return
444456
elif self.node_data.error_handle_mode == ErrorHandleMode.REMOVE_ABNORMAL_OUTPUT:
@@ -449,6 +461,8 @@ def _run_single_iter(
449461

450462
if next_index < len(iterator_list_value):
451463
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
464+
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
465+
iter_run_map[iteration_run_id] = duration
452466
yield IterationRunNextEvent(
453467
iteration_id=self.id,
454468
iteration_node_id=self.node_id,
@@ -457,6 +471,7 @@ def _run_single_iter(
457471
index=next_index,
458472
parallel_mode_run_id=parallel_mode_run_id,
459473
pre_iteration_output=None,
474+
duration=duration,
460475
)
461476
return
462477
elif self.node_data.error_handle_mode == ErrorHandleMode.TERMINATED:
@@ -485,6 +500,8 @@ def _run_single_iter(
485500

486501
if next_index < len(iterator_list_value):
487502
variable_pool.add([self.node_id, "item"], iterator_list_value[next_index])
503+
duration = (datetime.now(timezone.utc).replace(tzinfo=None) - iter_start_at).total_seconds()
504+
iter_run_map[iteration_run_id] = duration
488505
yield IterationRunNextEvent(
489506
iteration_id=self.id,
490507
iteration_node_id=self.node_id,
@@ -493,6 +510,7 @@ def _run_single_iter(
493510
index=next_index,
494511
parallel_mode_run_id=parallel_mode_run_id,
495512
pre_iteration_output=jsonable_encoder(current_iteration_output) if current_iteration_output else None,
513+
duration=duration,
496514
)
497515

498516
except IterationNodeError as e:
@@ -528,6 +546,7 @@ def _run_single_iter_parallel(
528546
iteration_graph: Graph,
529547
index: int,
530548
item: Any,
549+
iter_run_map: dict[str, float],
531550
) -> Generator[NodeEvent | InNodeEvent, None, None]:
532551
"""
533552
run single iteration in parallel mode
@@ -546,6 +565,7 @@ def _run_single_iter_parallel(
546565
start_at=start_at,
547566
graph_engine=graph_engine_copy,
548567
iteration_graph=iteration_graph,
568+
iter_run_map=iter_run_map,
549569
parallel_mode_run_id=parallel_mode_run_id,
550570
):
551571
q.put(event)

web/app/components/workflow/hooks/use-workflow-run.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,6 +445,7 @@ export const useWorkflowRun = () => {
445445
...data,
446446
status: NodeRunningStatus.Running,
447447
details: [],
448+
iterDurationMap: {},
448449
} as any)
449450
}))
450451

@@ -496,6 +497,8 @@ export const useWorkflowRun = () => {
496497
setWorkflowRunningData(produce(workflowRunningData!, (draft) => {
497498
const iteration = draft.tracing!.find(trace => trace.node_id === data.node_id)
498499
if (iteration) {
500+
if (iteration.iterDurationMap && data.duration)
501+
iteration.iterDurationMap[data.parallel_mode_run_id ?? `${data.index - 1}`] = data.duration
499502
if (iteration.details!.length >= iteration.metadata.iterator_length!)
500503
return
501504
}

web/app/components/workflow/nodes/_base/node.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -193,7 +193,7 @@ const BaseNode: FC<BaseNodeProps> = ({
193193
{
194194
data._iterationLength && data._iterationIndex && data._runningStatus === NodeRunningStatus.Running && (
195195
<div className='mr-1.5 text-xs font-medium text-primary-600'>
196-
{data._iterationIndex}/{data._iterationLength}
196+
{data._iterationIndex > data._iterationLength ? data._iterationLength : data._iterationIndex}/{data._iterationLength}
197197
</div>
198198
)
199199
}

web/app/components/workflow/panel/workflow-preview.tsx

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import IterationResultPanel from '../run/iteration-result-panel'
2828
import InputsPanel from './inputs-panel'
2929
import cn from '@/utils/classnames'
3030
import Loading from '@/app/components/base/loading'
31-
import type { NodeTracing } from '@/types/workflow'
31+
import type { IterationDurationMap, NodeTracing } from '@/types/workflow'
3232

3333
const WorkflowPreview = () => {
3434
const { t } = useTranslation()
@@ -53,12 +53,14 @@ const WorkflowPreview = () => {
5353
}, [workflowRunningData])
5454

5555
const [iterationRunResult, setIterationRunResult] = useState<NodeTracing[][]>([])
56+
const [iterDurationMap, setIterDurationMap] = useState<IterationDurationMap>({})
5657
const [isShowIterationDetail, {
5758
setTrue: doShowIterationDetail,
5859
setFalse: doHideIterationDetail,
5960
}] = useBoolean(false)
6061

61-
const handleShowIterationDetail = useCallback((detail: NodeTracing[][]) => {
62+
const handleShowIterationDetail = useCallback((detail: NodeTracing[][], iterationDurationMap: IterationDurationMap) => {
63+
setIterDurationMap(iterationDurationMap)
6264
setIterationRunResult(detail)
6365
doShowIterationDetail()
6466
}, [doShowIterationDetail])
@@ -72,6 +74,7 @@ const WorkflowPreview = () => {
7274
list={iterationRunResult}
7375
onHide={doHideIterationDetail}
7476
onBack={doHideIterationDetail}
77+
iterDurationMap={iterDurationMap}
7578
/>
7679
</div>
7780
)
@@ -94,6 +97,7 @@ const WorkflowPreview = () => {
9497
list={iterationRunResult}
9598
onHide={doHideIterationDetail}
9699
onBack={doHideIterationDetail}
100+
iterDurationMap={iterDurationMap}
97101
/>
98102
)
99103
: (

0 commit comments

Comments
 (0)