Skip to content

Commit d44afb8

Browse files
feat(cadence-matching): Add drain observer for SD executor client (#7751)
<!-- 1-2 line summary of WHAT changed technically: - Always link the relevant projects GitHub issue, unless it is a minor bugfix - Good: "Modified FailoverDomain mapper to allow null ActiveClusterName #320" - Bad: "added nil check" --> **What changed?** This PR adds DrainObserver field to resource.Params to inject a DrainSignalObserver into the matching service at initialization. We wire the drain observer through matching.Service -> handler.NewEngine -> setupExecutor -> executorclient.Params, enabling the executor client's heartbeat loop to react to infrastructure drain/undrain signals. <!-- Your goal is to provide all the required context for a future maintainer to understand the reasons for making this change (see https://cbea.ms/git-commit/#why-not-how). How did this work previously (and what was wrong with it)? What has changed, and why did you solve it this way? - Good: "Active-active domains have independent cluster attributes per region. Previously, modifying cluster attributes required spedifying the default ActiveClusterName which updates the global domain default. This prevents operators from updating regional configurations without affecting the primary cluster designation. This change allows attribute updates to be independent of active cluster selection." - Bad: "Improves domain handling" --> **Why?** The shard-distributor executor client already supports drain-aware behavior via the DrainObserver field on executorclient.Params. However, there was no way to wire a DrainSignalObserver into the matching service's executor client because resource.Params didn't have it. PR follows the same pattern used by ShardDistributorMatchingConfig initialization. <!-- Include specific test commands and setup. Please include the exact commands such that another maintainer or contributor can reproduce the test steps taken. - e.g Unit test commands with exact invocation `go test -v ./common/types/mapper/proto -run TestFailoverDomainRequest` - For integration tests include setup steps and test commands Example: "Started local server with `./cadence start`, then ran `make test_e2e`" - For local simulation testing include setup steps for the server and how you ran the tests - Good: Full commands that reviewers can copy-paste to verify - Bad: "Tested locally" or "Added tests" --> **How did you test it?** Unit tests with `go test -v ./service/matching/handler/` <!-- If there are risks that the release engineer should know about document them here. For example: - Has an API/IDL been modified? Is it backwards/forwards compatible? If not, what are the repecussions? - Has a schema change been introduced? Is it possible to roll back? - Has a feature flag been re-used for a new purpose? - Is there a potential performance concern? Is the change modifying core task processing logic? - If truly N/A, you can mark it as such --> **Potential risks** Since drainObserver field is optional there wouldn't be any changes in cadence-matching's behaviour. <!-- If this PR completes a user facing feature or changes functionality add release notes here. Your release notes should allow a user and the release engineer to understand the changes with little context. Always ensure that the description contains a link to the relevant GitHub issue. --> **Release notes** N/A <!-- Consider whether this change requires documentation updates in the Cadence-Docs repo - If yes: mention what needs updating (or link to docs PR in cadence-docs repo) - If in doubt, add a note about potential doc needs - Only mark N/A if you're certain no docs are affected --> **Documentation Changes** N/A --- ## Reviewer Validation **PR Description Quality** (check these before reviewing code): - [ ] **"What changed"** provides a clear 1-2 line summary - [ ] Project Issue is linked - [ ] **"Why"** explains the full motivation with sufficient context - [ ] **Testing is documented:** - [ ] Unit test commands are included (with exact `go test` invocation) - [ ] Integration test setup/commands included (if integration tests were run) - [ ] Canary testing details included (if canary was mentioned) - [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A) - [ ] **Release notes** included if this completes a user-facing feature - [ ] **Documentation** needs are addressed (or noted if uncertain) Signed-off-by: Gaziza Yestemirova <gaziza@uber.com>
1 parent 18b584f commit d44afb8

File tree

5 files changed

+15
-0
lines changed

5 files changed

+15
-0
lines changed

common/resource/params.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,5 +101,11 @@ type (
101101

102102
// ShardDistributorMatchingConfig is the config for shard distributor executor client in matching service
103103
ShardDistributorMatchingConfig clientcommon.Config
104+
105+
// DrainObserver is an optional observer that signals when this instance is
106+
// drained from service discovery.
107+
// It is used by shard-distributor executor clients to
108+
// gracefully stop processing during drains.
109+
DrainObserver clientcommon.DrainSignalObserver
104110
}
105111
)

service/matching/handler/engine.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ type (
111111
timeSource clock.TimeSource
112112
failoverNotificationVersion int64
113113
ShardDistributorMatchingConfig clientcommon.Config
114+
drainObserver clientcommon.DrainSignalObserver
114115
}
115116
)
116117

@@ -140,6 +141,7 @@ func NewEngine(
140141
timeSource clock.TimeSource,
141142
shardDistributorClient executorclient.Client,
142143
ShardDistributorMatchingConfig clientcommon.Config,
144+
drainObserver clientcommon.DrainSignalObserver,
143145
) Engine {
144146
e := &matchingEngineImpl{
145147
taskListRegistry: tasklist.NewTaskListRegistry(metricsClient),
@@ -161,6 +163,7 @@ func NewEngine(
161163
isolationState: isolationState,
162164
timeSource: timeSource,
163165
ShardDistributorMatchingConfig: ShardDistributorMatchingConfig,
166+
drainObserver: drainObserver,
164167
}
165168

166169
e.setupExecutor(shardDistributorClient)
@@ -215,6 +218,7 @@ func (e *matchingEngineImpl) setupExecutor(shardDistributorExecutorClient execut
215218
"grpc": fmt.Sprintf("%d", e.config.RPCConfig.GRPCPort),
216219
"hostIP": hostIP.String(),
217220
},
221+
DrainObserver: e.drainObserver,
218222
}
219223
executor, err := executorclient.NewExecutor[tasklist.ShardProcessor](params)
220224
if err != nil {

service/matching/handler/engine_integration_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,7 @@ func (s *matchingEngineSuite) newMatchingEngine(
194194
s.mockTimeSource,
195195
s.mockShardExecutorClient,
196196
defaultSDExecutorConfig(),
197+
nil,
197198
).(*matchingEngineImpl)
198199
}
199200

service/matching/handler/membership_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ func TestGetTaskListManager_OwnerShip(t *testing.T) {
139139
mockTimeSource,
140140
mockShardDistributorExecutorClient,
141141
defaultSDExecutorConfig(),
142+
nil,
142143
).(*matchingEngineImpl)
143144

144145
resolverMock.EXPECT().Lookup(gomock.Any(), gomock.Any()).Return(

service/matching/service.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type Service struct {
4545
stopC chan struct{}
4646
config *config.Config
4747
ShardDistributorMatchingConfig clientcommon.Config
48+
drainObserver clientcommon.DrainSignalObserver
4849
}
4950

5051
// NewService builds a new cadence-matching service
@@ -84,6 +85,7 @@ func NewService(
8485
config: serviceConfig,
8586
stopC: make(chan struct{}),
8687
ShardDistributorMatchingConfig: params.ShardDistributorMatchingConfig,
88+
drainObserver: params.DrainObserver,
8789
}, nil
8890
}
8991

@@ -111,6 +113,7 @@ func (s *Service) Start() {
111113
s.GetTimeSource(),
112114
s.GetShardDistributorExecutorClient(),
113115
s.ShardDistributorMatchingConfig,
116+
s.drainObserver,
114117
)
115118

116119
s.handler = handler.NewHandler(engine, s.config, s.GetDomainCache(), s.GetMetricsClient(), s.GetLogger(), s.GetThrottledLogger())

0 commit comments

Comments
 (0)