Skip to content

Commit c424b53

Browse files
[active-active] Remove active cluster selection policy row during workflow cleanup (#7053)
1 parent dd4e59b commit c424b53

27 files changed

+439
-29
lines changed

common/log/tag/values.go

Lines changed: 30 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -207,35 +207,36 @@ var (
207207
StoreOperationGetShard = storeOperation("get-shard")
208208
StoreOperationUpdateShard = storeOperation("update-shard")
209209

210-
StoreOperationCreateWorkflowExecution = storeOperation("create-wf-execution")
211-
StoreOperationGetWorkflowExecution = storeOperation("get-wf-execution")
212-
StoreOperationUpdateWorkflowExecution = storeOperation("update-wf-execution")
213-
StoreOperationConflictResolveWorkflowExecution = storeOperation("conflict-resolve-wf-execution")
214-
StoreOperationResetWorkflowExecution = storeOperation("reset-wf-execution")
215-
StoreOperationDeleteWorkflowExecution = storeOperation("delete-wf-execution")
216-
StoreOperationDeleteCurrentWorkflowExecution = storeOperation("delete-current-wf-execution")
217-
StoreOperationGetCurrentExecution = storeOperation("get-current-execution")
218-
StoreOperationListCurrentExecution = storeOperation("list-current-execution")
219-
StoreOperationIsWorkflowExecutionExists = storeOperation("is-wf-execution-exists")
220-
StoreOperationListConcreteExecution = storeOperation("list-concrete-execution")
221-
StoreOperationGetTransferTasks = storeOperation("get-transfer-tasks")
222-
StoreOperationGetCrossClusterTasks = storeOperation("get-cross-cluster-tasks")
223-
StoreOperationGetReplicationTasks = storeOperation("get-replication-tasks")
224-
StoreOperationCompleteTransferTask = storeOperation("complete-transfer-task")
225-
StoreOperationCompleteCrossClusterTask = storeOperation("complete-cross-cluster-task")
226-
StoreOperationCompleteReplicationTask = storeOperation("complete-replication-task")
227-
StoreOperationPutReplicationTaskToDLQ = storeOperation("put-replication-task-to-dlq")
228-
StoreOperationGetReplicationTasksFromDLQ = storeOperation("get-replication-tasks-from-dlq")
229-
StoreOperationGetReplicationDLQSize = storeOperation("get-replication-dlq-size")
230-
StoreOperationDeleteReplicationTaskFromDLQ = storeOperation("delete-replication-task-from-dlq")
231-
StoreOperationRangeDeleteReplicationTaskFromDLQ = storeOperation("range-delete-replication-task-from-dlq")
232-
StoreOperationCreateFailoverMarkerTasks = storeOperation("createFailoverMarkerTasks")
233-
StoreOperationGetTimerIndexTasks = storeOperation("get-timer-index-tasks")
234-
StoreOperationCompleteTimerTask = storeOperation("complete-timer-task")
235-
StoreOperationGetHistoryTasks = storeOperation("get-history-tasks")
236-
StoreOperationCompleteHistoryTask = storeOperation("complete-history-task")
237-
StoreOperationRangeCompleteHistoryTask = storeOperation("range-complete-history-task")
238-
StoreOperationGetActiveClusterSelectionPolicy = storeOperation("get-active-cluster-selection-policy")
210+
StoreOperationCreateWorkflowExecution = storeOperation("create-wf-execution")
211+
StoreOperationGetWorkflowExecution = storeOperation("get-wf-execution")
212+
StoreOperationUpdateWorkflowExecution = storeOperation("update-wf-execution")
213+
StoreOperationConflictResolveWorkflowExecution = storeOperation("conflict-resolve-wf-execution")
214+
StoreOperationResetWorkflowExecution = storeOperation("reset-wf-execution")
215+
StoreOperationDeleteWorkflowExecution = storeOperation("delete-wf-execution")
216+
StoreOperationDeleteCurrentWorkflowExecution = storeOperation("delete-current-wf-execution")
217+
StoreOperationGetCurrentExecution = storeOperation("get-current-execution")
218+
StoreOperationListCurrentExecution = storeOperation("list-current-execution")
219+
StoreOperationIsWorkflowExecutionExists = storeOperation("is-wf-execution-exists")
220+
StoreOperationListConcreteExecution = storeOperation("list-concrete-execution")
221+
StoreOperationGetTransferTasks = storeOperation("get-transfer-tasks")
222+
StoreOperationGetCrossClusterTasks = storeOperation("get-cross-cluster-tasks")
223+
StoreOperationGetReplicationTasks = storeOperation("get-replication-tasks")
224+
StoreOperationCompleteTransferTask = storeOperation("complete-transfer-task")
225+
StoreOperationCompleteCrossClusterTask = storeOperation("complete-cross-cluster-task")
226+
StoreOperationCompleteReplicationTask = storeOperation("complete-replication-task")
227+
StoreOperationPutReplicationTaskToDLQ = storeOperation("put-replication-task-to-dlq")
228+
StoreOperationGetReplicationTasksFromDLQ = storeOperation("get-replication-tasks-from-dlq")
229+
StoreOperationGetReplicationDLQSize = storeOperation("get-replication-dlq-size")
230+
StoreOperationDeleteReplicationTaskFromDLQ = storeOperation("delete-replication-task-from-dlq")
231+
StoreOperationRangeDeleteReplicationTaskFromDLQ = storeOperation("range-delete-replication-task-from-dlq")
232+
StoreOperationCreateFailoverMarkerTasks = storeOperation("createFailoverMarkerTasks")
233+
StoreOperationGetTimerIndexTasks = storeOperation("get-timer-index-tasks")
234+
StoreOperationCompleteTimerTask = storeOperation("complete-timer-task")
235+
StoreOperationGetHistoryTasks = storeOperation("get-history-tasks")
236+
StoreOperationCompleteHistoryTask = storeOperation("complete-history-task")
237+
StoreOperationRangeCompleteHistoryTask = storeOperation("range-complete-history-task")
238+
StoreOperationGetActiveClusterSelectionPolicy = storeOperation("get-active-cluster-selection-policy")
239+
StoreOperationDeleteActiveClusterSelectionPolicy = storeOperation("delete-active-cluster-selection-policy")
239240

240241
StoreOperationCreateTasks = storeOperation("create-tasks")
241242
StoreOperationGetTasks = storeOperation("get-tasks")

common/metrics/defs.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -307,6 +307,8 @@ const (
307307
PersistenceShardRequestCountScope
308308
// PersistenceGetActiveClusterSelectionPolicyScope tracks GetActiveClusterSelectionPolicy calls made by service to persistence layer
309309
PersistenceGetActiveClusterSelectionPolicyScope
310+
// PersistenceDeleteActiveClusterSelectionPolicyScope tracks DeleteActiveClusterSelectionPolicy calls made by service to persistence layer
311+
PersistenceDeleteActiveClusterSelectionPolicyScope
310312

311313
// ResolverHostNotFoundScope is a simple low level error indicating a lookup failed in the membership resolver
312314
ResolverHostNotFoundScope
@@ -1531,6 +1533,7 @@ var ScopeDefs = map[ServiceIdx]map[int]scopeDefinition{
15311533
PersistenceUpdateDynamicConfigScope: {operation: "UpdateDynamicConfig"},
15321534
PersistenceShardRequestCountScope: {operation: "ShardIdPersistenceRequest"},
15331535
PersistenceGetActiveClusterSelectionPolicyScope: {operation: "GetActiveClusterSelectionPolicy"},
1536+
PersistenceDeleteActiveClusterSelectionPolicyScope: {operation: "DeleteActiveClusterSelectionPolicy"},
15341537
ResolverHostNotFoundScope: {operation: "ResolverHostNotFound"},
15351538

15361539
ClusterMetadataArchivalConfigScope: {operation: "ArchivalConfig"},

common/mocks/ExecutionManager.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/data_manager_interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1552,6 +1552,7 @@ type (
15521552
ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
15531553

15541554
GetActiveClusterSelectionPolicy(ctx context.Context, domainID, wfID, rID string) (*types.ActiveClusterSelectionPolicy, error)
1555+
DeleteActiveClusterSelectionPolicy(ctx context.Context, domainID, workflowID, runID string) error
15551556
}
15561557

15571558
// ExecutionManagerFactory creates an instance of ExecutionManager for a given shard

common/persistence/data_manager_interfaces_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/data_store_interfaces.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,7 @@ type (
129129

130130
// Active cluster selection policy related methods
131131
GetActiveClusterSelectionPolicy(ctx context.Context, domainID, wfID, rID string) (*DataBlob, error)
132+
DeleteActiveClusterSelectionPolicy(ctx context.Context, domainID, wfID, rID string) error
132133
}
133134

134135
// HistoryStore is to manager workflow history events

common/persistence/data_store_interfaces_mock.go

Lines changed: 14 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/persistence/execution_manager.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -931,6 +931,13 @@ func (m *executionManagerImpl) GetActiveClusterSelectionPolicy(
931931
return policy, nil
932932
}
933933

934+
func (m *executionManagerImpl) DeleteActiveClusterSelectionPolicy(
935+
ctx context.Context,
936+
domainID, workflowID, runID string,
937+
) error {
938+
return m.persistence.DeleteActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID)
939+
}
940+
934941
func (m *executionManagerImpl) Close() {
935942
m.persistence.Close()
936943
}

common/persistence/execution_manager_test.go

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1199,6 +1199,122 @@ func TestCreateFailoverMarkerTasks(t *testing.T) {
11991199
assert.NoError(t, err)
12001200
}
12011201

1202+
func TestGetActiveClusterSelectionPolicy(t *testing.T) {
1203+
ctx := context.Background()
1204+
domainID := "domainID"
1205+
workflowID := "workflowID"
1206+
runID := "runID"
1207+
1208+
tests := []struct {
1209+
name string
1210+
prepareMocks func(*MockExecutionStore, *MockPayloadSerializer)
1211+
want *types.ActiveClusterSelectionPolicy
1212+
wantErr error
1213+
}{
1214+
{
1215+
name: "success",
1216+
prepareMocks: func(mockedStore *MockExecutionStore, mockedSerializer *MockPayloadSerializer) {
1217+
data := sampleActiveClusterSelectionPolicyData()
1218+
mockedStore.EXPECT().GetActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).
1219+
Return(data, nil)
1220+
mockedSerializer.EXPECT().DeserializeActiveClusterSelectionPolicy(data).Return(sampleActiveClusterSelectionPolicy(), nil)
1221+
},
1222+
want: sampleActiveClusterSelectionPolicy(),
1223+
},
1224+
{
1225+
name: "store error",
1226+
prepareMocks: func(mockedStore *MockExecutionStore, mockedSerializer *MockPayloadSerializer) {
1227+
mockedStore.EXPECT().GetActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).
1228+
Return(nil, assert.AnError)
1229+
},
1230+
wantErr: assert.AnError,
1231+
},
1232+
{
1233+
name: "store returned nil",
1234+
prepareMocks: func(mockedStore *MockExecutionStore, mockedSerializer *MockPayloadSerializer) {
1235+
mockedStore.EXPECT().GetActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).
1236+
Return(nil, nil)
1237+
},
1238+
wantErr: &types.EntityNotExistsError{
1239+
Message: "active cluster selection policy not found",
1240+
},
1241+
},
1242+
{
1243+
name: "deserialize error",
1244+
prepareMocks: func(mockedStore *MockExecutionStore, mockedSerializer *MockPayloadSerializer) {
1245+
data := sampleActiveClusterSelectionPolicyData()
1246+
mockedStore.EXPECT().GetActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).
1247+
Return(data, nil)
1248+
mockedSerializer.EXPECT().DeserializeActiveClusterSelectionPolicy(data).Return(nil, assert.AnError)
1249+
},
1250+
wantErr: assert.AnError,
1251+
},
1252+
}
1253+
1254+
for _, test := range tests {
1255+
t.Run(test.name, func(t *testing.T) {
1256+
ctrl := gomock.NewController(t)
1257+
mockedStore := NewMockExecutionStore(ctrl)
1258+
mockedSerializer := NewMockPayloadSerializer(ctrl)
1259+
manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), mockedSerializer)
1260+
1261+
test.prepareMocks(mockedStore, mockedSerializer)
1262+
1263+
policy, err := manager.GetActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID)
1264+
if test.wantErr != nil {
1265+
assert.EqualError(t, err, test.wantErr.Error())
1266+
} else {
1267+
assert.NoError(t, err)
1268+
assert.Equal(t, test.want, policy)
1269+
}
1270+
})
1271+
}
1272+
}
1273+
1274+
func TestDeleteActiveClusterSelectionPolicy(t *testing.T) {
1275+
ctx := context.Background()
1276+
domainID := "domainID"
1277+
workflowID := "workflowID"
1278+
runID := "runID"
1279+
1280+
tests := []struct {
1281+
name string
1282+
prepareMocks func(*MockExecutionStore)
1283+
wantErr error
1284+
}{
1285+
{
1286+
name: "success",
1287+
prepareMocks: func(mockedStore *MockExecutionStore) {
1288+
mockedStore.EXPECT().DeleteActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).Return(nil)
1289+
},
1290+
},
1291+
{
1292+
name: "store error",
1293+
prepareMocks: func(mockedStore *MockExecutionStore) {
1294+
mockedStore.EXPECT().DeleteActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID).Return(assert.AnError)
1295+
},
1296+
wantErr: assert.AnError,
1297+
},
1298+
}
1299+
1300+
for _, test := range tests {
1301+
t.Run(test.name, func(t *testing.T) {
1302+
ctrl := gomock.NewController(t)
1303+
mockedStore := NewMockExecutionStore(ctrl)
1304+
manager := NewExecutionManagerImpl(mockedStore, testlogger.New(t), nil)
1305+
1306+
test.prepareMocks(mockedStore)
1307+
1308+
err := manager.DeleteActiveClusterSelectionPolicy(ctx, domainID, workflowID, runID)
1309+
if test.wantErr != nil {
1310+
assert.EqualError(t, err, test.wantErr.Error())
1311+
} else {
1312+
assert.NoError(t, err)
1313+
}
1314+
})
1315+
}
1316+
}
1317+
12021318
func sampleInternalActivityInfo(name string) *InternalActivityInfo {
12031319
return &InternalActivityInfo{
12041320
Version: 1,
@@ -1501,6 +1617,13 @@ func sampleActiveClusterSelectionPolicyData() *DataBlob {
15011617
return NewDataBlob([]byte("test-active-cluster-selection-policy"), constants.EncodingTypeThriftRW)
15021618
}
15031619

1620+
func sampleActiveClusterSelectionPolicy() *types.ActiveClusterSelectionPolicy {
1621+
return &types.ActiveClusterSelectionPolicy{
1622+
ActiveClusterSelectionStrategy: types.ActiveClusterSelectionStrategyRegionSticky.Ptr(),
1623+
StickyRegion: "region-1",
1624+
}
1625+
}
1626+
15041627
func sampleInternalChildExecutionInfo(initEventID, startedEventID int64) *InternalChildExecutionInfo {
15051628
return &InternalChildExecutionInfo{
15061629
Version: initEventID,

common/persistence/nosql/nosql_execution_store.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -524,6 +524,17 @@ func (d *nosqlExecutionStore) DeleteCurrentWorkflowExecution(
524524
return nil
525525
}
526526

527+
func (d *nosqlExecutionStore) DeleteActiveClusterSelectionPolicy(
528+
ctx context.Context,
529+
domainID, workflowID, runID string,
530+
) error {
531+
err := d.db.DeleteActiveClusterSelectionPolicy(ctx, d.shardID, domainID, workflowID, runID)
532+
if err != nil {
533+
return convertCommonErrors(d.db, "DeleteActiveClusterSelectionPolicy", err)
534+
}
535+
return nil
536+
}
537+
527538
func (d *nosqlExecutionStore) GetCurrentExecution(
528539
ctx context.Context,
529540
request *persistence.GetCurrentExecutionRequest,

0 commit comments

Comments
 (0)