Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 48 additions & 98 deletions service/matching/handler/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"context"
"errors"
"fmt"
"math"
"sync"
"time"

Expand Down Expand Up @@ -89,6 +88,8 @@ type (
}

matchingEngineImpl struct {
taskListCreationLock sync.Mutex
taskListsRegistry tasklist.ManagerRegistry
shutdownCompletion *sync.WaitGroup
shutdown chan struct{}
taskManager persistence.TaskManager
Expand All @@ -99,8 +100,6 @@ type (
logger log.Logger
metricsClient metrics.Client
metricsScope tally.Scope
taskListsLock sync.RWMutex // locks mutation of taskLists
taskLists map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
executor executorclient.Executor[tasklist.ShardProcessor]
taskListsFactory *tasklist.ShardProcessorFactory
config *config.Config
Expand Down Expand Up @@ -143,13 +142,13 @@ func NewEngine(
ShardDistributorMatchingConfig clientcommon.Config,
) Engine {
e := &matchingEngineImpl{
taskListsRegistry: tasklist.NewManagerRegistry(metricsClient),
shutdown: make(chan struct{}),
shutdownCompletion: &sync.WaitGroup{},
taskManager: taskManager,
clusterMetadata: clusterMetadata,
historyService: historyService,
tokenSerializer: common.NewJSONTaskTokenSerializer(),
taskLists: make(map[tasklist.Identifier]tasklist.Manager),
logger: logger.WithTags(tag.ComponentMatchingEngine),
metricsClient: metricsClient,
metricsScope: metricsScope,
Expand Down Expand Up @@ -180,7 +179,7 @@ func (e *matchingEngineImpl) Stop() {
close(e.shutdown)
e.executor.Stop()
// Executes Stop() on each task list outside of lock
for _, l := range e.getTaskLists(math.MaxInt32) {
for _, l := range e.taskListsRegistry.AllManagers() {
l.Stop()
}
e.unregisterDomainFailoverCallback()
Expand All @@ -191,10 +190,9 @@ func (e *matchingEngineImpl) setupExecutor(shardDistributorExecutorClient execut
cfg, reportTTL := e.getValidatedShardDistributorConfig()

taskListFactory := &tasklist.ShardProcessorFactory{
TaskListsLock: &e.taskListsLock,
TaskLists: e.taskLists,
ReportTTL: reportTTL,
TimeSource: e.timeSource,
TaskListsRegistry: e.taskListsRegistry,
ReportTTL: reportTTL,
TimeSource: e.timeSource,
}
e.taskListsFactory = taskListFactory

Expand Down Expand Up @@ -241,26 +239,15 @@ func (e *matchingEngineImpl) getValidatedShardDistributorConfig() (clientcommon.
return cfg, reportTTL
}

func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
e.taskListsLock.RLock()
defer e.taskListsLock.RUnlock()
lists := make([]tasklist.Manager, 0, len(e.taskLists))
count := 0
for _, tlMgr := range e.taskLists {
lists = append(lists, tlMgr)
count++
if count >= maxCount {
break
}
}
return lists
}

func (e *matchingEngineImpl) String() string {
// Executes taskList.String() on each task list outside of lock
buf := new(bytes.Buffer)
for _, l := range e.getTaskLists(1000) {
fmt.Fprintf(buf, "\n%s", l.String())

for i, tl := range e.taskListsRegistry.AllManagers() {
if i >= 1000 {
break
}
fmt.Fprintf(buf, "\n%s", tl.String())
}
return buf.String()
}
Expand All @@ -274,22 +261,19 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
if sp != nil {
// The first check is an optimization so almost all requests will have a task list manager
// and return avoiding the write lock
e.taskListsLock.RLock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.RUnlock()
if result, ok := e.taskListsRegistry.ManagerByTaskListIdentifier(*taskList); ok {
return result, nil
}
e.taskListsLock.RUnlock()
}
err := e.errIfShardOwnershipLost(ctx, taskList)
if err != nil {
return nil, err
}

// If it gets here, write lock and check again in case a task list is created between the two locks
e.taskListsLock.Lock()
if result, ok := e.taskLists[*taskList]; ok {
e.taskListsLock.Unlock()
e.taskListCreationLock.Lock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move the task list creation logic into the registry so that the synchronization happens in that component? We can remove this lock then.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this originally, but task creation / initialisation is quite big by itself and has a lot of dependencies on engine.
I think this lock is OK since we need to guard task-creation anyway. I can do a small refactoring afterwards to have this lock in defer, though, so it's only mentioned once with a very clear scope. But currently I kept this exactly as the lock (for taskLists) we had previously.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's fair

if result, ok := e.taskListsRegistry.ManagerByTaskListIdentifier(*taskList); ok {
e.taskListCreationLock.Unlock()
return result, nil
}

Expand All @@ -309,7 +293,7 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
ClusterMetadata: e.clusterMetadata,
IsolationState: e.isolationState,
MatchingClient: e.matchingClient,
Registry: e, // Engine implements ManagerRegistry
Registry: e.taskListsRegistry,
TaskList: taskList,
TaskListKind: taskListKind,
Cfg: e.config,
Expand All @@ -319,17 +303,14 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
}
mgr, err := tasklist.NewManager(params)
if err != nil {
e.taskListsLock.Unlock()
e.taskListCreationLock.Unlock()
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
return nil, err
}

e.taskLists[*taskList] = mgr
e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge(
metrics.TaskListManagersGauge,
float64(len(e.taskLists)),
)
e.taskListsLock.Unlock()
e.taskListsRegistry.Register(*taskList, mgr)
e.taskListCreationLock.Unlock()

err = mgr.Start(context.Background())
if err != nil {
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
Expand Down Expand Up @@ -360,44 +341,6 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
return mgr, nil
}

func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string, taskListKind *types.TaskListKind) *types.GetTaskListsByDomainResponse {
decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse)
activityTaskListMap := make(map[string]*types.DescribeTaskListResponse)
for tl, tlm := range e.taskLists {
if tl.GetDomainID() == domainID && (taskListKind == nil || tlm.GetTaskListKind() == *taskListKind) {
if types.TaskListType(tl.GetType()) == types.TaskListTypeDecision {
decisionTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
} else {
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
}
}
}
return &types.GetTaskListsByDomainResponse{
DecisionTaskListMap: decisionTaskListMap,
ActivityTaskListMap: activityTaskListMap,
}
}

// UnregisterManager implements tasklist.ManagerRegistry.
// It removes a task list manager from the engine's tracking map when the manager stops.
func (e *matchingEngineImpl) UnregisterManager(mgr tasklist.Manager) {
id := mgr.TaskListID()
e.taskListsLock.Lock()
defer e.taskListsLock.Unlock()

// we need to make sure= we still hold the given `mgr` or we
// already created a new one.
currentTlMgr, ok := e.taskLists[*id]
if ok && currentTlMgr == mgr {
delete(e.taskLists, *id)
}

e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge(
metrics.TaskListManagersGauge,
float64(len(e.taskLists)),
)
}

// AddDecisionTask either delivers task directly to waiting poller or save it into task list persistence.
func (e *matchingEngineImpl) AddDecisionTask(
hCtx *handlerContext,
Expand Down Expand Up @@ -1151,6 +1094,26 @@ func (e *matchingEngineImpl) listTaskListPartitions(
return partitionHostInfo, nil
}

func (e *matchingEngineImpl) getTaskListsByDomainAndKind(domainID string, taskListKind *types.TaskListKind) *types.GetTaskListsByDomainResponse {
decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse)
activityTaskListMap := make(map[string]*types.DescribeTaskListResponse)

for _, tlm := range e.taskListsRegistry.ManagersByDomainID(domainID) {
if taskListKind == nil || tlm.GetTaskListKind() == *taskListKind {
tl := tlm.TaskListID()
if types.TaskListType(tl.GetType()) == types.TaskListTypeDecision {
decisionTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
} else {
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
}
}
}
return &types.GetTaskListsByDomainResponse{
DecisionTaskListMap: decisionTaskListMap,
ActivityTaskListMap: activityTaskListMap,
}
}

func (e *matchingEngineImpl) GetTaskListsByDomain(
hCtx *handlerContext,
request *types.GetTaskListsByDomainRequest,
Expand All @@ -1165,9 +1128,7 @@ func (e *matchingEngineImpl) GetTaskListsByDomain(
tlKind = nil
}

e.taskListsLock.RLock()
defer e.taskListsLock.RUnlock()
return e.getTaskListByDomainLocked(domainID, tlKind), nil
return e.getTaskListsByDomainAndKind(domainID, tlKind), nil
}

func (e *matchingEngineImpl) UpdateTaskListPartitionConfig(
Expand Down Expand Up @@ -1276,17 +1237,10 @@ func (e *matchingEngineImpl) getAllPartitions(
}

func (e *matchingEngineImpl) unloadTaskList(tlMgr tasklist.Manager) {
id := tlMgr.TaskListID()
e.taskListsLock.Lock()
currentTlMgr, ok := e.taskLists[*id]
if !ok || tlMgr != currentTlMgr {
e.taskListsLock.Unlock()
return
unregistered := e.taskListsRegistry.Unregister(tlMgr)
if unregistered {
tlMgr.Stop()
}
delete(e.taskLists, *id)
e.taskListsLock.Unlock()
// added a new taskList
tlMgr.Stop()
}

// Populate the decision task response based on context and scheduled/started events.
Expand Down Expand Up @@ -1600,9 +1554,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac

taskListNormal := types.TaskListKindNormal

e.taskListsLock.RLock()
resp := e.getTaskListByDomainLocked(domain.GetInfo().ID, &taskListNormal)
e.taskListsLock.RUnlock()
resp := e.getTaskListsByDomainAndKind(domain.GetInfo().ID, &taskListNormal)

for taskListName := range resp.DecisionTaskListMap {
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision, taskListNormal)
Expand All @@ -1614,9 +1566,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac

taskListSticky := types.TaskListKindSticky

e.taskListsLock.RLock()
resp = e.getTaskListByDomainLocked(domain.GetInfo().ID, &taskListSticky)
e.taskListsLock.RUnlock()
resp = e.getTaskListsByDomainAndKind(domain.GetInfo().ID, &taskListSticky)

for taskListName := range resp.DecisionTaskListMap {
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision, taskListSticky)
Expand Down
4 changes: 2 additions & 2 deletions service/matching/handler/engine_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ func (s *matchingEngineSuite) TestOnlyUnloadMatchingInstance() {
ClusterMetadata: s.matchingEngine.clusterMetadata,
IsolationState: s.matchingEngine.isolationState,
MatchingClient: s.matchingEngine.matchingClient,
Registry: s.matchingEngine, // Engine implements ManagerRegistry
TaskList: taskListID, // same taskListID as above
Registry: s.matchingEngine.taskListsRegistry,
TaskList: taskListID, // same taskListID as above
TaskListKind: tlKind,
Cfg: s.matchingEngine.config,
TimeSource: s.matchingEngine.timeSource,
Expand Down
Loading
Loading