Skip to content

Commit d08c141

Browse files
authored
Refactor data manager interface (#7229)
* chore: remove unused deprecated constants * fix: rename receiver and update function comment * fix: update comments to align with Go coding styles * fix: typo errors * refactor: address coding issues
1 parent dad83ff commit d08c141

File tree

1 file changed

+46
-49
lines changed

1 file changed

+46
-49
lines changed

common/persistence/data_manager_interfaces.go

Lines changed: 46 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
2020
// THE SOFTWARE.
2121

22-
// Geneate rate limiter wrappers.
22+
// Generate rate limiter wrappers.
2323
//go:generate mockgen -package $GOPACKAGE -destination data_manager_interfaces_mock.go github.com/uber/cadence/common/persistence Task,ShardManager,ExecutionManager,ExecutionManagerFactory,TaskManager,HistoryManager,DomainManager,QueueManager,ConfigStoreManager
2424
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/configstore_generated.go
2525
//go:generate gowrap gen -g -p . -i DomainManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/domain_generated.go
@@ -29,7 +29,7 @@
2929
//go:generate gowrap gen -g -p . -i TaskManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/task_generated.go
3030
//go:generate gowrap gen -g -p . -i ShardManager -t ./wrappers/templates/ratelimited.tmpl -o wrappers/ratelimited/shard_generated.go
3131

32-
// Geneate error injector wrappers.
32+
// Generate error injector wrappers.
3333
//go:generate gowrap gen -g -p . -i ConfigStoreManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/configstore_generated.go
3434
//go:generate gowrap gen -g -p . -i ShardManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/shard_generated.go
3535
//go:generate gowrap gen -g -p . -i ExecutionManager -t ./wrappers/templates/errorinjector.tmpl -o wrappers/errorinjectors/execution_generated.go
@@ -53,6 +53,7 @@ package persistence
5353

5454
import (
5555
"context"
56+
"errors"
5657
"fmt"
5758
"strings"
5859
"time"
@@ -89,7 +90,7 @@ type CreateWorkflowMode int
8990
// QueueType is an enum that represents various queue types in persistence
9091
type QueueType int
9192

92-
// Queue types used in queue table
93+
// DomainReplicationQueueType queue types used in queue table
9394
// Use positive numbers for queue type
9495
// Negative numbers are reserved for DLQ
9596
const (
@@ -98,16 +99,16 @@ const (
9899

99100
// Create Workflow Execution Mode
100101
const (
101-
// Fail if current record exists
102+
// CreateWorkflowModeBrandNew Fail if current record exists
102103
// Only applicable for CreateWorkflowExecution
103104
CreateWorkflowModeBrandNew CreateWorkflowMode = iota
104-
// Update current record only if workflow is closed
105+
// CreateWorkflowModeWorkflowIDReuse Update current record only if workflow is closed
105106
// Only applicable for CreateWorkflowExecution
106107
CreateWorkflowModeWorkflowIDReuse
107-
// Update current record only if workflow is open
108+
// CreateWorkflowModeContinueAsNew Update current record only if workflow is open
108109
// Only applicable for UpdateWorkflowExecution
109110
CreateWorkflowModeContinueAsNew
110-
// Do not update current record since workflow to
111+
// CreateWorkflowModeZombie Do not update current record since workflow to
111112
// applicable for CreateWorkflowExecution, UpdateWorkflowExecution
112113
CreateWorkflowModeZombie
113114
)
@@ -117,13 +118,13 @@ type UpdateWorkflowMode int
117118

118119
// Update Workflow Execution Mode
119120
const (
120-
// Update workflow, including current record
121+
// UpdateWorkflowModeUpdateCurrent Update workflow, including current record
121122
// NOTE: update on current record is a condition update
122123
UpdateWorkflowModeUpdateCurrent UpdateWorkflowMode = iota
123-
// Update workflow, without current record
124+
// UpdateWorkflowModeBypassCurrent Update workflow, without current record
124125
// NOTE: current record CANNOT point to the workflow to be updated
125126
UpdateWorkflowModeBypassCurrent
126-
// Update workflow, ignoring current record
127+
// UpdateWorkflowModeIgnoreCurrent Update workflow, ignoring current record
127128
// NOTE: current record may or may not point to the workflow
128129
// this mode should only be used for (re-)generating workflow tasks
129130
// and there's no other changes to the workflow
@@ -135,10 +136,10 @@ type ConflictResolveWorkflowMode int
135136

136137
// Conflict Resolve Workflow Mode
137138
const (
138-
// Conflict resolve workflow, including current record
139+
// ConflictResolveWorkflowModeUpdateCurrent Conflict resolve workflow, including current record
139140
// NOTE: update on current record is a condition update
140141
ConflictResolveWorkflowModeUpdateCurrent ConflictResolveWorkflowMode = iota
141-
// Conflict resolve workflow, without current record
142+
// ConflictResolveWorkflowModeBypassCurrent Conflict resolve workflow, without current record
142143
// NOTE: current record CANNOT point to the workflow to be updated
143144
ConflictResolveWorkflowModeBypassCurrent
144145
)
@@ -241,16 +242,6 @@ const (
241242
TransferTaskTypeApplyParentClosePolicy // Deprecated: this is related to cross-cluster tasks
242243
)
243244

244-
// Deprecated: Types of cross-cluster tasks. These are deprecated as of
245-
// May 2024
246-
const (
247-
CrossClusterTaskTypeStartChildExecution = iota + 1
248-
CrossClusterTaskTypeCancelExecution
249-
CrossClusterTaskTypeSignalExecution
250-
CrossClusterTaskTypeRecordChildExeuctionCompleted
251-
CrossClusterTaskTypeApplyParentClosePolicy
252-
)
253-
254245
// Types of replication tasks
255246
const (
256247
ReplicationTaskTypeHistory = iota
@@ -285,11 +276,11 @@ type CreateWorkflowRequestMode int
285276

286277
// Modes of create workflow request
287278
const (
288-
// Fail if data with the same domain_id, workflow_id, request_id exists
279+
// CreateWorkflowRequestModeNew Fail if data with the same domain_id, workflow_id, request_id exists
289280
// It is used for transactions started by external API requests
290281
// to allow us detecting duplicate requests
291282
CreateWorkflowRequestModeNew CreateWorkflowRequestMode = iota
292-
// Upsert the data without checking duplication
283+
// CreateWorkflowRequestModeReplicated Upsert the data without checking duplication
293284
// It is used for transactions started by replication stack to achieve
294285
// eventual consistency
295286
CreateWorkflowRequestModeReplicated
@@ -308,16 +299,12 @@ const (
308299
// InitialFailoverNotificationVersion is the initial failover version for a domain
309300
InitialFailoverNotificationVersion int64 = 0
310301

311-
// TransferTaskTransferTargetWorkflowID is the the dummy workflow ID for transfer tasks of types
302+
// TransferTaskTransferTargetWorkflowID is the dummy workflow ID for transfer tasks of types
312303
// that do not have a target workflow
313304
TransferTaskTransferTargetWorkflowID = "20000000-0000-f000-f000-000000000001"
314-
// TransferTaskTransferTargetRunID is the the dummy run ID for transfer tasks of types
305+
// TransferTaskTransferTargetRunID is the dummy run ID for transfer tasks of types
315306
// that do not have a target workflow
316307
TransferTaskTransferTargetRunID = "30000000-0000-f000-f000-000000000002"
317-
// Deprecated: This is deprecated as of May 24
318-
// CrossClusterTaskDefaultTargetRunID is the the dummy run ID for cross-cluster tasks of types
319-
// that do not have a target workflow
320-
CrossClusterTaskDefaultTargetRunID = TransferTaskTransferTargetRunID
321308

322309
// indicate invalid workflow state transition
323310
invalidStateTransitionMsg = "unable to change workflow state from %v to %v, close status %v"
@@ -731,7 +718,7 @@ type (
731718
RangeID int64
732719
}
733720

734-
// GetWorkflowExecutionResponse is the response to GetworkflowExecutionRequest
721+
// GetWorkflowExecutionResponse is the response to GetWorkflowExecutionRequest
735722
GetWorkflowExecutionResponse struct {
736723
State *WorkflowMutableState
737724
MutableStateStats *MutableStateStats
@@ -819,7 +806,7 @@ type (
819806

820807
Mode ConflictResolveWorkflowMode
821808

822-
// workflow to be resetted
809+
// workflow to be reset
823810
ResetWorkflowSnapshot WorkflowSnapshot
824811

825812
// maybe new workflow
@@ -1066,7 +1053,7 @@ type (
10661053
Size int64
10671054
}
10681055

1069-
// CreateTasksRequest is used to create a new task for a workflow exectution
1056+
// CreateTasksRequest is used to create a new task for a workflow execution
10701057
CreateTasksRequest struct {
10711058
TaskListInfo *TaskListInfo
10721059
Tasks []*CreateTaskInfo
@@ -1537,6 +1524,7 @@ type (
15371524
IsWorkflowExecutionExists(ctx context.Context, request *IsWorkflowExecutionExistsRequest) (*IsWorkflowExecutionExistsResponse, error)
15381525

15391526
// Replication task related methods
1527+
15401528
PutReplicationTaskToDLQ(ctx context.Context, request *PutReplicationTaskToDLQRequest) error
15411529
GetReplicationTasksFromDLQ(ctx context.Context, request *GetReplicationTasksFromDLQRequest) (*GetHistoryTasksResponse, error)
15421530
GetReplicationDLQSize(ctx context.Context, request *GetReplicationDLQSizeRequest) (*GetReplicationDLQSizeResponse, error)
@@ -1549,6 +1537,7 @@ type (
15491537
RangeCompleteHistoryTask(ctx context.Context, request *RangeCompleteHistoryTaskRequest) (*RangeCompleteHistoryTaskResponse, error)
15501538

15511539
// Scan operations
1540+
15521541
ListConcreteExecutions(ctx context.Context, request *ListConcreteExecutionsRequest) (*ListConcreteExecutionsResponse, error)
15531542
ListCurrentExecutions(ctx context.Context, request *ListCurrentExecutionsRequest) (*ListCurrentExecutionsResponse, error)
15541543

@@ -1597,7 +1586,7 @@ type (
15971586
// ReadRawHistoryBranch returns history node raw data for a branch ByBatch
15981587
// NOTE: this API should only be used by 3+DC
15991588
ReadRawHistoryBranch(ctx context.Context, request *ReadHistoryBranchRequest) (*ReadRawHistoryBranchResponse, error)
1600-
// ForkHistoryBranch forks a new branch from a old branch
1589+
// ForkHistoryBranch forks a new branch from an old branch
16011590
ForkHistoryBranch(ctx context.Context, request *ForkHistoryBranchRequest) (*ForkHistoryBranchResponse, error)
16021591
// DeleteHistoryBranch removes a branch
16031592
// If this is the last branch to delete, it will also remove the root node
@@ -1657,7 +1646,8 @@ type (
16571646

16581647
// IsTimeoutError check whether error is TimeoutError
16591648
func IsTimeoutError(err error) bool {
1660-
_, ok := err.(*TimeoutError)
1649+
var timeoutError *TimeoutError
1650+
ok := errors.As(err, &timeoutError)
16611651
return ok
16621652
}
16631653

@@ -1940,6 +1930,7 @@ func (t *TimerTaskInfo) ToTask() (Task, error) {
19401930
}
19411931
}
19421932

1933+
// ToNilSafeCopy
19431934
// TODO: it seems that we just need a nil safe shardInfo, deep copy is not necessary
19441935
func (s *ShardInfo) ToNilSafeCopy() *ShardInfo {
19451936
if s == nil {
@@ -2041,23 +2032,23 @@ func (s *ShardInfo) copy() *ShardInfo {
20412032
// SerializeClusterConfigs makes an array of *ClusterReplicationConfig serializable
20422033
// by flattening them into map[string]interface{}
20432034
func SerializeClusterConfigs(replicationConfigs []*ClusterReplicationConfig) []map[string]interface{} {
2044-
seriaizedReplicationConfigs := []map[string]interface{}{}
2035+
var serializedReplicationConfigs []map[string]interface{}
20452036
for index := range replicationConfigs {
2046-
seriaizedReplicationConfigs = append(seriaizedReplicationConfigs, replicationConfigs[index].serialize())
2037+
serializedReplicationConfigs = append(serializedReplicationConfigs, replicationConfigs[index].serialize())
20472038
}
2048-
return seriaizedReplicationConfigs
2039+
return serializedReplicationConfigs
20492040
}
20502041

20512042
// DeserializeClusterConfigs creates an array of ClusterReplicationConfigs from an array of map representations
20522043
func DeserializeClusterConfigs(replicationConfigs []map[string]interface{}) []*ClusterReplicationConfig {
2053-
deseriaizedReplicationConfigs := []*ClusterReplicationConfig{}
2044+
deserializedReplicationConfigs := []*ClusterReplicationConfig{}
20542045
for index := range replicationConfigs {
2055-
deseriaizedReplicationConfig := &ClusterReplicationConfig{}
2056-
deseriaizedReplicationConfig.deserialize(replicationConfigs[index])
2057-
deseriaizedReplicationConfigs = append(deseriaizedReplicationConfigs, deseriaizedReplicationConfig)
2046+
deserializedReplicationConfig := &ClusterReplicationConfig{}
2047+
deserializedReplicationConfig.deserialize(replicationConfigs[index])
2048+
deserializedReplicationConfigs = append(deserializedReplicationConfigs, deserializedReplicationConfig)
20582049
}
20592050

2060-
return deseriaizedReplicationConfigs
2051+
return deserializedReplicationConfigs
20612052
}
20622053

20632054
func (config *ClusterReplicationConfig) serialize() map[string]interface{} {
@@ -2175,8 +2166,11 @@ func NewGetReplicationTasksFromDLQRequest(
21752166

21762167
// IsTransientError checks if the error is a transient persistence error
21772168
func IsTransientError(err error) bool {
2178-
switch err.(type) {
2179-
case *types.InternalServiceError, *types.ServiceBusyError, *TimeoutError:
2169+
var internalServiceError *types.InternalServiceError
2170+
var serviceBusyError *types.ServiceBusyError
2171+
var timeoutError *TimeoutError
2172+
switch {
2173+
case errors.As(err, &internalServiceError), errors.As(err, &serviceBusyError), errors.As(err, &timeoutError):
21802174
return true
21812175
}
21822176

@@ -2185,8 +2179,10 @@ func IsTransientError(err error) bool {
21852179

21862180
// IsBackgroundTransientError checks if the error is a transient error on background jobs
21872181
func IsBackgroundTransientError(err error) bool {
2188-
switch err.(type) {
2189-
case *types.InternalServiceError, *TimeoutError:
2182+
var internalServiceError *types.InternalServiceError
2183+
var timeoutError *TimeoutError
2184+
switch {
2185+
case errors.As(err, &internalServiceError), errors.As(err, &timeoutError):
21902186
return true
21912187
}
21922188

@@ -2286,7 +2282,8 @@ func (p *TaskListPartition) ToInternalType() *types.TaskListPartition {
22862282
return &types.TaskListPartition{IsolationGroups: p.IsolationGroups}
22872283
}
22882284

2285+
// IsActiveActive
22892286
// TODO(active-active): Update unit tests of all components that use this function to cover active-active case
2290-
func (d *DomainReplicationConfig) IsActiveActive() bool {
2291-
return d != nil && d.ActiveClusters != nil && len(d.ActiveClusters.ActiveClustersByRegion) > 0
2287+
func (c *DomainReplicationConfig) IsActiveActive() bool {
2288+
return c != nil && c.ActiveClusters != nil && len(c.ActiveClusters.ActiveClustersByRegion) > 0
22922289
}

0 commit comments

Comments
 (0)