Skip to content

Commit 4e6f35e

Browse files
authored
fix(shard-distributor): separate watch event processing from the rebalancing loop (#7669)
<!-- 1-2 line summary of WHAT changed technically: - Always link the relevant project's GitHub issue, unless it is a minor bugfix - Good: "Modified FailoverDomain mapper to allow null ActiveClusterName #320" - Bad: "added nil check" --> **What changed?** * Add `runRebalanceTriggeringLoop` method for `namespaceProcessor` <!-- Your goal is to provide all the required context for a future maintainer to understand the reasons for making this change (see https://cbea.ms/git-commit/#why-not-how). How did this work previously (and what was wrong with it)? What has changed, and why did you solve it this way? - Good: "Active-active domains have independent cluster attributes per region. Previously, modifying cluster attributes required specifying the default ActiveClusterName which updates the global domain default. This prevents operators from updating regional configurations without affecting the primary cluster designation. This change allows attribute updates to be independent of active cluster selection." - Bad: "Improves domain handling" --> **Why?** We observed server instability in the case of a high number of events combined with a long shard rebalancing. It caused a growing backlog of unprocessed events on both the server side and etcd's side. <!-- Include specific test commands and setup. Please include the exact commands such that another maintainer or contributor can reproduce the test steps taken. 
- e.g. Unit test commands with exact invocation `go test -v ./common/types/mapper/proto -run TestFailoverDomainRequest` - For integration tests include setup steps and test commands Example: "Started local server with `./cadence start`, then ran `make test_e2e`" - For local simulation testing include setup steps for the server and how you ran the tests - Good: Full commands that reviewers can copy-paste to verify - Bad: "Tested locally" or "Added tests" --> **How did you test it?** * Unit test * Run on dev cluster <!-- If there are risks that the release engineer should know about document them here. For example: - Has an API/IDL been modified? Is it backwards/forwards compatible? If not, what are the repercussions? - Has a schema change been introduced? Is it possible to roll back? - Has a feature flag been re-used for a new purpose? - Is there a potential performance concern? Is the change modifying core task processing logic? - If truly N/A, you can mark it as such --> **Potential risks** N/A <!-- If this PR completes a user facing feature or changes functionality add release notes here. Your release notes should allow a user and the release engineer to understand the changes with little context. Always ensure that the description contains a link to the relevant GitHub issue. 
--> **Release notes** N/A <!-- Consider whether this change requires documentation updates in the Cadence-Docs repo - If yes: mention what needs updating (or link to docs PR in cadence-docs repo) - If in doubt, add a note about potential doc needs - Only mark N/A if you're certain no docs are affected --> **Documentation Changes** N/A --- ## Reviewer Validation **PR Description Quality** (check these before reviewing code): - [ ] **"What changed"** provides a clear 1-2 line summary - [ ] Project Issue is linked - [ ] **"Why"** explains the full motivation with sufficient context - [ ] **Testing is documented:** - [ ] Unit test commands are included (with exact `go test` invocation) - [ ] Integration test setup/commands included (if integration tests were run) - [ ] Canary testing details included (if canary was mentioned) - [ ] **Potential risks** section is thoughtfully filled out (or legitimately N/A) - [ ] **Release notes** included if this completes a user-facing feature - [ ] **Documentation** needs are addressed (or noted if uncertain)
1 parent 3c8a7b8 commit 4e6f35e

File tree

2 files changed

+228
-13
lines changed

2 files changed

+228
-13
lines changed

service/sharddistributor/leader/process/processor.go

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -180,18 +180,16 @@ func (p *namespaceProcessor) runProcess(ctx context.Context) {
180180

181181
// runRebalancingLoop handles shard assignment and redistribution.
182182
func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
183-
ticker := p.timeSource.NewTicker(p.cfg.Period)
184-
defer ticker.Stop()
185183

186184
// Perform an initial rebalance on startup.
187185
err := p.rebalanceShards(ctx)
188186
if err != nil {
189187
p.logger.Error("initial rebalance failed", tag.Error(err))
190188
}
191189

192-
updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
190+
updateChan, err := p.runRebalanceTriggeringLoop(ctx)
193191
if err != nil {
194-
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
192+
p.logger.Error("failed to start rebalance triggering loop", tag.Error(err))
195193
return
196194
}
197195

@@ -200,22 +198,65 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
200198
case <-ctx.Done():
201199
p.logger.Info("Rebalancing loop cancelled.")
202200
return
201+
202+
case update := <-updateChan:
203+
p.logger.Info("Rebalancing triggered", tag.Dynamic("reason", update))
204+
if err := p.rebalanceShards(ctx); err != nil {
205+
p.logger.Error("rebalance failed", tag.Error(err))
206+
}
207+
}
208+
}
209+
}
210+
211+
// runRebalanceTriggeringLoop monitors for state changes and periodic triggers to initiate rebalancing.
212+
// it doesn't block Subscribe calls to avoid a growing backlog of updates.
213+
func (p *namespaceProcessor) runRebalanceTriggeringLoop(ctx context.Context) (<-chan string, error) {
214+
// Buffered channel to allow one pending rebalance trigger.
215+
triggerChan := make(chan string, 1)
216+
217+
updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
218+
if err != nil {
219+
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
220+
return nil, err
221+
}
222+
223+
go p.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
224+
return triggerChan, nil
225+
}
226+
227+
func (p *namespaceProcessor) rebalanceTriggeringLoop(ctx context.Context, updateChan <-chan int64, triggerChan chan<- string) {
228+
defer close(triggerChan)
229+
230+
ticker := p.timeSource.NewTicker(p.cfg.Period)
231+
defer ticker.Stop()
232+
233+
tryTriggerRebalancing := func(reason string) {
234+
select {
235+
case triggerChan <- reason:
236+
default:
237+
p.logger.Info("Rebalance already pending, skipping trigger attempt", tag.Dynamic("reason", reason))
238+
}
239+
}
240+
241+
for {
242+
select {
243+
case <-ctx.Done():
244+
p.logger.Info("Rebalance triggering loop cancelled")
245+
return
246+
247+
case <-ticker.Chan():
248+
tryTriggerRebalancing("Periodic reconciliation triggered")
249+
203250
case latestRevision, ok := <-updateChan:
204251
if !ok {
205-
p.logger.Info("Update channel closed, stopping rebalancing loop.")
252+
p.logger.Info("Update channel closed, stopping rebalance triggering loop")
206253
return
207254
}
208255
if latestRevision <= p.lastAppliedRevision {
209256
continue
210257
}
211-
p.logger.Info("State change detected, triggering rebalance.")
212-
err = p.rebalanceShards(ctx)
213-
case <-ticker.Chan():
214-
p.logger.Info("Periodic reconciliation triggered, rebalancing.")
215-
err = p.rebalanceShards(ctx)
216-
}
217-
if err != nil {
218-
p.logger.Error("rebalance failed", tag.Error(err))
258+
259+
tryTriggerRebalancing("State change detected")
219260
}
220261
}
221262
}

service/sharddistributor/leader/process/processor_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,3 +1414,177 @@ func TestEmitOldestExecutorHeartbeatLag(t *testing.T) {
14141414
})
14151415
}
14161416
}
1417+
1418+
func TestRunRebalanceTriggeringLoop(t *testing.T) {
1419+
t.Run("no events from subscribe, trigger from ticker", func(t *testing.T) {
1420+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1421+
defer mocks.ctrl.Finish()
1422+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1423+
1424+
ctx, cancel := context.WithCancel(context.Background())
1425+
defer cancel()
1426+
1427+
updateChan := make(chan int64)
1428+
triggerChan := make(chan string, 1)
1429+
1430+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1431+
1432+
// Wait for ticker to be created
1433+
mocks.timeSource.BlockUntil(1)
1434+
1435+
// Advance time to trigger the ticker
1436+
mocks.timeSource.Advance(processor.cfg.Period)
1437+
1438+
// Expect trigger from periodic reconciliation
1439+
select {
1440+
case reason := <-triggerChan:
1441+
assert.Equal(t, "Periodic reconciliation triggered", reason)
1442+
case <-time.After(time.Second):
1443+
t.Fatal("expected trigger from ticker, but timed out")
1444+
}
1445+
1446+
cancel()
1447+
})
1448+
1449+
t.Run("events from subscribe before period, trigger from state change", func(t *testing.T) {
1450+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1451+
defer mocks.ctrl.Finish()
1452+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1453+
processor.lastAppliedRevision = 0
1454+
1455+
ctx, cancel := context.WithCancel(context.Background())
1456+
defer cancel()
1457+
1458+
updateChan := make(chan int64, 1)
1459+
triggerChan := make(chan string, 1)
1460+
1461+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1462+
1463+
// Wait for ticker to be created
1464+
mocks.timeSource.BlockUntil(1)
1465+
1466+
// Send a state change event before the ticker fires
1467+
updateChan <- 1
1468+
1469+
// Expect trigger from state change
1470+
select {
1471+
case reason := <-triggerChan:
1472+
assert.Equal(t, "State change detected", reason)
1473+
case <-time.After(time.Second):
1474+
t.Fatal("expected trigger from state change, but timed out")
1475+
}
1476+
1477+
cancel()
1478+
})
1479+
1480+
t.Run("triggerChan full, multiple subscribe events, loop not stuck", func(t *testing.T) {
1481+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1482+
defer mocks.ctrl.Finish()
1483+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1484+
processor.lastAppliedRevision = 0
1485+
1486+
ctx, cancel := context.WithCancel(context.Background())
1487+
defer cancel()
1488+
1489+
// Use unbuffered channel for updates to ensure they are processed one at a time
1490+
updateChan := make(chan int64)
1491+
triggerChan := make(chan string, 1)
1492+
1493+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1494+
1495+
// Wait for ticker to be created
1496+
mocks.timeSource.BlockUntil(1)
1497+
1498+
// Don't read from triggerChan yet to keep it full
1499+
// Send multiple state change events
1500+
for i := int64(0); i <= 10; i++ {
1501+
select {
1502+
case updateChan <- i:
1503+
case <-time.After(time.Second):
1504+
// Expect that the loop is not stuck
1505+
t.Fatalf("failed to send update %d, channel blocked", i)
1506+
}
1507+
}
1508+
1509+
// Expect trigger from state change
1510+
select {
1511+
case reason := <-triggerChan:
1512+
assert.Equal(t, "State change detected", reason)
1513+
case <-time.After(time.Second):
1514+
t.Fatal("expected trigger from state change, but timed out")
1515+
}
1516+
1517+
cancel()
1518+
})
1519+
1520+
t.Run("stale revision ignored", func(t *testing.T) {
1521+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1522+
defer mocks.ctrl.Finish()
1523+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1524+
processor.lastAppliedRevision = 5
1525+
1526+
ctx, cancel := context.WithCancel(context.Background())
1527+
defer cancel()
1528+
1529+
updateChan := make(chan int64, 1)
1530+
triggerChan := make(chan string, 1)
1531+
1532+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1533+
1534+
// Wait for ticker to be created
1535+
mocks.timeSource.BlockUntil(1)
1536+
1537+
// Send stale revision (less than or equal to lastAppliedRevision)
1538+
updateChan <- 3
1539+
1540+
// Should not trigger - verify by advancing ticker and getting that trigger instead
1541+
mocks.timeSource.Advance(processor.cfg.Period)
1542+
1543+
select {
1544+
case reason := <-triggerChan:
1545+
assert.Equal(t, "Periodic reconciliation triggered", reason)
1546+
case <-time.After(time.Second):
1547+
t.Fatal("expected trigger from ticker")
1548+
}
1549+
1550+
cancel()
1551+
})
1552+
1553+
t.Run("update channel closed stops loop", func(t *testing.T) {
1554+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1555+
defer mocks.ctrl.Finish()
1556+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1557+
1558+
ctx := context.Background()
1559+
1560+
updateChan := make(chan int64)
1561+
triggerChan := make(chan string, 1)
1562+
1563+
var wg sync.WaitGroup
1564+
wg.Add(1)
1565+
go func() {
1566+
defer wg.Done()
1567+
processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1568+
}()
1569+
1570+
// Wait for ticker to be created
1571+
mocks.timeSource.BlockUntil(1)
1572+
1573+
// Close update channel
1574+
close(updateChan)
1575+
1576+
// Wait for loop to exit
1577+
done := make(chan struct{})
1578+
go func() {
1579+
wg.Wait()
1580+
close(done)
1581+
}()
1582+
1583+
select {
1584+
case <-done:
1585+
// Loop exited as expected
1586+
case <-time.After(time.Second):
1587+
t.Fatal("loop did not exit after updateChan closed")
1588+
}
1589+
})
1590+
}

0 commit comments

Comments
 (0)