Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion service/history/api/create_workflow_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func NewWorkflowWithSignal(
signalWithStartRequest.GetSignalInput(),
signalWithStartRequest.GetIdentity(),
signalWithStartRequest.GetHeader(),
signalWithStartRequest.GetSkipGenerateWorkflowTask(),
signalWithStartRequest.GetLinks(),
); err != nil {
return nil, err
Expand Down
17 changes: 1 addition & 16 deletions service/history/api/respondworkflowtaskcompleted/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -472,16 +472,11 @@ func (handler *WorkflowTaskCompletedHandler) Invoke(
}
}

bufferedEventShouldCreateNewTask := hasBufferedEventsOrMessages && ms.HasAnyBufferedEvent(eventShouldGenerateNewTaskFilter)
if hasBufferedEventsOrMessages && !bufferedEventShouldCreateNewTask {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there is buffer events, we should still flush those events?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Buffered events are flushed when new WFT is scheduled. This code was just to flush them if new WFT will not be created.

// Make sure tasks that should not create a new event don't get stuck in ms forever
ms.FlushBufferedEvents()
}
newWorkflowTaskType := enumsspb.WORKFLOW_TASK_TYPE_UNSPECIFIED
if ms.IsWorkflowExecutionRunning() {
if request.GetForceCreateNewWorkflowTask() || // Heartbeat WT is always of Normal type.
wtFailedShouldCreateNewTask ||
bufferedEventShouldCreateNewTask ||
hasBufferedEventsOrMessages ||
activityNotStartedCancelled {

newWorkflowTaskType = enumsspb.WORKFLOW_TASK_TYPE_NORMAL
Expand Down Expand Up @@ -1039,13 +1034,3 @@ func (handler *WorkflowTaskCompletedHandler) clearStickyTaskQueue(ctx context.Co
}
return nil
}

// Filter function to be passed to mutable_state.HasAnyBufferedEvent
// Returns true if the event should generate a new workflow task
// Currently only signal events with SkipGenerateWorkflowTask=true flag set do not generate tasks
func eventShouldGenerateNewTaskFilter(event *historypb.HistoryEvent) bool {
if event.GetEventType() != enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED {
return true
}
return !event.GetWorkflowExecutionSignaledEventAttributes().GetSkipGenerateWorkflowTask()
}
Original file line number Diff line number Diff line change
Expand Up @@ -316,14 +316,13 @@ func signalWorkflow(
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
); err != nil {
return err
}

// Create a transfer task to schedule a workflow task
if !mutableState.HasPendingWorkflowTask() && !request.GetSkipGenerateWorkflowTask() {
if !mutableState.HasPendingWorkflowTask() {

executionInfo := mutableState.GetExecutionInfo()
executionState := mutableState.GetExecutionState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
s.currentMutableState,
)
request := s.randomRequest()
request.SkipGenerateWorkflowTask = false

s.currentMutableState.EXPECT().IsWorkflowCloseAttempted().Return(false)
s.currentMutableState.EXPECT().IsSignalRequested(request.GetRequestId()).Return(false)
Expand All @@ -166,7 +165,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NewWorkflowTask() {
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
).Return(&history.HistoryEvent{}, nil)
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(false)
Expand Down Expand Up @@ -200,7 +198,6 @@ func (s *signalWithStartWorkflowSuite) TestSignalWorkflow_NoNewWorkflowTask() {
request.GetSignalInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
request.GetLinks(),
).Return(&history.HistoryEvent{}, nil)
s.currentMutableState.EXPECT().HasPendingWorkflowTask().Return(true)
Expand Down
3 changes: 1 addition & 2 deletions service/history/api/signalworkflow/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func Invoke(
executionInfo := mutableState.GetExecutionInfo()

// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff() && !request.GetSkipGenerateWorkflowTask()
createWorkflowTask := !mutableState.IsWorkflowPendingOnWorkflowTaskBackoff()

if childWorkflowOnly {
parentWorkflowID := executionInfo.ParentWorkflowId
Expand All @@ -110,7 +110,6 @@ func Invoke(
request.GetInput(),
request.GetIdentity(),
request.GetHeader(),
request.GetSkipGenerateWorkflowTask(),
externalWorkflowExecution,
request.GetLinks(),
)
Expand Down
34 changes: 13 additions & 21 deletions service/history/history_engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1712,16 +1712,11 @@ func (s *engineSuite) TestRespondWorkflowTaskCompletedSingleActivityScheduledWor
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_SignalTaskGeneration() {
resp := s.testRespondWorkflowTaskCompletedSignalGeneration(false)
resp := s.testRespondWorkflowTaskCompletedSignalGeneration()
s.NotNil(resp.GetStartedResponse())
}

func (s *engineSuite) TestRespondWorkflowTaskCompleted_SkipSignalTaskGeneration() {
resp := s.testRespondWorkflowTaskCompletedSignalGeneration(true)
s.Nil(resp.GetStartedResponse())
}

func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration(skipGenerateTask bool) *historyservice.RespondWorkflowTaskCompletedResponse {
func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration() *historyservice.RespondWorkflowTaskCompletedResponse {
we := commonpb.WorkflowExecution{
WorkflowId: tests.WorkflowID,
RunId: tests.RunID,
Expand All @@ -1738,13 +1733,12 @@ func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration(skipGener
identity := "testIdentity"

signal := workflowservice.SignalWorkflowExecutionRequest{
Namespace: tests.NamespaceID.String(),
WorkflowExecution: &we,
Identity: identity,
SignalName: "test signal name",
Input: payloads.EncodeString("test input"),
SkipGenerateWorkflowTask: skipGenerateTask,
RequestId: uuid.New(),
Namespace: tests.NamespaceID.String(),
WorkflowExecution: &we,
Identity: identity,
SignalName: "test signal name",
Input: payloads.EncodeString("test input"),
RequestId: uuid.New(),
}
signalRequest := &historyservice.SignalWorkflowExecutionRequest{
NamespaceId: tests.NamespaceID.String(),
Expand All @@ -1765,13 +1759,11 @@ func (s *engineSuite) testRespondWorkflowTaskCompletedSignalGeneration(skipGener
_, err := s.historyEngine.SignalWorkflowExecution(context.Background(), signalRequest)
s.NoError(err)

if !skipGenerateTask {
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false).Return(searchattribute.TestNameTypeMap, nil)
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(tests.Namespace).Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(tests.NamespaceID).Return(tests.Namespace, nil)
s.mockVisibilityMgr.EXPECT().GetIndexName().Return(esIndexName).AnyTimes()
s.mockExecutionMgr.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{HistoryEvents: []*historypb.HistoryEvent{}}, nil)
}
s.mockSearchAttributesProvider.EXPECT().GetSearchAttributes(gomock.Any(), false).Return(searchattribute.TestNameTypeMap, nil)
s.mockSearchAttributesMapperProvider.EXPECT().GetMapper(tests.Namespace).Return(&searchattribute.TestMapper{Namespace: tests.Namespace.String()}, nil).AnyTimes()
s.mockNamespaceCache.EXPECT().GetNamespaceName(tests.NamespaceID).Return(tests.Namespace, nil)
s.mockVisibilityMgr.EXPECT().GetIndexName().Return(esIndexName).AnyTimes()
s.mockExecutionMgr.EXPECT().ReadHistoryBranch(gomock.Any(), gomock.Any()).Return(&persistence.ReadHistoryBranchResponse{HistoryEvents: []*historypb.HistoryEvent{}}, nil)

var commands []*commandpb.Command
resp, err := s.historyEngine.RespondWorkflowTaskCompleted(context.Background(), &historyservice.RespondWorkflowTaskCompletedRequest{
Expand Down
2 changes: 0 additions & 2 deletions service/history/historybuilder/event_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -778,7 +778,6 @@ func (b *EventFactory) CreateWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) *historypb.HistoryEvent {
Expand All @@ -789,7 +788,6 @@ func (b *EventFactory) CreateWorkflowExecutionSignaledEvent(
Input: input,
Identity: identity,
Header: header,
SkipGenerateWorkflowTask: skipGenerateWorkflowTask,
ExternalWorkflowExecution: externalWorkflowExecution,
},
}
Expand Down
2 changes: 0 additions & 2 deletions service/history/historybuilder/history_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,7 +668,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) *historypb.HistoryEvent {
Expand All @@ -677,7 +676,6 @@ func (b *HistoryBuilder) AddWorkflowExecutionSignaledEvent(
input,
identity,
header,
skipGenerateWorkflowTask,
externalWorkflowExecution,
links,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1459,7 +1459,6 @@ func (s *sutTestingAdapter) AddWorkflowExecutionSignaledEvent(_ ...eventConfig)
nil,
"identity-1",
nil,
false,
nil,
nil,
)
Expand Down
3 changes: 1 addition & 2 deletions service/history/historybuilder/history_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ func (s *historyBuilderSuite) TestWorkflowExecutionCancelRequested() {
func (s *historyBuilderSuite) TestWorkflowExecutionSignaled() {
signalName := "random signal name"
event := s.historyBuilder.AddWorkflowExecutionSignaledEvent(
signalName, testPayloads, testIdentity, testHeader, false, nil, nil,
signalName, testPayloads, testIdentity, testHeader, nil, nil,
)
s.Equal(event, s.flush())
s.Equal(&historypb.HistoryEvent{
Expand Down Expand Up @@ -2370,7 +2370,6 @@ func (s *historyBuilderSuite) TestBufferSize_Memory() {
&commonpb.Payloads{},
"identity",
&commonpb.Header{},
false,
nil,
nil,
)
Expand Down
16 changes: 0 additions & 16 deletions service/history/ndc/events_reapplier.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ package ndc
import (
"context"

enumspb "go.temporal.io/api/enums/v1"
historypb "go.temporal.io/api/history/v1"
"go.temporal.io/api/serviceerror"
enumsspb "go.temporal.io/server/api/enums/v1"
Expand Down Expand Up @@ -90,26 +89,11 @@ func (r *EventsReapplierImpl) ReapplyEvents(
return nil, nil
}

shouldScheduleWorkflowTask := false
for _, event := range reappliedEvents {
switch event.GetEventType() { //nolint:exhaustive
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_SIGNALED:
signal := event.GetWorkflowExecutionSignaledEventAttributes()
shouldScheduleWorkflowTask = shouldScheduleWorkflowTask || !signal.GetSkipGenerateWorkflowTask()
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED, enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ACCEPTED:
shouldScheduleWorkflowTask = true
}
}

// After reapply event, checking if we should schedule a workflow task
if ms.IsWorkflowPendingOnWorkflowTaskBackoff() {
// Do not create workflow task when the workflow has first workflow task backoff and execution is not started yet
return reappliedEvents, nil
}
if !shouldScheduleWorkflowTask {
// Do not create workflow task when all reapplied signals had SkipGenerateWorkflowTask=true flag set
return reappliedEvents, nil
}

if !ms.HasPendingWorkflowTask() {
if _, err := ms.AddWorkflowTaskScheduledEvent(
Expand Down
3 changes: 0 additions & 3 deletions service/history/ndc/events_reapplier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_AppliedEvent_Signal() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(event, nil)
msCurrent.EXPECT().HSM().Return(s.hsmNode).AnyTimes()
Expand Down Expand Up @@ -275,7 +274,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_PartialAppliedEvent() {
attr1.GetInput(),
attr1.GetIdentity(),
attr1.GetHeader(),
attr1.GetSkipGenerateWorkflowTask(),
event1.Links,
).Return(event1, nil)
msCurrent.EXPECT().IsWorkflowPendingOnWorkflowTaskBackoff().Return(true)
Expand Down Expand Up @@ -323,7 +321,6 @@ func (s *nDCEventReapplicationSuite) TestReapplyEvents_Error() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(nil, fmt.Errorf("test"))
dedupResource := definition.NewEventReappliedID(runID, event.GetEventId(), event.GetVersion())
Expand Down
1 change: 0 additions & 1 deletion service/history/ndc/workflow_resetter.go
Original file line number Diff line number Diff line change
Expand Up @@ -815,7 +815,6 @@ func reapplyEvents(
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
); err != nil {
return reappliedEvents, err
Expand Down
3 changes: 1 addition & 2 deletions service/history/ndc/workflow_resetter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -897,7 +897,6 @@ func (s *workflowResetterSuite) TestReapplyEvents() {
attr.GetInput(),
attr.GetIdentity(),
attr.GetHeader(),
attr.GetSkipGenerateWorkflowTask(),
event.Links,
).Return(&historypb.HistoryEvent{}, nil)
case enumspb.EVENT_TYPE_WORKFLOW_EXECUTION_UPDATE_ADMITTED:
Expand Down Expand Up @@ -971,7 +970,7 @@ func (s *workflowResetterSuite) TestReapplyEvents_Excludes() {
ms := workflow.NewMockMutableState(s.controller)
// Assert that none of these following methods are invoked.
arg := gomock.Any()
ms.EXPECT().AddWorkflowExecutionSignaled(arg, arg, arg, arg, arg, arg).Times(0)
ms.EXPECT().AddWorkflowExecutionSignaled(arg, arg, arg, arg, arg).Times(0)
ms.EXPECT().AddWorkflowExecutionUpdateAdmittedEvent(arg, arg).Times(0)
ms.EXPECT().AddHistoryEvent(arg, arg).Times(0)

Expand Down
2 changes: 0 additions & 2 deletions service/history/workflow/mutable_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,15 +198,13 @@ type (
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error)
AddWorkflowExecutionSignaledEvent(
signalName string,
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error)
Expand Down
4 changes: 0 additions & 4 deletions service/history/workflow/mutable_state_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -4261,15 +4261,13 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaled(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error) {
return ms.AddWorkflowExecutionSignaledEvent(
signalName,
input,
identity,
header,
skipGenerateWorkflowTask,
nil,
links,
)
Expand All @@ -4280,7 +4278,6 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaledEvent(
input *commonpb.Payloads,
identity string,
header *commonpb.Header,
skipGenerateWorkflowTask bool,
externalWorkflowExecution *commonpb.WorkflowExecution,
links []*commonpb.Link,
) (*historypb.HistoryEvent, error) {
Expand All @@ -4294,7 +4291,6 @@ func (ms *MutableStateImpl) AddWorkflowExecutionSignaledEvent(
input,
identity,
header,
skipGenerateWorkflowTask,
externalWorkflowExecution,
links,
)
Expand Down
2 changes: 0 additions & 2 deletions service/history/workflow/mutable_state_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1442,7 +1442,6 @@ func (s *mutableStateSuite) TestTotalEntitiesCount() {
&commonpb.Payloads{},
"identity",
&commonpb.Header{},
false,
nil,
)
s.NoError(err)
Expand Down Expand Up @@ -1915,7 +1914,6 @@ func (s *mutableStateSuite) TestCloseTransactionUpdateTransition() {
&commonpb.Payloads{},
"identity",
&commonpb.Header{},
false,
nil,
nil,
)
Expand Down
Loading