Skip to content

Commit f1adbdf

Browse files
fix: workflow tool in react agent resume once in one agent run
1 parent 6fa2acf commit f1adbdf

File tree

5 files changed

+21
-5
lines changed

5 files changed

+21
-5
lines changed

backend/domain/workflow/entity/interrupt_event.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ type InterruptEvent struct {
3535
NodeIcon string `json:"node_icon,omitempty"`
3636
EventType InterruptEventType `json:"event_type"`
3737
NodePath []string `json:"node_path,omitempty"`
38+
Popped bool `json:"popped,omitempty"`
3839

3940
// index within composite node -> interrupt info for that index
4041
// TODO: separate the following fields with InterruptEvent
@@ -60,6 +61,7 @@ type ResumeRequest struct {
6061
ExecuteID int64
6162
EventID int64
6263
ResumeData string
64+
Resumed bool
6365
}
6466

6567
func (r *ResumeRequest) GetResumeID() string {

backend/domain/workflow/internal/compose/workflow_tool.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,16 @@ func (i *invokableWorkflow) Info(_ context.Context) (*schema.ToolInfo, error) {
6767
return i.info, nil
6868
}
6969

70+
func resumeOnce(rInfo *entity.ResumeRequest, callID string, allIEs map[string]*entity.ToolInterruptEvent) {
71+
if rInfo != nil {
72+
rInfo.Resumed = true
73+
}
74+
75+
if allIEs != nil {
76+
delete(allIEs, callID)
77+
}
78+
}
79+
7080
func (i *invokableWorkflow) InvokableRun(ctx context.Context, argumentsInJSON string, opts ...tool.Option) (string, error) {
7181
rInfo, allIEs := execute.GetResumeRequest(opts...)
7282
var (
@@ -88,9 +98,10 @@ func (i *invokableWorkflow) InvokableRun(ctx context.Context, argumentsInJSON st
8898
}
8999

90100
cfg := execute.GetExecuteConfig(opts...)
101+
defer resumeOnce(rInfo, callID, allIEs)
91102

92103
var runOpts []WorkflowRunnerOption
93-
if rInfo != nil {
104+
if rInfo != nil && !rInfo.Resumed {
94105
runOpts = append(runOpts, WithResumeReq(rInfo))
95106
} else {
96107
runOpts = append(runOpts, WithInput(argumentsInJSON))
@@ -237,9 +248,10 @@ func (s *streamableWorkflow) StreamableRun(ctx context.Context, argumentsInJSON
237248
}
238249

239250
cfg := execute.GetExecuteConfig(opts...)
251+
defer resumeOnce(rInfo, callID, allIEs)
240252

241253
var runOpts []WorkflowRunnerOption
242-
if rInfo != nil {
254+
if rInfo != nil && !rInfo.Resumed {
243255
runOpts = append(runOpts, WithResumeReq(rInfo))
244256
} else {
245257
runOpts = append(runOpts, WithInput(argumentsInJSON))

backend/domain/workflow/internal/execute/context.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ import (
3636
)
3737

3838
type Context struct {
39-
RootCtx
39+
*RootCtx
4040

4141
*SubWorkflowCtx
4242

@@ -226,7 +226,7 @@ func PrepareRootExeCtx(ctx context.Context, h *WorkflowHandler) (context.Context
226226
}
227227

228228
rootExeCtx := &Context{
229-
RootCtx: RootCtx{
229+
RootCtx: &RootCtx{
230230
RootWorkflowBasic: h.rootWorkflowBasic,
231231
RootExecuteID: h.rootExecuteID,
232232
ResumeEvent: h.resumeEvent,

backend/domain/workflow/internal/execute/event_handle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -283,7 +283,7 @@ func handleEvent(ctx context.Context, event *Event, repo workflow.Repository,
283283
return noTerminate, fmt.Errorf("failed to update workflow execution to interrupted for execution id %d, current status is %v", exeID, currentStatus)
284284
}
285285

286-
if event.RootCtx.ResumeEvent != nil {
286+
if event.RootCtx.ResumeEvent != nil && !event.RootCtx.ResumeEvent.Popped {
287287
needPop := false
288288
for _, ie := range event.InterruptEvents {
289289
if ie.NodeKey == event.RootCtx.ResumeEvent.NodeKey {

backend/domain/workflow/internal/nodes/llm/llm.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -971,6 +971,8 @@ func (l *LLM) prepare(ctx context.Context, _ map[string]any, opts ...nodes.NodeO
971971
return ctx
972972
}
973973

974+
c.RootCtx.ResumeEvent.Popped = true
975+
974976
return ctx
975977
},
976978
}).Handler()

0 commit comments

Comments
 (0)