Skip to content

Commit 910e707

Browse files
authored
Add option to dispatch activity tasks locally (#1029)
1 parent 2e11d9b commit 910e707

8 files changed

+553
-82
lines changed

internal/common/metrics/constants.go

Lines changed: 31 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,32 +51,37 @@ const (
5151
DecisionTaskCompletedCounter = CadenceMetricsPrefix + "decision-task-completed"
5252
DecisionTaskForceCompleted = CadenceMetricsPrefix + "decision-task-force-completed"
5353

54-
ActivityPollCounter = CadenceMetricsPrefix + "activity-poll-total"
55-
ActivityPollFailedCounter = CadenceMetricsPrefix + "activity-poll-failed"
56-
ActivityPollTransientFailedCounter = CadenceMetricsPrefix + "activity-poll-transient-failed"
57-
ActivityPollNoTaskCounter = CadenceMetricsPrefix + "activity-poll-no-task"
58-
ActivityPollSucceedCounter = CadenceMetricsPrefix + "activity-poll-succeed"
59-
ActivityPollLatency = CadenceMetricsPrefix + "activity-poll-latency"
60-
ActivityScheduledToStartLatency = CadenceMetricsPrefix + "activity-scheduled-to-start-latency"
61-
ActivityExecutionFailedCounter = CadenceMetricsPrefix + "activity-execution-failed"
62-
ActivityExecutionLatency = CadenceMetricsPrefix + "activity-execution-latency"
63-
ActivityResponseLatency = CadenceMetricsPrefix + "activity-response-latency"
64-
ActivityResponseFailedCounter = CadenceMetricsPrefix + "activity-response-failed"
65-
ActivityEndToEndLatency = CadenceMetricsPrefix + "activity-endtoend-latency"
66-
ActivityTaskPanicCounter = CadenceMetricsPrefix + "activity-task-panic"
67-
ActivityTaskCompletedCounter = CadenceMetricsPrefix + "activity-task-completed"
68-
ActivityTaskFailedCounter = CadenceMetricsPrefix + "activity-task-failed"
69-
ActivityTaskCanceledCounter = CadenceMetricsPrefix + "activity-task-canceled"
70-
ActivityTaskCompletedByIDCounter = CadenceMetricsPrefix + "activity-task-completed-by-id"
71-
ActivityTaskFailedByIDCounter = CadenceMetricsPrefix + "activity-task-failed-by-id"
72-
ActivityTaskCanceledByIDCounter = CadenceMetricsPrefix + "activity-task-canceled-by-id"
73-
LocalActivityTotalCounter = CadenceMetricsPrefix + "local-activity-total"
74-
LocalActivityTimeoutCounter = CadenceMetricsPrefix + "local-activity-timeout"
75-
LocalActivityCanceledCounter = CadenceMetricsPrefix + "local-activity-canceled"
76-
LocalActivityFailedCounter = CadenceMetricsPrefix + "local-activity-failed"
77-
LocalActivityPanicCounter = CadenceMetricsPrefix + "local-activity-panic"
78-
LocalActivityExecutionLatency = CadenceMetricsPrefix + "local-activity-execution-latency"
79-
WorkerPanicCounter = CadenceMetricsPrefix + "worker-panic"
54+
ActivityPollCounter = CadenceMetricsPrefix + "activity-poll-total"
55+
ActivityPollFailedCounter = CadenceMetricsPrefix + "activity-poll-failed"
56+
ActivityPollTransientFailedCounter = CadenceMetricsPrefix + "activity-poll-transient-failed"
57+
ActivityPollNoTaskCounter = CadenceMetricsPrefix + "activity-poll-no-task"
58+
ActivityPollSucceedCounter = CadenceMetricsPrefix + "activity-poll-succeed"
59+
ActivityPollLatency = CadenceMetricsPrefix + "activity-poll-latency"
60+
ActivityScheduledToStartLatency = CadenceMetricsPrefix + "activity-scheduled-to-start-latency"
61+
ActivityExecutionFailedCounter = CadenceMetricsPrefix + "activity-execution-failed"
62+
ActivityExecutionLatency = CadenceMetricsPrefix + "activity-execution-latency"
63+
ActivityResponseLatency = CadenceMetricsPrefix + "activity-response-latency"
64+
ActivityResponseFailedCounter = CadenceMetricsPrefix + "activity-response-failed"
65+
ActivityEndToEndLatency = CadenceMetricsPrefix + "activity-endtoend-latency"
66+
ActivityTaskPanicCounter = CadenceMetricsPrefix + "activity-task-panic"
67+
ActivityTaskCompletedCounter = CadenceMetricsPrefix + "activity-task-completed"
68+
ActivityTaskFailedCounter = CadenceMetricsPrefix + "activity-task-failed"
69+
ActivityTaskCanceledCounter = CadenceMetricsPrefix + "activity-task-canceled"
70+
ActivityTaskCompletedByIDCounter = CadenceMetricsPrefix + "activity-task-completed-by-id"
71+
ActivityTaskFailedByIDCounter = CadenceMetricsPrefix + "activity-task-failed-by-id"
72+
ActivityTaskCanceledByIDCounter = CadenceMetricsPrefix + "activity-task-canceled-by-id"
73+
LocalActivityTotalCounter = CadenceMetricsPrefix + "local-activity-total"
74+
LocalActivityTimeoutCounter = CadenceMetricsPrefix + "local-activity-timeout"
75+
LocalActivityCanceledCounter = CadenceMetricsPrefix + "local-activity-canceled"
76+
LocalActivityFailedCounter = CadenceMetricsPrefix + "local-activity-failed"
77+
LocalActivityPanicCounter = CadenceMetricsPrefix + "local-activity-panic"
78+
LocalActivityExecutionLatency = CadenceMetricsPrefix + "local-activity-execution-latency"
79+
LocallyDispatchedActivityPollCounter = CadenceMetricsPrefix + "locally-dispatched-activity-poll-total"
80+
LocallyDispatchedActivityPollNoTaskCounter = CadenceMetricsPrefix + "locally-dispatched-activity-poll-no-task"
81+
LocallyDispatchedActivityPollSucceedCounter = CadenceMetricsPrefix + "locally-dispatched-activity-poll-succeed"
82+
ActivityLocalDispatchFailedCounter = CadenceMetricsPrefix + "activity-local-dispatch-failed"
83+
ActivityLocalDispatchSucceedCounter = CadenceMetricsPrefix + "activity-local-dispatch-succeed"
84+
WorkerPanicCounter = CadenceMetricsPrefix + "worker-panic"
8085

8186
UnhandledSignalsCounter = CadenceMetricsPrefix + "unhandled-signals"
8287
CorruptedSignalsCounter = CadenceMetricsPrefix + "corrupted-signals"

internal/internal_event_handlers.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,25 @@ type (
135135
header *shared.Header
136136
}
137137

138+
locallyDispatchedActivityTask struct {
139+
// used to notify the poller the response from server is completed and the task is ready
140+
readyCh chan bool
141+
TaskToken []byte
142+
WorkflowExecution *shared.WorkflowExecution
143+
ActivityId *string
144+
ActivityType *shared.ActivityType
145+
Input []byte
146+
ScheduledTimestamp *int64
147+
ScheduleToCloseTimeoutSeconds *int32
148+
StartedTimestamp *int64
149+
StartToCloseTimeoutSeconds *int32
150+
HeartbeatTimeoutSeconds *int32
151+
ScheduledTimestampOfThisAttempt *int64
152+
WorkflowType *shared.WorkflowType
153+
WorkflowDomain *string
154+
Header *shared.Header
155+
}
156+
138157
localActivityMarkerData struct {
139158
ActivityID string `json:"activityId,omitempty"`
140159
ActivityType string `json:"activityType,omitempty"`

internal/internal_pressure_points.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func newWorkflowWorkerWithPressurePoints(
7070
params,
7171
&pressurePointMgrImpl{config: pressurePoints, logger: params.Logger},
7272
registry,
73+
nil,
7374
)
7475
}
7576

internal/internal_task_handlers_test.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,7 @@ func (t *TaskHandlersTestSuite) TestCacheEvictionWhenErrorOccurs() {
555555
task := createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
556556
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
557557
// will fail as it can't find laTunnel in getWorkflowCache().
558-
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}))
558+
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}), nil)
559559
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
560560

561561
t.Error(err)
@@ -588,7 +588,7 @@ func (t *TaskHandlersTestSuite) TestWithMissingHistoryEvents() {
588588
task := createWorkflowTask(testEvents, startEventID, "HelloWorld_Workflow")
589589
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
590590
// will fail as it can't find laTunnel in getWorkflowCache().
591-
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}))
591+
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}), nil)
592592
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
593593

594594
t.Error(err)
@@ -640,7 +640,7 @@ func (t *TaskHandlersTestSuite) TestWithTruncatedHistory() {
640640
task.StartedEventId = common.Int64Ptr(tc.startedEventID)
641641
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
642642
// will fail as it can't find laTunnel in getWorkflowCache().
643-
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}))
643+
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}), nil)
644644
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
645645

646646
if tc.isResultErr {
@@ -754,7 +754,7 @@ func (t *TaskHandlersTestSuite) TestWorkflowTask_NondeterministicDetection() {
754754
task = createWorkflowTask(testEvents, 3, "HelloWorld_Workflow")
755755
// newWorkflowTaskWorkerInternal will set the laTunnel in taskHandler, without it, ProcessWorkflowTask()
756756
// will fail as it can't find laTunnel in getWorkflowCache().
757-
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, stopC)
757+
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, stopC, nil)
758758
request, err = taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
759759
t.Error(err)
760760
t.Nil(request)
@@ -927,7 +927,7 @@ func (t *TaskHandlersTestSuite) TestConsistentQuery_InvalidQueryTask() {
927927
task := createWorkflowTask(nil, 3, "HelloWorld_Workflow")
928928
task.Query = &s.WorkflowQuery{}
929929
task.Queries = map[string]*s.WorkflowQuery{"query_id": {}}
930-
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}))
930+
newWorkflowTaskWorkerInternal(taskHandler, t.service, testDomain, params, make(chan struct{}), nil)
931931
// query and queries are both specified so this is an invalid task
932932
request, err := taskHandler.ProcessWorkflowTask(&workflowTask{task: task}, nil)
933933

0 commit comments

Comments
 (0)