
Commit 078daf9

feat(shard-manager): Add support for watching drains (#7697)
**What changed?**

This PR introduces the `DrainSignalObserver` interface in `clientcommon` to allow shard-distributor components to react to infrastructure drain signals. `DrainSignalObserver` is a simple interface that lets deployment-specific implementations signal when an instance has been removed from, or added back to, service discovery. The leader namespace manager subscribes to the drain signal to proactively resign from etcd elections, and to the undrain signal to resume leadership operations and campaign again for the namespace.

**Why?**

The shard-distributor leader holds an etcd lease to coordinate shard assignments across all executors. In production environments, infrastructure operations (e.g. host drains) can remove a service instance from service discovery while the process continues running. Without active detection, the leader in a drained zone keeps holding its etcd lease and operating normally, unaware that it is no longer reachable by other components.
**How did you test it?**

Added unit tests and verified with `go test -v ./service/sharddistributor/leader/namespace`.

**Potential risks**

N/A
**Release notes**

N/A

**Documentation Changes**

N/A

---

## Reviewer Validation

**PR Description Quality** (check these before reviewing code):
- [ ] **"What changed"** provides a clear 1-2 line summary
- [ ] Project issue is linked
- [ ] **"Why"** explains the full motivation with sufficient context
- [ ] **Testing is documented:**
  - [ ] Unit test commands are included (with exact `go test` invocation)
  - [ ] Integration test setup/commands included (if integration tests were run)
  - [ ] Canary testing details included (if canary was mentioned)
- [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A)
- [ ] **Release notes** included if this completes a user-facing feature
- [ ] **Documentation** needs are addressed (or noted if uncertain)

---------

Signed-off-by: Gaziza Yestemirova <gaziza@uber.com>
1 parent ac363d7 commit 078daf9

File tree

4 files changed: +440 −96 lines changed

Lines changed: 20 additions & 0 deletions

```go
package clientcommon

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination drain_observer_mock.go . DrainSignalObserver

// DrainSignalObserver observes infrastructure drain signals.
// Drain is reversible: if the instance reappears in discovery,
// Undrain() fires, allowing the consumer to resume operations.
//
// Implementations use close-to-broadcast semantics: the returned channel is
// closed when the event occurs, so all goroutines selecting on it wake up.
// After each close, a fresh channel is created for the next cycle.
type DrainSignalObserver interface {
	// Drain returns a channel closed when the instance is
	// removed from service discovery.
	Drain() <-chan struct{}

	// Undrain returns a channel closed when the instance is
	// added back to service discovery after a drain.
	Undrain() <-chan struct{}
}
```
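For context, the close-to-broadcast semantics the interface comment describes can be sketched as a minimal standalone implementation. This is not part of the PR: `discoveryObserver`, `onRemovedFromDiscovery`, and `onAddedToDiscovery` are hypothetical names invented for illustration.

```go
package main

import (
	"fmt"
	"sync"
)

// discoveryObserver is a hypothetical DrainSignalObserver implementation.
// Each event closes the current channel (waking every goroutine selecting
// on it), then arms a fresh channel for the next cycle.
type discoveryObserver struct {
	mu        sync.Mutex
	drainCh   chan struct{}
	undrainCh chan struct{}
}

func newDiscoveryObserver() *discoveryObserver {
	return &discoveryObserver{
		drainCh:   make(chan struct{}),
		undrainCh: make(chan struct{}),
	}
}

func (o *discoveryObserver) Drain() <-chan struct{} {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.drainCh
}

func (o *discoveryObserver) Undrain() <-chan struct{} {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.undrainCh
}

// onRemovedFromDiscovery broadcasts a drain by closing the channel,
// then replaces it so the next drain cycle has a fresh channel.
func (o *discoveryObserver) onRemovedFromDiscovery() {
	o.mu.Lock()
	defer o.mu.Unlock()
	close(o.drainCh)
	o.drainCh = make(chan struct{})
}

// onAddedToDiscovery broadcasts an undrain the same way.
func (o *discoveryObserver) onAddedToDiscovery() {
	o.mu.Lock()
	defer o.mu.Unlock()
	close(o.undrainCh)
	o.undrainCh = make(chan struct{})
}

func main() {
	obs := newDiscoveryObserver()
	drained := obs.Drain() // grab the channel before the event fires
	obs.onRemovedFromDiscovery()
	select {
	case <-drained:
		fmt.Println("drain observed") // the closed channel wakes the waiter
	default:
		fmt.Println("missed")
	}
}
```

Consumers must re-fetch `Drain()`/`Undrain()` after each wake-up, since a fresh channel replaces the closed one; this matches how `campaigning` and `idle` each re-read the channel on entry.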

service/sharddistributor/client/clientcommon/drain_observer_mock.go

Lines changed: 68 additions & 0 deletions
Some generated files are not rendered by default.

service/sharddistributor/leader/namespace/manager.go

Lines changed: 103 additions & 41 deletions
```diff
@@ -9,6 +9,7 @@ import (
 	"github.com/uber/cadence/common/log"
 	"github.com/uber/cadence/common/log/tag"
+	"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
 	"github.com/uber/cadence/service/sharddistributor/config"
 	"github.com/uber/cadence/service/sharddistributor/leader/election"
 )
@@ -19,21 +20,28 @@ var Module = fx.Module(
 	fx.Invoke(NewManager),
 )
 
+// stateFn is a recursive function type representing a state in the election
+// state machine.
+// Each state function blocks until a transition occurs and returns the next
+// state function, or nil to stop the machine.
+type stateFn func(ctx context.Context) stateFn
+
 type Manager struct {
 	cfg             config.ShardDistribution
 	logger          log.Logger
 	electionFactory election.Factory
+	drainObserver   clientcommon.DrainSignalObserver
 	namespaces      map[string]*namespaceHandler
 	ctx             context.Context
 	cancel          context.CancelFunc
 }
 
 type namespaceHandler struct {
-	logger       log.Logger
-	elector      election.Elector
-	cancel       context.CancelFunc
-	namespaceCfg config.Namespace
-	cleanupWg    sync.WaitGroup
+	logger          log.Logger
+	electionFactory election.Factory
+	namespaceCfg    config.Namespace
+	drainObserver   clientcommon.DrainSignalObserver
+	cleanupWg       sync.WaitGroup
 }
 
 type ManagerParams struct {
@@ -43,6 +51,7 @@ type ManagerParams struct {
 	Logger          log.Logger
 	ElectionFactory election.Factory
 	Lifecycle       fx.Lifecycle
+	DrainObserver   clientcommon.DrainSignalObserver `optional:"true"`
 }
 
 // NewManager creates a new namespace manager
@@ -51,6 +60,7 @@ func NewManager(p ManagerParams) *Manager {
 		cfg:             p.Cfg,
 		logger:          p.Logger.WithTags(tag.ComponentNamespaceManager),
 		electionFactory: p.ElectionFactory,
+		drainObserver:   p.DrainObserver,
 		namespaces:      make(map[string]*namespaceHandler),
 	}
 
@@ -73,7 +83,9 @@ func (m *Manager) Start(ctx context.Context) error {
 	return nil
 }
 
-// Stop gracefully stops all namespace handlers
+// Stop gracefully stops all namespace handlers.
+// Cancels the manager context which cascades to all handler contexts,
+// then waits for all election goroutines to finish.
 func (m *Manager) Stop(ctx context.Context) error {
 	if m.cancel == nil {
 		return fmt.Errorf("manager was not running")
@@ -82,69 +94,119 @@ func (m *Manager) Stop(ctx context.Context) error {
 	m.cancel()
 
 	for ns, handler := range m.namespaces {
-		m.logger.Info("Stopping namespace handler", tag.ShardNamespace(ns))
-		if handler.cancel != nil {
-			handler.cancel()
-		}
+		m.logger.Info("Waiting for namespace handler to stop", tag.ShardNamespace(ns))
+		handler.cleanupWg.Wait()
 	}
 
 	return nil
 }
 
-// handleNamespace sets up leadership election for a namespace
+// handleNamespace sets up a namespace handler and starts its election goroutine.
func (m *Manager) handleNamespace(namespaceCfg config.Namespace) error {
 	if _, exists := m.namespaces[namespaceCfg.Name]; exists {
 		return fmt.Errorf("namespace %s already running", namespaceCfg.Name)
 	}
 
-	m.logger.Info("Setting up namespace handler", tag.ShardNamespace(namespaceCfg.Name))
-
-	ctx, cancel := context.WithCancel(m.ctx)
-
-	// Create elector for this namespace
-	elector, err := m.electionFactory.CreateElector(ctx, namespaceCfg)
-	if err != nil {
-		cancel()
-		return err
-	}
-
 	handler := &namespaceHandler{
-		logger:  m.logger.WithTags(tag.ShardNamespace(namespaceCfg.Name)),
-		elector: elector,
-	}
-	// cancel cancels the context and ensures that electionRunner is stopped.
-	handler.cancel = func() {
-		cancel()
-		handler.cleanupWg.Wait()
+		logger:          m.logger.WithTags(tag.ShardNamespace(namespaceCfg.Name)),
+		electionFactory: m.electionFactory,
+		namespaceCfg:    namespaceCfg,
+		drainObserver:   m.drainObserver,
 	}
 
 	m.namespaces[namespaceCfg.Name] = handler
 	handler.cleanupWg.Add(1)
-	// Start leadership election
-	go handler.runElection(ctx)
+
+	go handler.runElection(m.ctx)
 
 	return nil
 }
 
-// runElection manages the leadership election for a namespace
-func (handler *namespaceHandler) runElection(ctx context.Context) {
-	defer handler.cleanupWg.Done()
+// runElection drives the election state machine for a namespace.
+// It starts in the campaigning state and follows state transitions
+// until a state returns nil (stop).
+func (h *namespaceHandler) runElection(ctx context.Context) {
+	defer h.cleanupWg.Done()
 
-	handler.logger.Info("Starting election for namespace")
+	for state := h.campaigning; state != nil; {
+		state = state(ctx)
+	}
+}
 
-	leaderCh := handler.elector.Run(ctx)
+func (h *namespaceHandler) drainChannel() <-chan struct{} {
+	if h.drainObserver != nil {
+		return h.drainObserver.Drain()
+	}
+	return nil
+}
+
+func (h *namespaceHandler) startElection(ctx context.Context) (<-chan bool, context.CancelFunc, error) {
+	electorCtx, cancel := context.WithCancel(ctx)
+	elector, err := h.electionFactory.CreateElector(electorCtx, h.namespaceCfg)
+	if err != nil {
+		cancel()
+		return nil, nil, err
+	}
+	return elector.Run(electorCtx), cancel, nil
+}
+
+// campaigning creates an elector and participates in leader election.
+// Transitions: h.idle on drain, h.campaigning on recoverable error, nil on stop.
+func (h *namespaceHandler) campaigning(ctx context.Context) stateFn {
+	h.logger.Info("Entering campaigning state")
+
+	drainCh := h.drainChannel()
+
+	select {
+	case <-drainCh:
+		h.logger.Info("Drain signal detected before election start")
+		return h.idle
+	default:
+	}
+
+	leaderCh, cancel, err := h.startElection(ctx)
+	if err != nil {
+		h.logger.Error("Failed to create elector", tag.Error(err))
+		return nil
+	}
+	defer cancel()
 
 	for {
 		select {
 		case <-ctx.Done():
-			handler.logger.Info("Context cancelled, stopping election")
-			return
-		case isLeader := <-leaderCh:
+			return nil
+		case <-drainCh:
+			h.logger.Info("Drain signal received, resigning from election")
+			return h.idle
+		case isLeader, ok := <-leaderCh:
+			if !ok {
+				h.logger.Error("Election channel closed unexpectedly")
+				return h.campaigning
+			}
 			if isLeader {
-				handler.logger.Info("Became leader for namespace")
+				h.logger.Info("Became leader for namespace")
 			} else {
-				handler.logger.Info("Lost leadership for namespace")
+				h.logger.Info("Lost leadership for namespace")
 			}
 		}
 	}
 }
+
+// idle waits for an undrain signal to resume campaigning.
+// Transitions: h.campaigning on undrain, nil on stop.
+func (h *namespaceHandler) idle(ctx context.Context) stateFn {
+	h.logger.Info("Entering idle state (drained)")
+
+	var undrainCh <-chan struct{}
+	if h.drainObserver != nil {
+		undrainCh = h.drainObserver.Undrain()
+	}
+
+	select {
+	case <-ctx.Done():
+		return nil
+	case <-undrainCh:
+		h.logger.Info("Undrain signal received, resuming election")
+		return h.campaigning
+	}
+}
```
