20 changes: 20 additions & 0 deletions service/sharddistributor/client/clientcommon/drain_observer.go
@@ -0,0 +1,20 @@
package clientcommon

//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination drain_observer_mock.go . DrainSignalObserver

// DrainSignalObserver observes infrastructure drain signals.
// Drain is reversible: if the instance reappears in discovery,
// Undrain() fires, allowing the consumer to resume operations.
//
// Implementations use close-to-broadcast semantics: the returned channel is
// closed when the event occurs, so all goroutines selecting on it wake up.
// After each close, a fresh channel is created for the next cycle.
type DrainSignalObserver interface {
// Drain returns a channel closed when the instance is
// removed from service discovery.
Drain() <-chan struct{}

// Undrain returns a channel closed when the instance is
// added back to service discovery after a drain.
Undrain() <-chan struct{}
}

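For reference, a close-to-broadcast observer can be implemented by rotating channels under a mutex: close the current channel to wake every waiter, then allocate a fresh one for the next cycle. The sketch below is illustrative only and not part of this change; the membership hooks (`onRemovedFromDiscovery`, `onAddedToDiscovery`) are assumed names for wherever discovery events surface.

```go
// Illustrative sketch only (not part of this PR): a DrainSignalObserver that
// implements close-to-broadcast by rotating channels under a mutex.
package clientcommon

import "sync"

type channelDrainObserver struct {
	mu        sync.Mutex
	drainCh   chan struct{}
	undrainCh chan struct{}
}

func newChannelDrainObserver() *channelDrainObserver {
	return &channelDrainObserver{
		drainCh:   make(chan struct{}),
		undrainCh: make(chan struct{}),
	}
}

// Drain returns the current drain channel; callers re-acquire it after each cycle.
func (o *channelDrainObserver) Drain() <-chan struct{} {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.drainCh
}

// Undrain returns the current undrain channel.
func (o *channelDrainObserver) Undrain() <-chan struct{} {
	o.mu.Lock()
	defer o.mu.Unlock()
	return o.undrainCh
}

// onRemovedFromDiscovery (assumed hook) closes the drain channel, waking every
// waiter, and creates a fresh one for the next cycle.
func (o *channelDrainObserver) onRemovedFromDiscovery() {
	o.mu.Lock()
	defer o.mu.Unlock()
	close(o.drainCh)
	o.drainCh = make(chan struct{})
}

// onAddedToDiscovery (assumed hook) does the same for the undrain channel.
func (o *channelDrainObserver) onAddedToDiscovery() {
	o.mu.Lock()
	defer o.mu.Unlock()
	close(o.undrainCh)
	o.undrainCh = make(chan struct{})
}
```

Returning the channel under the same mutex that rotates it means a waiter can never miss a cycle: it either gets the current channel or one that was already closed, in which case its select fires immediately.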

4 changes: 3 additions & 1 deletion service/sharddistributor/client/executorclient/client.go
@@ -73,7 +73,8 @@ type Params[SP ShardProcessor] struct {
ShardProcessorFactory ShardProcessorFactory[SP]
Config clientcommon.Config
TimeSource clock.TimeSource
Metadata ExecutorMetadata `optional:"true"`
Metadata ExecutorMetadata `optional:"true"`
DrainObserver clientcommon.DrainSignalObserver `optional:"true"`
}

// NewExecutorWithNamespace creates an executor for a specific namespace
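Since `DrainObserver` is tagged `optional:"true"`, existing consumers need no changes; the executor simply skips the drain path when the field is nil. A consumer opting in could wire an observer through fx along these lines (a hedged sketch: `newChannelDrainObserver` is the assumed implementation from the earlier sketch, and the remaining providers are elided):

```go
// Hypothetical wiring sketch: provide a DrainSignalObserver so the executor's
// heartbeat loop reacts to drain/undrain signals. Omitting this provider is
// valid because the Params field is optional.
app := fx.New(
	fx.Provide(func() clientcommon.DrainSignalObserver {
		return newChannelDrainObserver() // assumed implementation (see earlier sketch)
	}),
	// ... executor client and shard processor providers go here.
)
app.Run()
```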
@@ -136,6 +137,7 @@ func newExecutorWithConfig[SP ShardProcessor](params Params[SP], namespaceConfig
metadata: syncExecutorMetadata{
data: params.Metadata,
},
drainObserver: params.DrainObserver,
}
executor.setMigrationMode(namespaceConfig.GetMigrationMode())

38 changes: 38 additions & 0 deletions service/sharddistributor/client/executorclient/clientimpl.go
@@ -16,6 +16,7 @@ import (
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
"github.com/uber/cadence/service/sharddistributor/client/executorclient/metricsconstants"
"github.com/uber/cadence/service/sharddistributor/client/executorclient/syncgeneric"
)
@@ -104,6 +105,7 @@ type executorImpl[SP ShardProcessor] struct {
metrics tally.Scope
migrationMode atomic.Int32
metadata syncExecutorMetadata
drainObserver clientcommon.DrainSignalObserver
}

func (e *executorImpl[SP]) setMigrationMode(mode types.MigrationMode) {
@@ -202,6 +204,11 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
heartBeatTimer := e.timeSource.NewTimer(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
defer heartBeatTimer.Stop()

var drainCh <-chan struct{}
if e.drainObserver != nil {
drainCh = e.drainObserver.Drain()
}

for {
select {
case <-ctx.Done():
@@ -214,6 +221,18 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
e.stopShardProcessors()
e.sendDrainingHeartbeat()
return
case <-drainCh:
e.logger.Info("drain signal received, stopping shard processors")
e.stopShardProcessors()
e.sendDrainingHeartbeat()

if !e.waitForUndrain(ctx) {
return
}

e.logger.Info("undrain signal received, resuming heartbeat")
drainCh = e.drainObserver.Drain()
heartBeatTimer.Reset(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
case <-heartBeatTimer.Chan():
heartBeatTimer.Reset(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
err := e.heartbeatAndUpdateAssignment(ctx)
@@ -229,6 +248,25 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
}
}

// waitForUndrain blocks until the undrain signal fires or the executor is stopped.
// Returns true if undrained (caller should resume), false if stopped.
func (e *executorImpl[SP]) waitForUndrain(ctx context.Context) bool {
if e.drainObserver == nil {
return false
}

undrainCh := e.drainObserver.Undrain()

select {
case <-ctx.Done():
return false
case <-e.stopC:
return false
case <-undrainCh:
return true
}
}

func (e *executorImpl[SP]) heartbeatAndUpdateAssignment(ctx context.Context) error {
if !e.assignmentMutex.TryLock() {
e.logger.Error("still doing assignment, skipping heartbeat")
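Because the observer is injected and mocked (see the `go:generate` directive in drain_observer.go), the drain→undrain cycle is easy to exercise in a unit test. A rough sketch, assuming standard mockgen output and eliding executor construction and assertions (the gomock import path depends on which mock library the repo uses):

```go
// Hypothetical test sketch: drive one drain/undrain cycle through the mock.
func TestDrainUndrainCycle(t *testing.T) {
	ctrl := gomock.NewController(t)
	observer := clientcommon.NewMockDrainSignalObserver(ctrl)

	drainCh := make(chan struct{})
	undrainCh := make(chan struct{})
	observer.EXPECT().Drain().Return(drainCh)             // picked up at loop start
	observer.EXPECT().Undrain().Return(undrainCh)         // used by waitForUndrain
	observer.EXPECT().Drain().Return(make(chan struct{})) // fresh channel after undrain

	// ... build the executor with Params{DrainObserver: observer, ...} and start it.

	close(drainCh)   // simulate removal from discovery: processors stop, draining heartbeat sent
	close(undrainCh) // simulate re-registration: the heartbeat loop resumes
}
```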
134 changes: 93 additions & 41 deletions service/sharddistributor/leader/namespace/manager.go
@@ -9,6 +9,7 @@ import (

"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/leader/election"
)
@@ -19,21 +20,27 @@ var Module = fx.Module(
fx.Invoke(NewManager),
)

// stateFn represents a state in the election state machine.
// Each state is a function that blocks until a transition occurs
// and returns the next state function, or nil to stop.
type stateFn func(ctx context.Context) stateFn

type Manager struct {
cfg config.ShardDistribution
logger log.Logger
electionFactory election.Factory
drainObserver clientcommon.DrainSignalObserver
namespaces map[string]*namespaceHandler
ctx context.Context
cancel context.CancelFunc
}

type namespaceHandler struct {
logger log.Logger
elector election.Elector
cancel context.CancelFunc
namespaceCfg config.Namespace
cleanupWg sync.WaitGroup
logger log.Logger
electionFactory election.Factory
namespaceCfg config.Namespace
drainObserver clientcommon.DrainSignalObserver
cleanupWg sync.WaitGroup
}

type ManagerParams struct {
@@ -43,6 +50,7 @@ type ManagerParams struct {
Logger log.Logger
ElectionFactory election.Factory
Lifecycle fx.Lifecycle
DrainObserver clientcommon.DrainSignalObserver `optional:"true"`
}

// NewManager creates a new namespace manager
@@ -51,6 +59,7 @@ func NewManager(p ManagerParams) *Manager {
cfg: p.Cfg,
logger: p.Logger.WithTags(tag.ComponentNamespaceManager),
electionFactory: p.ElectionFactory,
drainObserver: p.DrainObserver,
namespaces: make(map[string]*namespaceHandler),
}

@@ -73,7 +82,9 @@ func (m *Manager) Start(ctx context.Context) error {
return nil
}

// Stop gracefully stops all namespace handlers
// Stop gracefully stops all namespace handlers.
// Cancels the manager context, which cascades to all handler contexts,
// then waits for all election goroutines to finish.
func (m *Manager) Stop(ctx context.Context) error {
if m.cancel == nil {
return fmt.Errorf("manager was not running")
@@ -82,69 +93,110 @@ func (m *Manager) Stop(ctx context.Context) error {
m.cancel()

for ns, handler := range m.namespaces {
m.logger.Info("Stopping namespace handler", tag.ShardNamespace(ns))
if handler.cancel != nil {
handler.cancel()
}
m.logger.Info("Waiting for namespace handler to stop", tag.ShardNamespace(ns))
handler.cleanupWg.Wait()
}

return nil
}

// handleNamespace sets up leadership election for a namespace
// handleNamespace sets up a namespace handler and starts its election goroutine.
func (m *Manager) handleNamespace(namespaceCfg config.Namespace) error {
if _, exists := m.namespaces[namespaceCfg.Name]; exists {
return fmt.Errorf("namespace %s already running", namespaceCfg.Name)
}

m.logger.Info("Setting up namespace handler", tag.ShardNamespace(namespaceCfg.Name))

ctx, cancel := context.WithCancel(m.ctx)

// Create elector for this namespace
elector, err := m.electionFactory.CreateElector(ctx, namespaceCfg)
if err != nil {
cancel()
return err
}

handler := &namespaceHandler{
logger: m.logger.WithTags(tag.ShardNamespace(namespaceCfg.Name)),
elector: elector,
}
// cancel cancels the context and ensures that electionRunner is stopped.
handler.cancel = func() {
cancel()
handler.cleanupWg.Wait()
logger: m.logger.WithTags(tag.ShardNamespace(namespaceCfg.Name)),
electionFactory: m.electionFactory,
namespaceCfg: namespaceCfg,
drainObserver: m.drainObserver,
}

m.namespaces[namespaceCfg.Name] = handler
handler.cleanupWg.Add(1)
// Start leadership election
go handler.runElection(ctx)

go handler.runElection(m.ctx)

return nil
}

// runElection manages the leadership election for a namespace
func (handler *namespaceHandler) runElection(ctx context.Context) {
defer handler.cleanupWg.Done()
// runElection drives the election state machine for a namespace.
// It starts in the campaigning state and follows state transitions
// until a state returns nil (stop).
func (h *namespaceHandler) runElection(ctx context.Context) {
defer h.cleanupWg.Done()

handler.logger.Info("Starting election for namespace")
for state := h.campaigning; state != nil; {
state = state(ctx)
}
}

leaderCh := handler.elector.Run(ctx)
// campaigning creates an elector and participates in leader election.
// Transitions: h.idle on drain, h.campaigning on recoverable error,
// nil on stop or elector creation failure.
func (h *namespaceHandler) campaigning(ctx context.Context) stateFn {
h.logger.Info("Entering campaigning state")

var drainCh <-chan struct{}
if h.drainObserver != nil {
drainCh = h.drainObserver.Drain()
}

// Check if already drained before creating an elector.
select {
case <-drainCh:
h.logger.Info("Drain signal detected before election start")
return h.idle
default:
}

electorCtx, cancel := context.WithCancel(ctx)
defer cancel()

elector, err := h.electionFactory.CreateElector(electorCtx, h.namespaceCfg)
if err != nil {
h.logger.Error("Failed to create elector", tag.Error(err))
return nil
}

leaderCh := elector.Run(electorCtx)

for {
select {
case <-ctx.Done():
handler.logger.Info("Context cancelled, stopping election")
return
case isLeader := <-leaderCh:
return nil
case <-drainCh:
h.logger.Info("Drain signal received, resigning from election")
return h.idle
case isLeader, ok := <-leaderCh:
if !ok {
h.logger.Error("Election channel closed unexpectedly")
return h.campaigning
}
if isLeader {
handler.logger.Info("Became leader for namespace")
h.logger.Info("Became leader for namespace")
} else {
handler.logger.Info("Lost leadership for namespace")
h.logger.Info("Lost leadership for namespace")
}
}
}
}

// idle waits for an undrain signal to resume campaigning.
// Transitions: h.campaigning on undrain, nil on stop.
func (h *namespaceHandler) idle(ctx context.Context) stateFn {
h.logger.Info("Entering idle state (drained)")

var undrainCh <-chan struct{}
if h.drainObserver != nil {
undrainCh = h.drainObserver.Undrain()
}

select {
case <-ctx.Done():
return nil
case <-undrainCh:
h.logger.Info("Undrain signal received, resuming election")
return h.campaigning
}
}