diff --git a/chasm/lib/activity/frontend.go b/chasm/lib/activity/frontend.go index 37f3cf3d3c..18c9fbae9c 100644 --- a/chasm/lib/activity/frontend.go +++ b/chasm/lib/activity/frontend.go @@ -19,8 +19,7 @@ type FrontendHandler interface { StartActivityExecution(ctx context.Context, req *workflowservice.StartActivityExecutionRequest) (*workflowservice.StartActivityExecutionResponse, error) CountActivityExecutions(context.Context, *workflowservice.CountActivityExecutionsRequest) (*workflowservice.CountActivityExecutionsResponse, error) DeleteActivityExecution(context.Context, *workflowservice.DeleteActivityExecutionRequest) (*workflowservice.DeleteActivityExecutionResponse, error) - DescribeActivityExecution(context.Context, *workflowservice.DescribeActivityExecutionRequest) (*workflowservice.DescribeActivityExecutionResponse, error) - GetActivityExecutionResult(context.Context, *workflowservice.GetActivityExecutionResultRequest) (*workflowservice.GetActivityExecutionResultResponse, error) + PollActivityExecution(context.Context, *workflowservice.PollActivityExecutionRequest) (*workflowservice.PollActivityExecutionResponse, error) ListActivityExecutions(context.Context, *workflowservice.ListActivityExecutionsRequest) (*workflowservice.ListActivityExecutionsResponse, error) RequestCancelActivityExecution(context.Context, *workflowservice.RequestCancelActivityExecutionRequest) (*workflowservice.RequestCancelActivityExecutionResponse, error) TerminateActivityExecution(context.Context, *workflowservice.TerminateActivityExecutionRequest) (*workflowservice.TerminateActivityExecutionResponse, error) diff --git a/client/frontend/client_gen.go b/client/frontend/client_gen.go index b4abf92410..024ee54235 100644 --- a/client/frontend/client_gen.go +++ b/client/frontend/client_gen.go @@ -119,16 +119,6 @@ func (c *clientImpl) DeprecateNamespace( return c.client.DeprecateNamespace(ctx, request, opts...) } -func (c *clientImpl) DescribeActivityExecution( - ctx context.Context, - request *workflowservice.DescribeActivityExecutionRequest, - opts ...grpc.CallOption, -) (*workflowservice.DescribeActivityExecutionResponse, error) { - ctx, cancel := c.createContext(ctx) - defer cancel() - return c.client.DescribeActivityExecution(ctx, request, opts...) -} - func (c *clientImpl) DescribeBatchOperation( ctx context.Context, request *workflowservice.DescribeBatchOperationRequest, @@ -249,16 +239,6 @@ func (c *clientImpl) FetchWorkerConfig( return c.client.FetchWorkerConfig(ctx, request, opts...) } -func (c *clientImpl) GetActivityExecutionResult( - ctx context.Context, - request *workflowservice.GetActivityExecutionResultRequest, - opts ...grpc.CallOption, -) (*workflowservice.GetActivityExecutionResultResponse, error) { - ctx, cancel := c.createContext(ctx) - defer cancel() - return c.client.GetActivityExecutionResult(ctx, request, opts...) -} - func (c *clientImpl) GetClusterInfo( ctx context.Context, request *workflowservice.GetClusterInfoRequest, @@ -529,6 +509,16 @@ func (c *clientImpl) PauseActivityExecution( return c.client.PauseActivityExecution(ctx, request, opts...) } +func (c *clientImpl) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollActivityExecutionResponse, error) { + ctx, cancel := c.createContext(ctx) + defer cancel() + return c.client.PollActivityExecution(ctx, request, opts...) +} + func (c *clientImpl) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, diff --git a/client/frontend/metric_client_gen.go b/client/frontend/metric_client_gen.go index 62d4751324..e50cd12c72 100644 --- a/client/frontend/metric_client_gen.go +++ b/client/frontend/metric_client_gen.go @@ -163,20 +163,6 @@ func (c *metricClient) DeprecateNamespace( return c.client.DeprecateNamespace(ctx, request, opts...) } -func (c *metricClient) DescribeActivityExecution( - ctx context.Context, - request *workflowservice.DescribeActivityExecutionRequest, - opts ...grpc.CallOption, -) (_ *workflowservice.DescribeActivityExecutionResponse, retError error) { - - metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientDescribeActivityExecution") - defer func() { - c.finishMetricsRecording(metricsHandler, startTime, retError) - }() - - return c.client.DescribeActivityExecution(ctx, request, opts...) -} - func (c *metricClient) DescribeBatchOperation( ctx context.Context, request *workflowservice.DescribeBatchOperationRequest, @@ -345,20 +331,6 @@ func (c *metricClient) FetchWorkerConfig( return c.client.FetchWorkerConfig(ctx, request, opts...) } -func (c *metricClient) GetActivityExecutionResult( - ctx context.Context, - request *workflowservice.GetActivityExecutionResultRequest, - opts ...grpc.CallOption, -) (_ *workflowservice.GetActivityExecutionResultResponse, retError error) { - - metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientGetActivityExecutionResult") - defer func() { - c.finishMetricsRecording(metricsHandler, startTime, retError) - }() - - return c.client.GetActivityExecutionResult(ctx, request, opts...) -} - func (c *metricClient) GetClusterInfo( ctx context.Context, request *workflowservice.GetClusterInfoRequest, @@ -737,6 +709,20 @@ func (c *metricClient) PauseActivityExecution( return c.client.PauseActivityExecution(ctx, request, opts...) } +func (c *metricClient) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (_ *workflowservice.PollActivityExecutionResponse, retError error) { + + metricsHandler, startTime := c.startMetricsRecording(ctx, "FrontendClientPollActivityExecution") + defer func() { + c.finishMetricsRecording(metricsHandler, startTime, retError) + }() + + return c.client.PollActivityExecution(ctx, request, opts...) +} + func (c *metricClient) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, diff --git a/client/frontend/retryable_client_gen.go b/client/frontend/retryable_client_gen.go index f2d6301b8a..55b241d077 100644 --- a/client/frontend/retryable_client_gen.go +++ b/client/frontend/retryable_client_gen.go @@ -176,21 +176,6 @@ func (c *retryableClient) DeprecateNamespace( return resp, err } -func (c *retryableClient) DescribeActivityExecution( - ctx context.Context, - request *workflowservice.DescribeActivityExecutionRequest, - opts ...grpc.CallOption, -) (*workflowservice.DescribeActivityExecutionResponse, error) { - var resp *workflowservice.DescribeActivityExecutionResponse - op := func(ctx context.Context) error { - var err error - resp, err = c.client.DescribeActivityExecution(ctx, request, opts...) - return err - } - err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) - return resp, err -} - func (c *retryableClient) DescribeBatchOperation( ctx context.Context, request *workflowservice.DescribeBatchOperationRequest, @@ -371,21 +356,6 @@ func (c *retryableClient) FetchWorkerConfig( return resp, err } -func (c *retryableClient) GetActivityExecutionResult( - ctx context.Context, - request *workflowservice.GetActivityExecutionResultRequest, - opts ...grpc.CallOption, -) (*workflowservice.GetActivityExecutionResultResponse, error) { - var resp *workflowservice.GetActivityExecutionResultResponse - op := func(ctx context.Context) error { - var err error - resp, err = c.client.GetActivityExecutionResult(ctx, request, opts...) - return err - } - err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) - return resp, err -} - func (c *retryableClient) GetClusterInfo( ctx context.Context, request *workflowservice.GetClusterInfoRequest, @@ -791,6 +761,21 @@ func (c *retryableClient) PauseActivityExecution( return resp, err } +func (c *retryableClient) PollActivityExecution( + ctx context.Context, + request *workflowservice.PollActivityExecutionRequest, + opts ...grpc.CallOption, +) (*workflowservice.PollActivityExecutionResponse, error) { + var resp *workflowservice.PollActivityExecutionResponse + op := func(ctx context.Context) error { + var err error + resp, err = c.client.PollActivityExecution(ctx, request, opts...) + return err + } + err := backoff.ThrottleRetryContext(ctx, op, c.policy, c.isRetryable) + return resp, err +} + func (c *retryableClient) PollActivityTaskQueue( ctx context.Context, request *workflowservice.PollActivityTaskQueueRequest, diff --git a/cmd/tools/getproto/files.go b/cmd/tools/getproto/files.go index f22f906872..5a269db008 100644 --- a/cmd/tools/getproto/files.go +++ b/cmd/tools/getproto/files.go @@ -1,4 +1,3 @@ - // Code generated by getproto. DO NOT EDIT. // If you get build errors in this file, just delete it. It will be regenerated. @@ -58,7 +57,6 @@ func init() { importMap["temporal/api/enums/v1/deployment.proto"] = enums.File_temporal_api_enums_v1_deployment_proto importMap["temporal/api/enums/v1/event_type.proto"] = enums.File_temporal_api_enums_v1_event_type_proto importMap["temporal/api/enums/v1/failed_cause.proto"] = enums.File_temporal_api_enums_v1_failed_cause_proto - importMap["temporal/api/enums/v1/id.proto"] = enums.File_temporal_api_enums_v1_id_proto importMap["temporal/api/enums/v1/namespace.proto"] = enums.File_temporal_api_enums_v1_namespace_proto importMap["temporal/api/enums/v1/nexus.proto"] = enums.File_temporal_api_enums_v1_nexus_proto importMap["temporal/api/enums/v1/query.proto"] = enums.File_temporal_api_enums_v1_query_proto diff --git a/common/api/metadata.go b/common/api/metadata.go index c136ca8328..38987539d9 100644 --- a/common/api/metadata.go +++ b/common/api/metadata.go @@ -90,8 +90,7 @@ var ( "RespondActivityTaskCanceledById": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "CountActivityExecutions": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "DeleteActivityExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, - "DescribeActivityExecution": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, - "GetActivityExecutionResult": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, + "PollActivityExecution": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingCapable}, "ListActivityExecutions": {Scope: ScopeNamespace, Access: AccessReadOnly, Polling: PollingNone}, "PauseActivityExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, "RequestCancelActivityExecution": {Scope: ScopeNamespace, Access: AccessWrite, Polling: PollingNone}, diff --git a/common/rpc/interceptor/logtags/workflow_service_server_gen.go b/common/rpc/interceptor/logtags/workflow_service_server_gen.go index 9277fb8a3f..02a2047b6f 100644 --- a/common/rpc/interceptor/logtags/workflow_service_server_gen.go +++ b/common/rpc/interceptor/logtags/workflow_service_server_gen.go @@ -58,12 +58,6 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.DeprecateNamespaceResponse: return nil - case *workflowservice.DescribeActivityExecutionRequest: - return []tag.Tag{ - tag.WorkflowRunID(r.GetRunId()), - } - case *workflowservice.DescribeActivityExecutionResponse: - return nil case *workflowservice.DescribeBatchOperationRequest: return nil case *workflowservice.DescribeBatchOperationResponse: @@ -115,14 +109,6 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t return nil case *workflowservice.FetchWorkerConfigResponse: return nil - case *workflowservice.GetActivityExecutionResultRequest: - return []tag.Tag{ - tag.WorkflowRunID(r.GetRunId()), - } - case *workflowservice.GetActivityExecutionResultResponse: - return []tag.Tag{ - tag.WorkflowRunID(r.GetRunId()), - } case *workflowservice.GetClusterInfoRequest: return nil case *workflowservice.GetClusterInfoResponse: @@ -243,6 +229,14 @@ func (wt *WorkflowTags) extractFromWorkflowServiceServerMessage(message any) []t } case *workflowservice.PauseActivityExecutionResponse: return nil + case *workflowservice.PollActivityExecutionRequest: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } + case *workflowservice.PollActivityExecutionResponse: + return []tag.Tag{ + tag.WorkflowRunID(r.GetRunId()), + } case *workflowservice.PollActivityTaskQueueRequest: return nil case *workflowservice.PollActivityTaskQueueResponse: diff --git a/common/rpc/interceptor/redirection.go b/common/rpc/interceptor/redirection.go index 5962468d1d..86860007a7 100644 --- a/common/rpc/interceptor/redirection.go +++ b/common/rpc/interceptor/redirection.go @@ -139,8 +139,7 @@ var ( "StartActivityExecution": func() any { return &workflowservice.StartActivityExecutionResponse{} }, "CountActivityExecutions": func() any { return &workflowservice.CountActivityExecutionsResponse{} }, "ListActivityExecutions": func() any { return &workflowservice.ListActivityExecutionsResponse{} }, - "DescribeActivityExecution": func() any { return &workflowservice.DescribeActivityExecutionResponse{} }, - "GetActivityExecutionResult": func() any { return &workflowservice.GetActivityExecutionResultResponse{} }, + "PollActivityExecution": func() any { return &workflowservice.PollActivityExecutionResponse{} }, "RequestCancelActivityExecution": func() any { return &workflowservice.RequestCancelActivityExecutionResponse{} }, "TerminateActivityExecution": func() any { return &workflowservice.TerminateActivityExecutionResponse{} }, "DeleteActivityExecution": func() any { return &workflowservice.DeleteActivityExecutionResponse{} }, diff --git a/common/rpc/interceptor/redirection_test.go b/common/rpc/interceptor/redirection_test.go index bc229eddd5..cc4396b12b 100644 --- a/common/rpc/interceptor/redirection_test.go +++ b/common/rpc/interceptor/redirection_test.go @@ -194,8 +194,7 @@ func (s *redirectionInterceptorSuite) TestGlobalAPI() { "StartActivityExecution": {}, "CountActivityExecutions": {}, "ListActivityExecutions": {}, - "DescribeActivityExecution": {}, - "GetActivityExecutionResult": {}, + "PollActivityExecution": {}, "RequestCancelActivityExecution": {}, "TerminateActivityExecution": {}, "DeleteActivityExecution": {}, diff --git a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go index 92e917e3d6..b1d175ad02 100644 --- a/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go +++ b/common/testing/mockapi/workflowservicemock/v1/service_grpc.pb.mock.go @@ -262,26 +262,6 @@ func (mr *MockWorkflowServiceClientMockRecorder) DeprecateNamespace(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeprecateNamespace", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DeprecateNamespace), varargs...) } -// DescribeActivityExecution mocks base method. -func (m *MockWorkflowServiceClient) DescribeActivityExecution(ctx context.Context, in *workflowservice.DescribeActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.DescribeActivityExecutionResponse, error) { - m.ctrl.T.Helper() - varargs := []any{ctx, in} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "DescribeActivityExecution", varargs...) - ret0, _ := ret[0].(*workflowservice.DescribeActivityExecutionResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// DescribeActivityExecution indicates an expected call of DescribeActivityExecution. -func (mr *MockWorkflowServiceClientMockRecorder) DescribeActivityExecution(ctx, in any, opts ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{ctx, in}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DescribeActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).DescribeActivityExecution), varargs...) -} - // DescribeBatchOperation mocks base method. func (m *MockWorkflowServiceClient) DescribeBatchOperation(ctx context.Context, in *workflowservice.DescribeBatchOperationRequest, opts ...grpc.CallOption) (*workflowservice.DescribeBatchOperationResponse, error) { m.ctrl.T.Helper() @@ -522,26 +502,6 @@ func (mr *MockWorkflowServiceClientMockRecorder) FetchWorkerConfig(ctx, in any, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchWorkerConfig", reflect.TypeOf((*MockWorkflowServiceClient)(nil).FetchWorkerConfig), varargs...) } -// GetActivityExecutionResult mocks base method. -func (m *MockWorkflowServiceClient) GetActivityExecutionResult(ctx context.Context, in *workflowservice.GetActivityExecutionResultRequest, opts ...grpc.CallOption) (*workflowservice.GetActivityExecutionResultResponse, error) { - m.ctrl.T.Helper() - varargs := []any{ctx, in} - for _, a := range opts { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "GetActivityExecutionResult", varargs...) - ret0, _ := ret[0].(*workflowservice.GetActivityExecutionResultResponse) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetActivityExecutionResult indicates an expected call of GetActivityExecutionResult. -func (mr *MockWorkflowServiceClientMockRecorder) GetActivityExecutionResult(ctx, in any, opts ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{ctx, in}, opts...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetActivityExecutionResult", reflect.TypeOf((*MockWorkflowServiceClient)(nil).GetActivityExecutionResult), varargs...) -} - // GetClusterInfo mocks base method. func (m *MockWorkflowServiceClient) GetClusterInfo(ctx context.Context, in *workflowservice.GetClusterInfoRequest, opts ...grpc.CallOption) (*workflowservice.GetClusterInfoResponse, error) { m.ctrl.T.Helper() @@ -1082,6 +1042,26 @@ func (mr *MockWorkflowServiceClientMockRecorder) PauseActivityExecution(ctx, in return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PauseActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PauseActivityExecution), varargs...) } +// PollActivityExecution mocks base method. +func (m *MockWorkflowServiceClient) PollActivityExecution(ctx context.Context, in *workflowservice.PollActivityExecutionRequest, opts ...grpc.CallOption) (*workflowservice.PollActivityExecutionResponse, error) { + m.ctrl.T.Helper() + varargs := []any{ctx, in} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "PollActivityExecution", varargs...) + ret0, _ := ret[0].(*workflowservice.PollActivityExecutionResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// PollActivityExecution indicates an expected call of PollActivityExecution. +func (mr *MockWorkflowServiceClientMockRecorder) PollActivityExecution(ctx, in any, opts ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{ctx, in}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "PollActivityExecution", reflect.TypeOf((*MockWorkflowServiceClient)(nil).PollActivityExecution), varargs...) +} + // PollActivityTaskQueue mocks base method. func (m *MockWorkflowServiceClient) PollActivityTaskQueue(ctx context.Context, in *workflowservice.PollActivityTaskQueueRequest, opts ...grpc.CallOption) (*workflowservice.PollActivityTaskQueueResponse, error) { m.ctrl.T.Helper() diff --git a/go.mod b/go.mod index 7e65e125f7..191d92f8b1 100644 --- a/go.mod +++ b/go.mod @@ -60,7 +60,7 @@ require ( go.opentelemetry.io/otel/sdk v1.34.0 go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 - go.temporal.io/api v1.53.1-0.20251013183714-d8d86d018c50 + go.temporal.io/api v1.53.1-0.20251030194917-59ee62fd34be go.temporal.io/sdk v1.35.0 go.uber.org/fx v1.23.0 go.uber.org/mock v0.6.0 diff --git a/go.sum b/go.sum index 2e6a36e7d0..b512f9ddee 100644 --- a/go.sum +++ b/go.sum @@ -397,6 +397,10 @@ go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= go.temporal.io/api v1.53.1-0.20251013183714-d8d86d018c50 h1:oHDlsZ9dXFWFgYAnxB3kKj/n3uk1aM8GYUDhZv5gTXk= go.temporal.io/api v1.53.1-0.20251013183714-d8d86d018c50/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.53.1-0.20251025101057-28854633c4af h1:Gx/YInPOk+EiikphRsDLQzdBlgJ8OwYlyEptXTI9fTA= +go.temporal.io/api v1.53.1-0.20251025101057-28854633c4af/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= +go.temporal.io/api v1.53.1-0.20251030194917-59ee62fd34be h1:54bFSQitEpvQ1qj4qkiYLYTtZQd2AYUw29d7FC+VkzI= +go.temporal.io/api v1.53.1-0.20251030194917-59ee62fd34be/go.mod h1:iaxoP/9OXMJcQkETTECfwYq4cw/bj4nwov8b3ZLVnXM= go.temporal.io/sdk v1.35.0 h1:lRNAQ5As9rLgYa7HBvnmKyzxLcdElTuoFJ0FXM/AsLQ= go.temporal.io/sdk v1.35.0/go.mod h1:1q5MuLc2MEJ4lneZTHJzpVebW2oZnyxoIOWX3oFVebw= go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= diff --git a/service/frontend/configs/quotas.go b/service/frontend/configs/quotas.go index 1885c0ac90..b0c231aa53 100644 --- a/service/frontend/configs/quotas.go +++ b/service/frontend/configs/quotas.go @@ -27,23 +27,28 @@ const ( ) var ( - // ExecutionAPICountLimitOverride determines how many tokens each of these API calls consumes from their - // corresponding quota, which is determined by dynamicconfig.FrontendMaxConcurrentLongRunningRequestsPerInstance. If - // the value is not set, then the method is not considered a long-running request and the number of concurrent - // requests will not be throttled. The Poll* methods here are long-running because they block until there is a task - // available. The GetWorkflowExecutionHistory method is blocking only if WaitNewEvent is true, otherwise it is not - // long-running. The QueryWorkflow and UpdateWorkflowExecution methods are long-running because they both block - // until a background WFT is complete. + // ExecutionAPICountLimitOverride determines how many tokens each of these API calls consumes + // from their corresponding quota, which is determined by + // dynamicconfig.FrontendMaxConcurrentLongRunningRequestsPerInstance. If the value is not set, + // then the method is not considered a long-running request and the number of concurrent + // requests will not be throttled. PollActivityTaskQueue and PollWorkflowTaskQueue are + // long-running because they block until there is a task available. PollWorkflowExecutionUpdate + // is long-running because it waits for an update lifecycle stage. GetWorkflowExecutionHistory + // is long-running when the request is configured to perform a long-poll (WaitNewEvent is true). + // PollActivityExecution is long-running when the request is configured to perform a long-poll + // (wait_policy present). The QueryWorkflow and UpdateWorkflowExecution methods are long-running + // because they both block until a background WFT is complete. ExecutionAPICountLimitOverride = map[string]int{ "/temporal.api.workflowservice.v1.WorkflowService/PollActivityTaskQueue": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowTaskQueue": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowExecutionUpdate": 1, + // TODO: categorize appropriately as long-poll if wait_policy is present, similar to what is + // done with PollWorkflowHistoryAPIName below. + "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution": 1, "/temporal.api.workflowservice.v1.WorkflowService/QueryWorkflow": 1, "/temporal.api.workflowservice.v1.WorkflowService/UpdateWorkflowExecution": 1, "/temporal.api.workflowservice.v1.WorkflowService/GetWorkflowExecutionHistory": 1, "/temporal.api.workflowservice.v1.WorkflowService/PollNexusTaskQueue": 1, - // TODO: Map to PollActivityResult if request is long-polling - "/temporal.api.workflowservice.v1.WorkflowService/GetActivityExecutionResult": 1, // potentially long-running, depending on the operations "/temporal.api.workflowservice.v1.WorkflowService/ExecuteMultiOperation": 1, @@ -124,7 +129,6 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/RequestCancelActivityExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/TerminateActivityExecution": 2, "/temporal.api.workflowservice.v1.WorkflowService/DeleteActivityExecution": 2, - "/temporal.api.workflowservice.v1.WorkflowService/GetActivityExecutionResult": 2, // P3: Status Querying APIs "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkflowExecution": 3, @@ -141,7 +145,6 @@ var ( "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeploymentVersion": 3, "/temporal.api.workflowservice.v1.WorkflowService/DescribeWorkerDeployment": 3, "/temporal.api.workflowservice.v1.WorkflowService/ListWorkerDeployments": 3, - "/temporal.api.workflowservice.v1.WorkflowService/DescribeActivityExecution": 3, // P3: Progress APIs for reporting cancellations and failures. // They are relatively low priority as the tasks need to be retried anyway. @@ -155,6 +158,7 @@ var ( // P4: Poll APIs and other low priority APIs "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowTaskQueue": 4, "/temporal.api.workflowservice.v1.WorkflowService/PollActivityTaskQueue": 4, + "/temporal.api.workflowservice.v1.WorkflowService/PollActivityExecution": 4, "/temporal.api.workflowservice.v1.WorkflowService/PollWorkflowExecutionUpdate": 4, "/temporal.api.workflowservice.v1.WorkflowService/PollNexusTaskQueue": 4, "/temporal.api.workflowservice.v1.WorkflowService/ResetStickyTaskQueue": 4, diff --git a/service/frontend/workflow_handler.go b/service/frontend/workflow_handler.go index dddb90d790..1e1d019bf6 100644 --- a/service/frontend/workflow_handler.go +++ b/service/frontend/workflow_handler.go @@ -891,29 +891,6 @@ func (wh *WorkflowHandler) PollWorkflowTaskQueue(ctx context.Context, request *w childCtx := wh.registerOutstandingPollContext(ctx, pollerID, namespaceID.String()) defer wh.unregisterOutstandingPollContext(pollerID, namespaceID.String()) - if request.WorkerHeartbeat != nil { - heartbeats := []*workerpb.WorkerHeartbeat{request.WorkerHeartbeat} - request.WorkerHeartbeat = nil // clear the heartbeat from the request to avoid sending it to matching service - - // route heartbeat to the matching service only if the request is valid (all validation checks passed) - go func() { - _, err := wh.matchingClient.RecordWorkerHeartbeat(ctx, &matchingservice.RecordWorkerHeartbeatRequest{ - NamespaceId: namespaceID.String(), - HeartbeartRequest: &workflowservice.RecordWorkerHeartbeatRequest{ - Namespace: request.Namespace, - Identity: request.Identity, - WorkerHeartbeat: heartbeats, - }, - }) - - if err != nil { - wh.logger.Error("Failed to record worker heartbeat.", - tag.WorkflowTaskQueueName(request.TaskQueue.GetName()), - tag.Error(err)) - } - }() - } - matchingResp, err := wh.matchingClient.PollWorkflowTaskQueue(childCtx, &matchingservice.PollWorkflowTaskQueueRequest{ NamespaceId: namespaceID.String(), PollerId: pollerID,