Skip to content

Commit 7db8649

Browse files
committed
optimize worker notifications
1 parent bf4835d commit 7db8649

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

backend/monoprocess/monoprocess.go

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -94,20 +94,20 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
9494
if err := b.Backend.CompleteWorkflowTask(ctx, task, instance, state, executedEvents, activityEvents, timerEvents, workflowEvents); err != nil {
9595
return err
9696
}
97+
98+
hasWorkflowTasks := false
99+
hasActivityTasks := false
100+
97101
for _, e := range executedEvents {
98-
if e.Type != history.EventType_WorkflowTaskStarted {
99-
continue
100-
}
101-
if !b.notifyWorkflowWorker(ctx) {
102-
break // no reason to notify more, queue is full
102+
if e.Type == history.EventType_WorkflowTaskStarted {
103+
hasWorkflowTasks = true
104+
break
103105
}
104106
}
105107
for _, e := range activityEvents {
106-
if e.Type != history.EventType_ActivityScheduled {
107-
continue
108-
}
109-
if !b.notifyActivityWorker(ctx) {
110-
break // no reason to notify more, queue is full
108+
if e.Type == history.EventType_ActivityScheduled {
109+
hasActivityTasks = true
110+
break
111111
}
112112
}
113113
for _, e := range timerEvents {
@@ -123,15 +123,21 @@ func (b *monoprocessBackend) CompleteWorkflowTask(
123123
time.AfterFunc(attr.At.Sub(time.Now()), func() { b.notifyWorkflowWorker(context.Background()) })
124124
}
125125
for _, e := range workflowEvents {
126-
if e.HistoryEvent.Type != history.EventType_WorkflowExecutionStarted &&
127-
e.HistoryEvent.Type != history.EventType_SubWorkflowCompleted &&
128-
e.HistoryEvent.Type != history.EventType_WorkflowExecutionCanceled {
129-
continue
130-
}
131-
if !b.notifyWorkflowWorker(ctx) {
132-
break // no reason to notify more, queue is full
126+
if e.HistoryEvent.Type == history.EventType_WorkflowExecutionStarted ||
127+
e.HistoryEvent.Type == history.EventType_SubWorkflowCompleted ||
128+
e.HistoryEvent.Type == history.EventType_WorkflowExecutionCanceled {
129+
hasWorkflowTasks = true
130+
break
133131
}
134132
}
133+
134+
// notify workers about potential new tasks
135+
if hasWorkflowTasks {
136+
b.notifyWorkflowWorker(ctx)
137+
}
138+
if hasActivityTasks {
139+
b.notifyActivityWorker(ctx)
140+
}
135141
return nil
136142
}
137143

0 commit comments

Comments
 (0)