Skip to content

Commit fb1239a

Browse files
committed
chore(matching): introduce task list registry
Abstracting the way task-lists are managed simplifies matcher: it doesn't need to care about how task-lists managers are stored, retrieved and deleted and don't need to care about locks anymore. In addition, this brings ability to introduce optimized views (like per-domain or per-shard-name) transparently (preserving the same interface). This will avoid quadratic complexity in shard-processor' functions which previously had to iterate over the entire list of task list managers for every single shard. Which could greatly impact performance in production where we have thousands of task-lists in a single matching instance. Signed-off-by: Jan Kisel <dkrot@uber.com>
1 parent 29c6b41 commit fb1239a

14 files changed

+607
-411
lines changed

service/matching/handler/engine.go

Lines changed: 48 additions & 98 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ import (
2727
"context"
2828
"errors"
2929
"fmt"
30-
"math"
3130
"sync"
3231
"time"
3332

@@ -89,6 +88,8 @@ type (
8988
}
9089

9190
matchingEngineImpl struct {
91+
taskListCreationLock sync.Mutex
92+
taskListsRegistry tasklist.ManagerRegistry
9293
shutdownCompletion *sync.WaitGroup
9394
shutdown chan struct{}
9495
taskManager persistence.TaskManager
@@ -99,8 +100,6 @@ type (
99100
logger log.Logger
100101
metricsClient metrics.Client
101102
metricsScope tally.Scope
102-
taskListsLock sync.RWMutex // locks mutation of taskLists
103-
taskLists map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
104103
executor executorclient.Executor[tasklist.ShardProcessor]
105104
taskListsFactory *tasklist.ShardProcessorFactory
106105
config *config.Config
@@ -143,13 +142,13 @@ func NewEngine(
143142
ShardDistributorMatchingConfig clientcommon.Config,
144143
) Engine {
145144
e := &matchingEngineImpl{
145+
taskListsRegistry: tasklist.NewManagerRegistry(metricsClient),
146146
shutdown: make(chan struct{}),
147147
shutdownCompletion: &sync.WaitGroup{},
148148
taskManager: taskManager,
149149
clusterMetadata: clusterMetadata,
150150
historyService: historyService,
151151
tokenSerializer: common.NewJSONTaskTokenSerializer(),
152-
taskLists: make(map[tasklist.Identifier]tasklist.Manager),
153152
logger: logger.WithTags(tag.ComponentMatchingEngine),
154153
metricsClient: metricsClient,
155154
metricsScope: metricsScope,
@@ -180,7 +179,7 @@ func (e *matchingEngineImpl) Stop() {
180179
close(e.shutdown)
181180
e.executor.Stop()
182181
// Executes Stop() on each task list outside of lock
183-
for _, l := range e.getTaskLists(math.MaxInt32) {
182+
for _, l := range e.taskListsRegistry.AllManagers() {
184183
l.Stop()
185184
}
186185
e.unregisterDomainFailoverCallback()
@@ -191,10 +190,9 @@ func (e *matchingEngineImpl) setupExecutor(shardDistributorExecutorClient execut
191190
cfg, reportTTL := e.getValidatedShardDistributorConfig()
192191

193192
taskListFactory := &tasklist.ShardProcessorFactory{
194-
TaskListsLock: &e.taskListsLock,
195-
TaskLists: e.taskLists,
196-
ReportTTL: reportTTL,
197-
TimeSource: e.timeSource,
193+
TaskListsRegistry: e.taskListsRegistry,
194+
ReportTTL: reportTTL,
195+
TimeSource: e.timeSource,
198196
}
199197
e.taskListsFactory = taskListFactory
200198

@@ -241,26 +239,15 @@ func (e *matchingEngineImpl) getValidatedShardDistributorConfig() (clientcommon.
241239
return cfg, reportTTL
242240
}
243241

244-
func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
245-
e.taskListsLock.RLock()
246-
defer e.taskListsLock.RUnlock()
247-
lists := make([]tasklist.Manager, 0, len(e.taskLists))
248-
count := 0
249-
for _, tlMgr := range e.taskLists {
250-
lists = append(lists, tlMgr)
251-
count++
252-
if count >= maxCount {
253-
break
254-
}
255-
}
256-
return lists
257-
}
258-
259242
func (e *matchingEngineImpl) String() string {
260243
// Executes taskList.String() on each task list outside of lock
261244
buf := new(bytes.Buffer)
262-
for _, l := range e.getTaskLists(1000) {
263-
fmt.Fprintf(buf, "\n%s", l.String())
245+
246+
for i, tl := range e.taskListsRegistry.AllManagers() {
247+
if i >= 1000 {
248+
break
249+
}
250+
fmt.Fprintf(buf, "\n%s", tl.String())
264251
}
265252
return buf.String()
266253
}
@@ -274,22 +261,19 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
274261
if sp != nil {
275262
// The first check is an optimization so almost all requests will have a task list manager
276263
// and return avoiding the write lock
277-
e.taskListsLock.RLock()
278-
if result, ok := e.taskLists[*taskList]; ok {
279-
e.taskListsLock.RUnlock()
264+
if result, ok := e.taskListsRegistry.ManagerByTaskListIdentifier(*taskList); ok {
280265
return result, nil
281266
}
282-
e.taskListsLock.RUnlock()
283267
}
284268
err := e.errIfShardOwnershipLost(ctx, taskList)
285269
if err != nil {
286270
return nil, err
287271
}
288272

289273
// If it gets here, write lock and check again in case a task list is created between the two locks
290-
e.taskListsLock.Lock()
291-
if result, ok := e.taskLists[*taskList]; ok {
292-
e.taskListsLock.Unlock()
274+
e.taskListCreationLock.Lock()
275+
if result, ok := e.taskListsRegistry.ManagerByTaskListIdentifier(*taskList); ok {
276+
e.taskListCreationLock.Unlock()
293277
return result, nil
294278
}
295279

@@ -309,7 +293,7 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
309293
ClusterMetadata: e.clusterMetadata,
310294
IsolationState: e.isolationState,
311295
MatchingClient: e.matchingClient,
312-
Registry: e, // Engine implements ManagerRegistry
296+
Registry: e.taskListsRegistry,
313297
TaskList: taskList,
314298
TaskListKind: taskListKind,
315299
Cfg: e.config,
@@ -319,17 +303,14 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
319303
}
320304
mgr, err := tasklist.NewManager(params)
321305
if err != nil {
322-
e.taskListsLock.Unlock()
306+
e.taskListCreationLock.Unlock()
323307
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
324308
return nil, err
325309
}
326310

327-
e.taskLists[*taskList] = mgr
328-
e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge(
329-
metrics.TaskListManagersGauge,
330-
float64(len(e.taskLists)),
331-
)
332-
e.taskListsLock.Unlock()
311+
e.taskListsRegistry.Register(*taskList, mgr)
312+
e.taskListCreationLock.Unlock()
313+
333314
err = mgr.Start(context.Background())
334315
if err != nil {
335316
logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
@@ -360,44 +341,6 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
360341
return mgr, nil
361342
}
362343

363-
func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string, taskListKind *types.TaskListKind) *types.GetTaskListsByDomainResponse {
364-
decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse)
365-
activityTaskListMap := make(map[string]*types.DescribeTaskListResponse)
366-
for tl, tlm := range e.taskLists {
367-
if tl.GetDomainID() == domainID && (taskListKind == nil || tlm.GetTaskListKind() == *taskListKind) {
368-
if types.TaskListType(tl.GetType()) == types.TaskListTypeDecision {
369-
decisionTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
370-
} else {
371-
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
372-
}
373-
}
374-
}
375-
return &types.GetTaskListsByDomainResponse{
376-
DecisionTaskListMap: decisionTaskListMap,
377-
ActivityTaskListMap: activityTaskListMap,
378-
}
379-
}
380-
381-
// UnregisterManager implements tasklist.ManagerRegistry.
382-
// It removes a task list manager from the engine's tracking map when the manager stops.
383-
func (e *matchingEngineImpl) UnregisterManager(mgr tasklist.Manager) {
384-
id := mgr.TaskListID()
385-
e.taskListsLock.Lock()
386-
defer e.taskListsLock.Unlock()
387-
388-
// we need to make sure= we still hold the given `mgr` or we
389-
// already created a new one.
390-
currentTlMgr, ok := e.taskLists[*id]
391-
if ok && currentTlMgr == mgr {
392-
delete(e.taskLists, *id)
393-
}
394-
395-
e.metricsClient.Scope(metrics.MatchingTaskListMgrScope).UpdateGauge(
396-
metrics.TaskListManagersGauge,
397-
float64(len(e.taskLists)),
398-
)
399-
}
400-
401344
// AddDecisionTask either delivers task directly to waiting poller or save it into task list persistence.
402345
func (e *matchingEngineImpl) AddDecisionTask(
403346
hCtx *handlerContext,
@@ -1151,6 +1094,26 @@ func (e *matchingEngineImpl) listTaskListPartitions(
11511094
return partitionHostInfo, nil
11521095
}
11531096

1097+
func (e *matchingEngineImpl) getTaskListsByDomainAndKind(domainID string, taskListKind *types.TaskListKind) *types.GetTaskListsByDomainResponse {
1098+
decisionTaskListMap := make(map[string]*types.DescribeTaskListResponse)
1099+
activityTaskListMap := make(map[string]*types.DescribeTaskListResponse)
1100+
1101+
for _, tlm := range e.taskListsRegistry.ManagersByDomainID(domainID) {
1102+
if taskListKind == nil || tlm.GetTaskListKind() == *taskListKind {
1103+
tl := tlm.TaskListID()
1104+
if types.TaskListType(tl.GetType()) == types.TaskListTypeDecision {
1105+
decisionTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
1106+
} else {
1107+
activityTaskListMap[tl.GetRoot()] = tlm.DescribeTaskList(false)
1108+
}
1109+
}
1110+
}
1111+
return &types.GetTaskListsByDomainResponse{
1112+
DecisionTaskListMap: decisionTaskListMap,
1113+
ActivityTaskListMap: activityTaskListMap,
1114+
}
1115+
}
1116+
11541117
func (e *matchingEngineImpl) GetTaskListsByDomain(
11551118
hCtx *handlerContext,
11561119
request *types.GetTaskListsByDomainRequest,
@@ -1165,9 +1128,7 @@ func (e *matchingEngineImpl) GetTaskListsByDomain(
11651128
tlKind = nil
11661129
}
11671130

1168-
e.taskListsLock.RLock()
1169-
defer e.taskListsLock.RUnlock()
1170-
return e.getTaskListByDomainLocked(domainID, tlKind), nil
1131+
return e.getTaskListsByDomainAndKind(domainID, tlKind), nil
11711132
}
11721133

11731134
func (e *matchingEngineImpl) UpdateTaskListPartitionConfig(
@@ -1276,17 +1237,10 @@ func (e *matchingEngineImpl) getAllPartitions(
12761237
}
12771238

12781239
func (e *matchingEngineImpl) unloadTaskList(tlMgr tasklist.Manager) {
1279-
id := tlMgr.TaskListID()
1280-
e.taskListsLock.Lock()
1281-
currentTlMgr, ok := e.taskLists[*id]
1282-
if !ok || tlMgr != currentTlMgr {
1283-
e.taskListsLock.Unlock()
1284-
return
1240+
unregistered := e.taskListsRegistry.Unregister(tlMgr)
1241+
if unregistered {
1242+
tlMgr.Stop()
12851243
}
1286-
delete(e.taskLists, *id)
1287-
e.taskListsLock.Unlock()
1288-
// added a new taskList
1289-
tlMgr.Stop()
12901244
}
12911245

12921246
// Populate the decision task response based on context and scheduled/started events.
@@ -1600,9 +1554,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac
16001554

16011555
taskListNormal := types.TaskListKindNormal
16021556

1603-
e.taskListsLock.RLock()
1604-
resp := e.getTaskListByDomainLocked(domain.GetInfo().ID, &taskListNormal)
1605-
e.taskListsLock.RUnlock()
1557+
resp := e.getTaskListsByDomainAndKind(domain.GetInfo().ID, &taskListNormal)
16061558

16071559
for taskListName := range resp.DecisionTaskListMap {
16081560
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision, taskListNormal)
@@ -1614,9 +1566,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac
16141566

16151567
taskListSticky := types.TaskListKindSticky
16161568

1617-
e.taskListsLock.RLock()
1618-
resp = e.getTaskListByDomainLocked(domain.GetInfo().ID, &taskListSticky)
1619-
e.taskListsLock.RUnlock()
1569+
resp = e.getTaskListsByDomainAndKind(domain.GetInfo().ID, &taskListSticky)
16201570

16211571
for taskListName := range resp.DecisionTaskListMap {
16221572
e.disconnectTaskListPollersAfterDomainFailover(taskListName, domain, persistence.TaskListTypeDecision, taskListSticky)

service/matching/handler/engine_integration_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,8 +236,8 @@ func (s *matchingEngineSuite) TestOnlyUnloadMatchingInstance() {
236236
ClusterMetadata: s.matchingEngine.clusterMetadata,
237237
IsolationState: s.matchingEngine.isolationState,
238238
MatchingClient: s.matchingEngine.matchingClient,
239-
Registry: s.matchingEngine, // Engine implements ManagerRegistry
240-
TaskList: taskListID, // same taskListID as above
239+
Registry: s.matchingEngine.taskListsRegistry,
240+
TaskList: taskListID, // same taskListID as above
241241
TaskListKind: tlKind,
242242
Cfg: s.matchingEngine.config,
243243
TimeSource: s.matchingEngine.timeSource,

0 commit comments

Comments
 (0)