Skip to content

Commit 30b305c

Browse files
committed
Address comments, parse Archetype from TemporalNamespaceDivision
1 parent b2f7454 commit 30b305c

File tree

17 files changed

+312
-260
lines changed

17 files changed

+312
-260
lines changed

api/adminservice/v1/request_response.go-helpers.pb.go

Lines changed: 10 additions & 10 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/adminservice/v1/request_response.pb.go

Lines changed: 82 additions & 91 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

api/batch/v1/request_response.pb.go

Lines changed: 4 additions & 3 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/dynamicconfig/constants.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,11 @@ so forwarding by endpoint ID will not work out of the box.`,
991991
true,
992992
`FrontendEnableBatcher enables batcher-related RPCs in the frontend`,
993993
)
994+
FrontendMaxConcurrentAdminBatchOperationPerNamespace = NewNamespaceIntSetting(
995+
"frontend.MaxConcurrentAdminBatchOperationPerNamespace",
996+
1,
997+
`FrontendMaxConcurrentAdminBatchOperationPerNamespace is the max concurrent admin batch operation job count per namespace`,
998+
)
994999

9951000
FrontendEnableUpdateWorkflowExecution = NewNamespaceBoolSetting(
9961001
"frontend.enableUpdateWorkflowExecution",

common/resource/fx.go

Lines changed: 6 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,6 @@ type (
6161
MatchingRawClient matchingservice.MatchingServiceClient
6262
MatchingClient matchingservice.MatchingServiceClient
6363

64-
AdminRawClient adminservice.AdminServiceClient
65-
AdminClient adminservice.AdminServiceClient
66-
6764
RuntimeMetricsReporterParams struct {
6865
fx.In
6966

@@ -95,7 +92,6 @@ var Module = fx.Options(
9592
fx.Provide(ClientFactoryProvider),
9693
fx.Provide(ClientBeanProvider),
9794
fx.Provide(FrontendClientProvider),
98-
fx.Provide(AdminRawClientProvider),
9995
fx.Provide(AdminClientProvider),
10096
fx.Provide(GrpcListenerProvider),
10197
fx.Provide(RuntimeMetricsReporterProvider),
@@ -253,16 +249,16 @@ func FrontendClientProvider(clientBean client.Bean) workflowservice.WorkflowServ
253249
)
254250
}
255251

256-
func AdminRawClientProvider(clientBean client.Bean, clusterMetadata cluster.Metadata) (AdminRawClient, error) {
257-
return clientBean.GetRemoteAdminClient(clusterMetadata.GetCurrentClusterName())
258-
}
259-
260-
func AdminClientProvider(adminRawClient AdminRawClient) AdminClient {
252+
func AdminClientProvider(clientBean client.Bean, clusterMetadata cluster.Metadata) (adminservice.AdminServiceClient, error) {
253+
adminRawClient, err := clientBean.GetRemoteAdminClient(clusterMetadata.GetCurrentClusterName())
254+
if err != nil {
255+
return nil, err
256+
}
261257
return admin.NewRetryableClient(
262258
adminRawClient,
263259
common.CreateFrontendClientRetryPolicy(),
264260
common.IsServiceClientTransientError,
265-
)
261+
), nil
266262
}
267263

268264
func RuntimeMetricsReporterProvider(

proto/internal/temporal/server/api/adminservice/v1/request_response.proto

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -606,6 +606,7 @@ message ForceUnloadTaskQueuePartitionResponse {
606606
}
607607

608608
// StartAdminBatchOperationRequest starts an admin batch operation.
609+
// WARNING: Batch Operations are exposed to all users of the namespace. Admin Batch Operations should be exercised with caution.
609610
message StartAdminBatchOperationRequest {
610611
// Namespace that contains the batch operation.
611612
string namespace = 1;
@@ -626,19 +627,16 @@ message StartAdminBatchOperationRequest {
626627

627628
// The admin batch operation to perform.
628629
oneof operation {
629-
BatchOperationRefreshWorkflowTasks refresh_workflow_tasks_operation = 10;
630+
BatchOperationRefreshTasks refresh_tasks_operation = 10;
630631
}
631632
}
632633

633634
message StartAdminBatchOperationResponse {
634635
}
635636

636-
// BatchOperationRefreshWorkflowTasks refreshes workflow tasks for batch workflows.
637-
// This regenerates all pending tasks for each workflow.
638-
message BatchOperationRefreshWorkflowTasks {
637+
// BatchOperationRefreshTasks refreshes tasks for batch executions.
638+
// This regenerates all pending tasks for each execution.
639+
message BatchOperationRefreshTasks {
639640
// The identity of the worker/client.
640641
string identity = 1;
641-
642-
// Archetype to use for the refresh operation. If not specified, defaults to workflow archetype.
643-
string archetype = 2;
644642
}

proto/internal/temporal/server/api/batch/v1/request_response.proto

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,12 @@ message BatchOperationInput {
1919

2020
repeated string non_retryable_errors = 5;
2121

22+
// Only needed if StartBatchOperationRequest request is set.
2223
temporal.api.enums.v1.BatchOperationType batch_type = 6;
2324

24-
// The request to start the batch operation.
25+
// The request to start the batch operation. Mutually exclusive with StartAdminBatchOperationRequest admin_request.
2526
temporal.api.workflowservice.v1.StartBatchOperationRequest request = 7;
2627

27-
// The request to start an admin batch operation.
28+
// The request to start an admin batch operation. Mutually exclusive with StartBatchOperationRequest request.
2829
temporal.server.api.adminservice.v1.StartAdminBatchOperationRequest admin_request = 8;
2930
}

service/frontend/admin_handler.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1598,18 +1598,18 @@ func (adh *AdminHandler) StartAdminBatchOperation(
15981598
countResp, err := adh.visibilityMgr.CountWorkflowExecutions(ctx, &manager.CountWorkflowExecutionsRequest{
15991599
NamespaceID: namespaceID,
16001600
Namespace: namespace.Name(request.GetNamespace()),
1601-
Query: batcher.OpenBatchOperationQuery,
1601+
Query: batcher.OpenAdminBatchOperationQuery,
16021602
})
16031603
if err != nil {
16041604
return nil, err
16051605
}
16061606

1607-
openBatchOperationCount := int(countResp.Count)
1608-
if openBatchOperationCount >= maxConcurrentBatchOperation {
1607+
openAdminBatchOperationCount := int(countResp.Count)
1608+
if openAdminBatchOperationCount >= maxConcurrentBatchOperation {
16091609
return nil, &serviceerror.ResourceExhausted{
16101610
Cause: enumspb.RESOURCE_EXHAUSTED_CAUSE_CONCURRENT_LIMIT,
16111611
Scope: enumspb.RESOURCE_EXHAUSTED_SCOPE_NAMESPACE,
1612-
Message: "Max concurrent batch operations is reached",
1612+
Message: "Max concurrent admin batch operations is reached",
16131613
}
16141614
}
16151615

@@ -1621,9 +1621,9 @@ func (adh *AdminHandler) StartAdminBatchOperation(
16211621
var identity string
16221622
var batchTypeMemo string
16231623
switch op := request.Operation.(type) {
1624-
case *adminservice.StartAdminBatchOperationRequest_RefreshWorkflowTasksOperation:
1624+
case *adminservice.StartAdminBatchOperationRequest_RefreshTasksOperation:
16251625
batchTypeMemo = "refresh_workflow_tasks"
1626-
identity = op.RefreshWorkflowTasksOperation.GetIdentity()
1626+
identity = op.RefreshTasksOperation.GetIdentity()
16271627
default:
16281628
return nil, serviceerror.NewInvalidArgumentf("The operation type %T is not supported", op)
16291629
}
@@ -1642,7 +1642,7 @@ func (adh *AdminHandler) StartAdminBatchOperation(
16421642

16431643
var searchAttributes *commonpb.SearchAttributes
16441644
searchattribute.AddSearchAttribute(&searchAttributes, sadefs.BatcherUser, payload.EncodeString(identity))
1645-
searchattribute.AddSearchAttribute(&searchAttributes, sadefs.TemporalNamespaceDivision, payload.EncodeString(batcher.NamespaceDivision))
1645+
searchattribute.AddSearchAttribute(&searchAttributes, sadefs.TemporalNamespaceDivision, payload.EncodeString(batcher.AdminNamespaceDivision))
16461646

16471647
startReq := &workflowservice.StartWorkflowExecutionRequest{
16481648
Namespace: request.Namespace,
@@ -1656,7 +1656,6 @@ func (adh *AdminHandler) StartAdminBatchOperation(
16561656
WorkflowIdReusePolicy: enumspb.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE,
16571657
Memo: memo,
16581658
SearchAttributes: searchAttributes,
1659-
Priority: &commonpb.Priority{}, // ie default priority
16601659
}
16611660

16621661
_, err = adh.historyClient.StartWorkflowExecution(
@@ -1691,7 +1690,7 @@ func validateAdminBatchOperation(params *adminservice.StartAdminBatchOperationRe
16911690
}
16921691

16931692
switch op := params.GetOperation().(type) {
1694-
case *adminservice.StartAdminBatchOperationRequest_RefreshWorkflowTasksOperation:
1693+
case *adminservice.StartAdminBatchOperationRequest_RefreshTasksOperation:
16951694
// No additional validation needed
16961695
return nil
16971696
default:

service/frontend/service.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,8 @@ type Config struct {
173173
// Batch operation dynamic configs
174174
MaxConcurrentBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
175175
MaxExecutionCountBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
176+
// Admin Batch operation dynamic config
177+
MaxConcurrentAdminBatchOperation dynamicconfig.IntPropertyFnWithNamespaceFilter
176178

177179
EnableUpdateWorkflowExecution dynamicconfig.BoolPropertyFnWithNamespaceFilter
178180
EnableUpdateWorkflowExecutionAsyncAccepted dynamicconfig.BoolPropertyFnWithNamespaceFilter

0 commit comments

Comments
 (0)