@@ -112,6 +112,7 @@ type (
112112 timeSource clock.TimeSource
113113 failoverNotificationVersion int64
114114 ShardDistributorMatchingConfig clientcommon.Config
115+ selfAddress membership.HostInfo
115116 }
116117)
117118
@@ -142,6 +143,11 @@ func NewEngine(
142143 shardDistributorClient executorclient.Client ,
143144 ShardDistributorMatchingConfig clientcommon.Config ,
144145) Engine {
146+ selfAddress , err := resolver .WhoAmI ()
147+ if err != nil {
148+ logger .Fatal ("failed to lookup self im membership" , tag .Error (err ))
149+ }
150+
145151 e := & matchingEngineImpl {
146152 shutdown : make (chan struct {}),
147153 shutdownCompletion : & sync.WaitGroup {},
@@ -162,6 +168,7 @@ func NewEngine(
162168 isolationState : isolationState ,
163169 timeSource : timeSource ,
164170 ShardDistributorMatchingConfig : ShardDistributorMatchingConfig ,
171+ selfAddress : selfAddress ,
165172 }
166173
167174 e .setupExecutor (shardDistributorClient )
@@ -1515,66 +1522,69 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog(
15151522 }
15161523}
15171524
1518- func (e * matchingEngineImpl ) errIfShardOwnershipLost (ctx context.Context , taskList * tasklist.Identifier ) error {
1519- if ! e .config .EnableTasklistOwnershipGuard () {
1520- return nil
1525+ func (e * matchingEngineImpl ) stillOwnShard (ctx context.Context , taskList * tasklist.Identifier , reason , whoOwns * string ) (bool , error ) {
1526+ if e .isShuttingDown () {
1527+ * reason = "engine is shutting down"
1528+ return false , nil
15211529 }
15221530
1523- // We have a shard-processor shared by all the task lists with the same name.
1524- // For now there is no 1:1 mapping between shards and tasklists. (#tasklists >= #shards)
1525- sp , err := e .executor .GetShardProcess (ctx , taskList .GetName ())
15261531 if e .executor .IsOnboardedToSD () {
1532+ // We have a shard-processor shared by all the task lists with the same name.
1533+ // For now there is no 1:1 mapping between shards and tasklists. (#tasklists >= #shards)
1534+ sp , err := e .executor .GetShardProcess (ctx , taskList .GetName ())
15271535 if err != nil {
1528- return fmt .Errorf ("failed to lookup ownership in SD: %w" , err )
1536+ return false , fmt .Errorf ("failed to lookup ownership in SD: %w" , err )
15291537 }
1538+
15301539 if sp == nil {
1531- return fmt .Errorf ("failed to lookup ownership in SD: shard process is nil" )
1540+ * reason = "shard process not found"
1541+ return false , nil
15321542 }
1533- return nil
1543+ return true , nil
15341544 }
15351545
1536- self , err := e .membershipResolver .WhoAmI ( )
1546+ taskListOwner , err := e .membershipResolver .Lookup ( service . Matching , taskList . GetName () )
15371547 if err != nil {
1538- return fmt .Errorf ("failed to lookup self im membership: %w" , err )
1548+ return false , fmt .Errorf ("failed to lookup task list owner: %w" , err )
1549+ }
1550+ if taskListOwner .Identity () != e .selfAddress .Identity () {
1551+ * reason = "engine does not own this shard"
1552+ * whoOwns = taskListOwner .Identity ()
15391553 }
15401554
1541- if e .isShuttingDown () {
1542- e .logger .Warn ("request to get tasklist is being rejected because engine is shutting down" ,
1543- tag .WorkflowDomainID (taskList .GetDomainID ()),
1544- tag .WorkflowTaskListType (taskList .GetType ()),
1545- tag .WorkflowTaskListName (taskList .GetName ()),
1546- )
1555+ return true , nil
1556+ }
15471557
1548- return cadence_errors .NewTaskListNotOwnedByHostError (
1549- "not known" ,
1550- self .Identity (),
1551- taskList .GetName (),
1552- )
1558+ // Defensive check to make sure we actually own the task list
1559+ //
1560+ // If we try to create a task list manager for a task list that is not owned by us, return an error
1561+ // The new task list manager will steal the task list from the current owner, which should only happen if
1562+ // the task list is owned by the current host.
1563+ func (e * matchingEngineImpl ) errIfShardOwnershipLost (ctx context.Context , taskList * tasklist.Identifier ) error {
1564+ if ! e .config .EnableTasklistOwnershipGuard () {
1565+ return nil
15531566 }
15541567
1555- // Defensive check to make sure we actually own the task list
1556- // If we try to create a task list manager for a task list that is not owned by us, return an error
1557- // The new task list manager will steal the task list from the current owner, which should only happen if
1558- // the task list is owned by the current host.
1559- taskListOwner , err := e .membershipResolver .Lookup (service .Matching , taskList .GetName ())
1568+ reason := "unknown reason"
1569+ whoOwns := "not known"
1570+ ok , err := e .stillOwnShard (ctx , taskList , & reason , & whoOwns )
15601571 if err != nil {
1561- return fmt . Errorf ( "failed to lookup task list owner: %w" , err )
1572+ return err
15621573 }
1563-
1564- if taskListOwner .Identity () != self .Identity () {
1565- e .logger .Warn ("Request to get tasklist is being rejected because engine does not own this shard" ,
1566- tag .WorkflowDomainID (taskList .GetDomainID ()),
1567- tag .WorkflowTaskListType (taskList .GetType ()),
1568- tag .WorkflowTaskListName (taskList .GetName ()),
1569- )
1570- return cadence_errors .NewTaskListNotOwnedByHostError (
1571- taskListOwner .Identity (),
1572- self .Identity (),
1573- taskList .GetName (),
1574- )
1574+ if ok {
1575+ return nil
15751576 }
15761577
1577- return nil
1578+ e .logger .Warn (fmt .Sprintf ("request to get tasklist is being rejected because %s" , reason ),
1579+ tag .WorkflowDomainID (taskList .GetDomainID ()),
1580+ tag .WorkflowTaskListType (taskList .GetType ()),
1581+ tag .WorkflowTaskListName (taskList .GetName ()),
1582+ )
1583+ return cadence_errors .NewTaskListNotOwnedByHostError (
1584+ whoOwns ,
1585+ e .selfAddress .Identity (),
1586+ taskList .GetName (),
1587+ )
15781588}
15791589
15801590func (e * matchingEngineImpl ) isShuttingDown () bool {
0 commit comments