
Commit dc91b29

feat(matching): use shardprocessors instead of taskListmanagers
1 parent db4bd6b · commit dc91b29

File tree: 13 files changed, +217 / -122 lines


service/matching/handler/engine.go

Lines changed: 33 additions & 27 deletions
```diff
@@ -54,6 +54,7 @@ import (
 	"github.com/uber/cadence/service/matching/config"
 	"github.com/uber/cadence/service/matching/event"
 	"github.com/uber/cadence/service/matching/tasklist"
+	"github.com/uber/cadence/service/sharddistributor/client/executorclient"
 )

 // If sticky poller is not seem in last 10s, we treat it as sticky worker unavailable
```
```diff
@@ -88,8 +89,10 @@ type (
 		tokenSerializer      common.TaskTokenSerializer
 		logger               log.Logger
 		metricsClient        metrics.Client
-		taskListsLock        sync.RWMutex // locks mutation of taskLists
-		taskLists            map[tasklist.Identifier]tasklist.Manager // Convert to LRU cache
+		taskListsLock        sync.RWMutex // locks mutation of taskLists
+		taskLists            map[tasklist.Identifier]tasklist.ShardProcessor // Convert to LRU cache
+		executor             executorclient.Executor[tasklist.ShardProcessor]
+		taskListsFactory     *tasklist.ShardProcessorFactory
 		config               *config.Config
 		lockableQueryTaskMap lockableQueryTaskMap
 		domainCache          cache.DomainCache
```
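The `tasklist.ShardProcessor` type itself is not part of this file's hunks, but the call sites below constrain its surface: `Start` now takes a `context.Context` (see the `mgr.Start(context.Background())` change further down), and `removeTaskListManager` uses `TaskListID()` and `String()`. A minimal sketch of what the interface plausibly requires, assuming nothing beyond those call sites:

```go
// Sketch only: inferred from the call sites visible in engine.go, not
// copied from the tasklist package. Methods not used in this diff are
// deliberately omitted.
package tasklist

import "context"

// Identifier is the real package's task-list key type; stubbed here so
// the sketch compiles on its own.
type Identifier struct{ qualifiedName string }

type ShardProcessor interface {
	// Start takes a context, unlike the old Manager.Start().
	Start(ctx context.Context) error
	// TaskListID returns the key used by the engine's taskLists map.
	TaskListID() *Identifier
	// String names the processor instance; removeTaskListManager now
	// compares String() values instead of pointer identity.
	String() string
}
```

The new `executor executorclient.Executor[tasklist.ShardProcessor]` field also shows the shard distributor's executor client is generic over the processor type it manages; it is not yet exercised in the hunks shown in this file.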
```diff
@@ -135,15 +138,14 @@ func NewEngine(
 	isolationState isolationgroup.State,
 	timeSource clock.TimeSource,
 ) Engine {
-
 	e := &matchingEngineImpl{
 		shutdown:           make(chan struct{}),
 		shutdownCompletion: &sync.WaitGroup{},
 		taskManager:        taskManager,
 		clusterMetadata:    clusterMetadata,
 		historyService:     historyService,
 		tokenSerializer:    common.NewJSONTaskTokenSerializer(),
-		taskLists:          make(map[tasklist.Identifier]tasklist.Manager),
+		taskLists:          make(map[tasklist.Identifier]tasklist.ShardProcessor),
 		logger:             logger.WithTags(tag.ComponentMatchingEngine),
 		metricsClient:      metricsClient,
 		matchingClient:     matchingClient,
@@ -156,6 +158,7 @@ func NewEngine(
 		timeSource: timeSource,
 	}

+	e.setupTaskListFactory()
 	e.shutdownCompletion.Add(1)
 	go e.runMembershipChangeLoop()
```
```diff
@@ -176,10 +179,27 @@ func (e *matchingEngineImpl) Stop() {
 	e.shutdownCompletion.Wait()
 }

-func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.Manager {
+func (e *matchingEngineImpl) setupTaskListFactory() {
+	taskListFactory := &tasklist.ShardProcessorFactory{
+		DomainCache:     e.domainCache,
+		Logger:          e.logger,
+		MetricsClient:   e.metricsClient,
+		TaskManager:     e.taskManager,
+		ClusterMetadata: e.clusterMetadata,
+		IsolationState:  e.isolationState,
+		MatchingClient:  e.matchingClient,
+		CloseCallback:   e.removeTaskListManager,
+		Cfg:             e.config,
+		TimeSource:      e.timeSource,
+		CreateTime:      e.timeSource.Now(),
+		HistoryService:  e.historyService}
+	e.taskListsFactory = taskListFactory
+}
+
+func (e *matchingEngineImpl) getTaskLists(maxCount int) []tasklist.ShardProcessor {
 	e.taskListsLock.RLock()
 	defer e.taskListsLock.RUnlock()
-	lists := make([]tasklist.Manager, 0, len(e.taskLists))
+	lists := make([]tasklist.ShardProcessor, 0, len(e.taskLists))
 	count := 0
 	for _, tlMgr := range e.taskLists {
 		lists = append(lists, tlMgr)
```
```diff
@@ -202,7 +222,7 @@ func (e *matchingEngineImpl) String() string {

 // Returns taskListManager for a task list. If not already cached gets new range from DB and
 // if successful creates one.
-func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, taskListKind types.TaskListKind) (tasklist.Manager, error) {
+func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, taskListKind types.TaskListKind) (tasklist.ShardProcessor, error) {
 	// The first check is an optimization so almost all requests will have a task list manager
 	// and return avoiding the write lock
 	e.taskListsLock.RLock()
```
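The context lines spell out the engine's read-then-write locking pattern: a fast path under `RLock` serves the common case, and only a cache miss takes the write lock. Condensed from this hunk and the next — a paraphrase, not the verbatim source:

```go
// Condensed paraphrase of getTaskListManager's locking flow; logging and
// metrics from the real function are omitted.
e.taskListsLock.RLock()
if mgr, ok := e.taskLists[*taskList]; ok {
	e.taskListsLock.RUnlock()
	return mgr, nil // fast path: no write lock taken
}
e.taskListsLock.RUnlock()

e.taskListsLock.Lock()
if mgr, ok := e.taskLists[*taskList]; ok { // re-check: another goroutine may have won
	e.taskListsLock.Unlock()
	return mgr, nil
}
// Miss confirmed under the write lock: build and cache a new processor.
mgr, err := e.taskListsFactory.NewShardProcessorWithTaskListIdentifier(taskList, taskListKind)
if err != nil {
	e.taskListsLock.Unlock()
	return nil, err
}
e.taskLists[*taskList] = mgr
e.taskListsLock.Unlock()
// The real code then calls mgr.Start(context.Background()) outside the lock.
```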
```diff
@@ -232,22 +252,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
 	)

 	logger.Info("Task list manager state changed", tag.LifeCycleStarting)
-	params := tasklist.ManagerParams{
-		DomainCache:     e.domainCache,
-		Logger:          e.logger,
-		MetricsClient:   e.metricsClient,
-		TaskManager:     e.taskManager,
-		ClusterMetadata: e.clusterMetadata,
-		IsolationState:  e.isolationState,
-		MatchingClient:  e.matchingClient,
-		CloseCallback:   e.removeTaskListManager,
-		TaskList:        taskList,
-		TaskListKind:    taskListKind,
-		Cfg:             e.config,
-		TimeSource:      e.timeSource,
-		CreateTime:      e.timeSource.Now(),
-		HistoryService:  e.historyService}
-	mgr, err := tasklist.NewManager(params)
+	mgr, err := e.taskListsFactory.NewShardProcessorWithTaskListIdentifier(taskList, taskListKind)
 	if err != nil {
 		e.taskListsLock.Unlock()
 		logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
```
```diff
@@ -260,7 +265,7 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
 		float64(len(e.taskLists)),
 	)
 	e.taskListsLock.Unlock()
-	err = mgr.Start()
+	err = mgr.Start(context.Background())
 	if err != nil {
 		logger.Info("Task list manager state changed", tag.LifeCycleStartFailed, tag.Error(err))
 		return nil, err
```
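The only behavioral change in this hunk is the new `context.Context` argument. The engine still passes `context.Background()`, so nothing is cancelled yet; the parameter starts to matter once a lifecycle owner supplies a context it can cancel. A hypothetical illustration, not from this commit:

```go
// Hypothetical lifecycle owner (none of this is in the diff): a
// shard-distributor executor could start each assigned processor under a
// shard-scoped context and stop them all by cancelling it, assuming
// processors watch ctx.Done().
func startShard(processors []tasklist.ShardProcessor) (stop func(), err error) {
	ctx, cancel := context.WithCancel(context.Background())
	for _, p := range processors {
		if err := p.Start(ctx); err != nil {
			cancel() // undo: cancel anything already started
			return nil, err
		}
	}
	return cancel, nil // caller invokes stop() to release the shard
}
```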
```diff
@@ -298,18 +303,19 @@ func (e *matchingEngineImpl) getTaskListByDomainLocked(domainID string, taskList
 }

 // For use in tests
-func (e *matchingEngineImpl) updateTaskList(taskList *tasklist.Identifier, mgr tasklist.Manager) {
+func (e *matchingEngineImpl) updateTaskList(taskList *tasklist.Identifier, mgr tasklist.ShardProcessor) {
 	e.taskListsLock.Lock()
 	defer e.taskListsLock.Unlock()
 	e.taskLists[*taskList] = mgr
 }

-func (e *matchingEngineImpl) removeTaskListManager(tlMgr tasklist.Manager) {
+func (e *matchingEngineImpl) removeTaskListManager(tlMgr tasklist.ShardProcessor) {
 	id := tlMgr.TaskListID()
 	e.taskListsLock.Lock()
 	defer e.taskListsLock.Unlock()
+
 	currentTlMgr, ok := e.taskLists[*id]
-	if ok && tlMgr == currentTlMgr {
+	if ok && currentTlMgr.String() == tlMgr.String() {
 		delete(e.taskLists, *id)
 	}
```
service/matching/handler/engine_integration_test.go

Lines changed: 2 additions & 0 deletions
```diff
@@ -681,6 +681,7 @@ func (s *matchingEngineSuite) SyncMatchTasks(taskType int, enableIsolation bool)
 	// So we can get snapshots
 	scope := tally.NewTestScope("test", nil)
 	s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
+	s.matchingEngine.taskListsFactory.MetricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})

 	testParam := newTestParam(s.T(), taskType)
 	s.taskManager.SetRangeID(testParam.TaskListID, initialRangeID)
@@ -840,6 +841,7 @@ func (s *matchingEngineSuite) ConcurrentAddAndPollTasks(taskType int, workerCoun
 	}
 	scope := tally.NewTestScope("test", nil)
 	s.matchingEngine.metricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
+	s.matchingEngine.taskListsFactory.MetricsClient = metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})

 	const initialRangeID = 0
 	const rangeSize = 3
```
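Both tests now have to re-point two metrics references, because `setupTaskListFactory` captured the engine's original `MetricsClient` at construction time; overwriting only `e.metricsClient` would leave newly created processors reporting to the old scope. If this pattern spreads, a small suite helper could keep the two in sync — a hypothetical sketch, not part of the commit:

```go
// Hypothetical test helper; only the two assignments inside it appear in
// the diff. It swaps both the engine's and the factory's metrics client
// so processors created after the swap report to the test scope.
func (s *matchingEngineSuite) useTestMetricsScope(scope tally.Scope) {
	client := metrics.NewClient(scope, metrics.Matching, metrics.HistogramMigration{})
	s.matchingEngine.metricsClient = client
	s.matchingEngine.taskListsFactory.MetricsClient = client
}
```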
