@@ -27,7 +27,6 @@ import (
2727 "context"
2828 "errors"
2929 "fmt"
30- "math"
3130 "sync"
3231 "time"
3332
8988 }
9089
9190 matchingEngineImpl struct {
91+ taskListCreationLock sync.Mutex
92+ taskListsRegistry tasklist.ManagerRegistry
9293 shutdownCompletion * sync.WaitGroup
9394 shutdown chan struct {}
9495 taskManager persistence.TaskManager
@@ -99,8 +100,6 @@ type (
99100 logger log.Logger
100101 metricsClient metrics.Client
101102 metricsScope tally.Scope
102- taskListsLock sync.RWMutex // locks mutation of taskLists
103- taskLists map [tasklist.Identifier ]tasklist.Manager // Convert to LRU cache
104103 executor executorclient.Executor [tasklist.ShardProcessor ]
105104 taskListsFactory * tasklist.ShardProcessorFactory
106105 config * config.Config
@@ -143,13 +142,13 @@ func NewEngine(
143142 ShardDistributorMatchingConfig clientcommon.Config ,
144143) Engine {
145144 e := & matchingEngineImpl {
145+ taskListsRegistry : tasklist .NewManagerRegistry (metricsClient ),
146146 shutdown : make (chan struct {}),
147147 shutdownCompletion : & sync.WaitGroup {},
148148 taskManager : taskManager ,
149149 clusterMetadata : clusterMetadata ,
150150 historyService : historyService ,
151151 tokenSerializer : common .NewJSONTaskTokenSerializer (),
152- taskLists : make (map [tasklist.Identifier ]tasklist.Manager ),
153152 logger : logger .WithTags (tag .ComponentMatchingEngine ),
154153 metricsClient : metricsClient ,
155154 metricsScope : metricsScope ,
@@ -180,7 +179,7 @@ func (e *matchingEngineImpl) Stop() {
180179 close (e .shutdown )
181180 e .executor .Stop ()
182181 // Executes Stop() on each task list outside of lock
183- for _ , l := range e .getTaskLists ( math . MaxInt32 ) {
182+ for _ , l := range e .taskListsRegistry . AllManagers ( ) {
184183 l .Stop ()
185184 }
186185 e .unregisterDomainFailoverCallback ()
@@ -191,10 +190,9 @@ func (e *matchingEngineImpl) setupExecutor(shardDistributorExecutorClient execut
191190 cfg , reportTTL := e .getValidatedShardDistributorConfig ()
192191
193192 taskListFactory := & tasklist.ShardProcessorFactory {
194- TaskListsLock : & e .taskListsLock ,
195- TaskLists : e .taskLists ,
196- ReportTTL : reportTTL ,
197- TimeSource : e .timeSource ,
193+ TaskListsRegistry : e .taskListsRegistry ,
194+ ReportTTL : reportTTL ,
195+ TimeSource : e .timeSource ,
198196 }
199197 e .taskListsFactory = taskListFactory
200198
@@ -241,26 +239,15 @@ func (e *matchingEngineImpl) getValidatedShardDistributorConfig() (clientcommon.
241239 return cfg , reportTTL
242240}
243241
244- func (e * matchingEngineImpl ) getTaskLists (maxCount int ) []tasklist.Manager {
245- e .taskListsLock .RLock ()
246- defer e .taskListsLock .RUnlock ()
247- lists := make ([]tasklist.Manager , 0 , len (e .taskLists ))
248- count := 0
249- for _ , tlMgr := range e .taskLists {
250- lists = append (lists , tlMgr )
251- count ++
252- if count >= maxCount {
253- break
254- }
255- }
256- return lists
257- }
258-
259242func (e * matchingEngineImpl ) String () string {
260243 // Executes taskList.String() on each task list outside of lock
261244 buf := new (bytes.Buffer )
262- for _ , l := range e .getTaskLists (1000 ) {
263- fmt .Fprintf (buf , "\n %s" , l .String ())
245+
246+ for i , tl := range e .taskListsRegistry .AllManagers () {
247+ if i >= 1000 {
248+ break
249+ }
250+ fmt .Fprintf (buf , "\n %s" , tl .String ())
264251 }
265252 return buf .String ()
266253}
@@ -274,22 +261,19 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
274261 if sp != nil {
275262 // The first check is an optimization so almost all requests will have a task list manager
276263 // and return avoiding the write lock
277- e .taskListsLock .RLock ()
278- if result , ok := e .taskLists [* taskList ]; ok {
279- e .taskListsLock .RUnlock ()
264+ if result , ok := e .taskListsRegistry .ManagerByTaskListIdentifier (* taskList ); ok {
280265 return result , nil
281266 }
282- e .taskListsLock .RUnlock ()
283267 }
284268 err := e .errIfShardOwnershipLost (ctx , taskList )
285269 if err != nil {
286270 return nil , err
287271 }
288272
289273 // If it gets here, write lock and check again in case a task list is created between the two locks
290- e .taskListsLock .Lock ()
291- if result , ok := e .taskLists [ * taskList ] ; ok {
292- e .taskListsLock .Unlock ()
274+ e .taskListCreationLock .Lock ()
275+ if result , ok := e .taskListsRegistry . ManagerByTaskListIdentifier ( * taskList ) ; ok {
276+ e .taskListCreationLock .Unlock ()
293277 return result , nil
294278 }
295279
@@ -309,7 +293,7 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
309293 ClusterMetadata : e .clusterMetadata ,
310294 IsolationState : e .isolationState ,
311295 MatchingClient : e .matchingClient ,
312- Registry : e , // Engine implements ManagerRegistry
296+ Registry : e . taskListsRegistry ,
313297 TaskList : taskList ,
314298 TaskListKind : taskListKind ,
315299 Cfg : e .config ,
@@ -319,17 +303,14 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
319303 }
320304 mgr , err := tasklist .NewManager (params )
321305 if err != nil {
322- e .taskListsLock .Unlock ()
306+ e .taskListCreationLock .Unlock ()
323307 logger .Info ("Task list manager state changed" , tag .LifeCycleStartFailed , tag .Error (err ))
324308 return nil , err
325309 }
326310
327- e .taskLists [* taskList ] = mgr
328- e .metricsClient .Scope (metrics .MatchingTaskListMgrScope ).UpdateGauge (
329- metrics .TaskListManagersGauge ,
330- float64 (len (e .taskLists )),
331- )
332- e .taskListsLock .Unlock ()
311+ e .taskListsRegistry .Register (* taskList , mgr )
312+ e .taskListCreationLock .Unlock ()
313+
333314 err = mgr .Start (context .Background ())
334315 if err != nil {
335316 logger .Info ("Task list manager state changed" , tag .LifeCycleStartFailed , tag .Error (err ))
@@ -360,44 +341,6 @@ func (e *matchingEngineImpl) getOrCreateTaskListManager(ctx context.Context, tas
360341 return mgr , nil
361342}
362343
363- func (e * matchingEngineImpl ) getTaskListByDomainLocked (domainID string , taskListKind * types.TaskListKind ) * types.GetTaskListsByDomainResponse {
364- decisionTaskListMap := make (map [string ]* types.DescribeTaskListResponse )
365- activityTaskListMap := make (map [string ]* types.DescribeTaskListResponse )
366- for tl , tlm := range e .taskLists {
367- if tl .GetDomainID () == domainID && (taskListKind == nil || tlm .GetTaskListKind () == * taskListKind ) {
368- if types .TaskListType (tl .GetType ()) == types .TaskListTypeDecision {
369- decisionTaskListMap [tl .GetRoot ()] = tlm .DescribeTaskList (false )
370- } else {
371- activityTaskListMap [tl .GetRoot ()] = tlm .DescribeTaskList (false )
372- }
373- }
374- }
375- return & types.GetTaskListsByDomainResponse {
376- DecisionTaskListMap : decisionTaskListMap ,
377- ActivityTaskListMap : activityTaskListMap ,
378- }
379- }
380-
381- // UnregisterManager implements tasklist.ManagerRegistry.
382- // It removes a task list manager from the engine's tracking map when the manager stops.
383- func (e * matchingEngineImpl ) UnregisterManager (mgr tasklist.Manager ) {
384- id := mgr .TaskListID ()
385- e .taskListsLock .Lock ()
386- defer e .taskListsLock .Unlock ()
387-
388- // we need to make sure= we still hold the given `mgr` or we
389- // already created a new one.
390- currentTlMgr , ok := e .taskLists [* id ]
391- if ok && currentTlMgr == mgr {
392- delete (e .taskLists , * id )
393- }
394-
395- e .metricsClient .Scope (metrics .MatchingTaskListMgrScope ).UpdateGauge (
396- metrics .TaskListManagersGauge ,
397- float64 (len (e .taskLists )),
398- )
399- }
400-
401344// AddDecisionTask either delivers task directly to waiting poller or save it into task list persistence.
402345func (e * matchingEngineImpl ) AddDecisionTask (
403346 hCtx * handlerContext ,
@@ -1151,6 +1094,26 @@ func (e *matchingEngineImpl) listTaskListPartitions(
11511094 return partitionHostInfo , nil
11521095}
11531096
1097+ func (e * matchingEngineImpl ) getTaskListsByDomainAndKind (domainID string , taskListKind * types.TaskListKind ) * types.GetTaskListsByDomainResponse {
1098+ decisionTaskListMap := make (map [string ]* types.DescribeTaskListResponse )
1099+ activityTaskListMap := make (map [string ]* types.DescribeTaskListResponse )
1100+
1101+ for _ , tlm := range e .taskListsRegistry .ManagersByDomainID (domainID ) {
1102+ if taskListKind == nil || tlm .GetTaskListKind () == * taskListKind {
1103+ tl := tlm .TaskListID ()
1104+ if types .TaskListType (tl .GetType ()) == types .TaskListTypeDecision {
1105+ decisionTaskListMap [tl .GetRoot ()] = tlm .DescribeTaskList (false )
1106+ } else {
1107+ activityTaskListMap [tl .GetRoot ()] = tlm .DescribeTaskList (false )
1108+ }
1109+ }
1110+ }
1111+ return & types.GetTaskListsByDomainResponse {
1112+ DecisionTaskListMap : decisionTaskListMap ,
1113+ ActivityTaskListMap : activityTaskListMap ,
1114+ }
1115+ }
1116+
11541117func (e * matchingEngineImpl ) GetTaskListsByDomain (
11551118 hCtx * handlerContext ,
11561119 request * types.GetTaskListsByDomainRequest ,
@@ -1165,9 +1128,7 @@ func (e *matchingEngineImpl) GetTaskListsByDomain(
11651128 tlKind = nil
11661129 }
11671130
1168- e .taskListsLock .RLock ()
1169- defer e .taskListsLock .RUnlock ()
1170- return e .getTaskListByDomainLocked (domainID , tlKind ), nil
1131+ return e .getTaskListsByDomainAndKind (domainID , tlKind ), nil
11711132}
11721133
11731134func (e * matchingEngineImpl ) UpdateTaskListPartitionConfig (
@@ -1276,17 +1237,10 @@ func (e *matchingEngineImpl) getAllPartitions(
12761237}
12771238
12781239func (e * matchingEngineImpl ) unloadTaskList (tlMgr tasklist.Manager ) {
1279- id := tlMgr .TaskListID ()
1280- e .taskListsLock .Lock ()
1281- currentTlMgr , ok := e .taskLists [* id ]
1282- if ! ok || tlMgr != currentTlMgr {
1283- e .taskListsLock .Unlock ()
1284- return
1240+ unregistered := e .taskListsRegistry .Unregister (tlMgr )
1241+ if unregistered {
1242+ tlMgr .Stop ()
12851243 }
1286- delete (e .taskLists , * id )
1287- e .taskListsLock .Unlock ()
1288- // added a new taskList
1289- tlMgr .Stop ()
12901244}
12911245
12921246// Populate the decision task response based on context and scheduled/started events.
@@ -1600,9 +1554,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac
16001554
16011555 taskListNormal := types .TaskListKindNormal
16021556
1603- e .taskListsLock .RLock ()
1604- resp := e .getTaskListByDomainLocked (domain .GetInfo ().ID , & taskListNormal )
1605- e .taskListsLock .RUnlock ()
1557+ resp := e .getTaskListsByDomainAndKind (domain .GetInfo ().ID , & taskListNormal )
16061558
16071559 for taskListName := range resp .DecisionTaskListMap {
16081560 e .disconnectTaskListPollersAfterDomainFailover (taskListName , domain , persistence .TaskListTypeDecision , taskListNormal )
@@ -1614,9 +1566,7 @@ func (e *matchingEngineImpl) domainChangeCallback(nextDomains []*cache.DomainCac
16141566
16151567 taskListSticky := types .TaskListKindSticky
16161568
1617- e .taskListsLock .RLock ()
1618- resp = e .getTaskListByDomainLocked (domain .GetInfo ().ID , & taskListSticky )
1619- e .taskListsLock .RUnlock ()
1569+ resp = e .getTaskListsByDomainAndKind (domain .GetInfo ().ID , & taskListSticky )
16201570
16211571 for taskListName := range resp .DecisionTaskListMap {
16221572 e .disconnectTaskListPollersAfterDomainFailover (taskListName , domain , persistence .TaskListTypeDecision , taskListSticky )
0 commit comments