Skip to content

Commit 128dd79

Browse files
Merge branch 'master' into diana/histogram-task-ack
2 parents e7dd6b3 + 4e6f35e commit 128dd79

File tree

7 files changed

+436
-37
lines changed

7 files changed

+436
-37
lines changed

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2737,6 +2737,7 @@ const (
27372737
ReplicationTasksReturnedDiff
27382738
ExponentialReplicationTasksReturnedDiff
27392739
ReplicationTasksAppliedLatency
2740+
ExponentialReplicationTasksAppliedLatency
27402741
ReplicationTasksBatchSize
27412742
ReplicationDynamicTaskBatchSizerDecision
27422743
ReplicationDLQFailed
@@ -3557,6 +3558,7 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
35573558
ReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff", metricType: Timer},
35583559
ExponentialReplicationTasksReturnedDiff: {metricName: "replication_tasks_returned_diff_counts", metricType: Histogram, intExponentialBuckets: Mid1To16k},
35593560
ReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency", metricType: Timer},
3561+
ExponentialReplicationTasksAppliedLatency: {metricName: "replication_tasks_applied_latency_ns", metricType: Histogram, exponentialBuckets: Low1ms100s},
35603562
ReplicationTasksBatchSize: {metricName: "replication_tasks_batch_size", metricType: Gauge},
35613563
ReplicationDynamicTaskBatchSizerDecision: {metricName: "replication_dynamic_task_batch_sizer_decision", metricType: Counter},
35623564
ReplicationDLQFailed: {metricName: "replication_dlq_enqueue_failed", metricType: Counter},

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ require (
7676
github.com/ncruces/go-sqlite3 v0.22.0
7777
github.com/opensearch-project/opensearch-go/v4 v4.1.0
7878
github.com/robfig/cron/v3 v3.0.1
79+
go.etcd.io/etcd/api/v3 v3.5.5
7980
go.uber.org/mock v0.5.0
8081
)
8182

@@ -89,7 +90,6 @@ require (
8990
github.com/tetratelabs/wazero v1.8.2 // indirect
9091
github.com/xrash/smetrics v0.0.0-20240521201337-686a1a2994c1 // indirect
9192
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
92-
go.etcd.io/etcd/api/v3 v3.5.5 // indirect
9393
go.etcd.io/etcd/client/pkg/v3 v3.5.5 // indirect
9494
google.golang.org/genproto v0.0.0-20231016165738-49dd2c1f3d0b // indirect
9595
google.golang.org/genproto/googleapis/api v0.0.0-20231012201019-e917dd12ba7a // indirect

service/history/replication/task_processor.go

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -274,12 +274,16 @@ func (p *taskProcessorImpl) cleanupAckedReplicationTasks() error {
274274
}
275275
p.logger.Debug("Cleaning up replication task queue.", tag.ReadLevel(minAckLevel))
276276
p.metricsClient.Scope(metrics.ReplicationTaskCleanupScope).IncCounter(metrics.ReplicationTaskCleanupCount)
277-
p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope,
277+
maxReadLevel := p.shard.UpdateIfNeededAndGetQueueMaxReadLevel(
278+
persistence.HistoryTaskCategoryReplication,
279+
p.currentCluster,
280+
).GetTaskID()
281+
lag := time.Duration(maxReadLevel - minAckLevel)
282+
scope := p.metricsClient.Scope(metrics.ReplicationTaskFetcherScope,
278283
metrics.TargetClusterTag(p.currentCluster),
279-
).RecordTimer(
280-
metrics.ReplicationTasksLag,
281-
time.Duration(p.shard.UpdateIfNeededAndGetQueueMaxReadLevel(persistence.HistoryTaskCategoryReplication, p.currentCluster).GetTaskID()-minAckLevel),
282284
)
285+
scope.RecordTimer(metrics.ReplicationTasksLag, lag)
286+
scope.ExponentialHistogram(metrics.ExponentialReplicationTasksLag, lag)
283287
for {
284288
pageSize := p.config.ReplicatorTaskDeleteBatchSize()
285289
resp, err := p.shard.GetExecutionManager().RangeCompleteHistoryTask(
@@ -338,7 +342,9 @@ func (p *taskProcessorImpl) processResponse(response *types.ReplicationMessages)
338342
backoffDuration := p.noTaskRetrier.NextBackOff()
339343
time.Sleep(backoffDuration)
340344
} else {
341-
scope.RecordTimer(metrics.ReplicationTasksAppliedLatency, time.Since(batchRequestStartTime))
345+
appliedLatency := time.Since(batchRequestStartTime)
346+
scope.RecordTimer(metrics.ReplicationTasksAppliedLatency, appliedLatency)
347+
scope.ExponentialHistogram(metrics.ExponentialReplicationTasksAppliedLatency, appliedLatency)
342348
}
343349

344350
if p.isShuttingDown() {

service/sharddistributor/leader/process/processor.go

Lines changed: 54 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -180,18 +180,16 @@ func (p *namespaceProcessor) runProcess(ctx context.Context) {
180180

181181
// runRebalancingLoop handles shard assignment and redistribution.
182182
func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
183-
ticker := p.timeSource.NewTicker(p.cfg.Period)
184-
defer ticker.Stop()
185183

186184
// Perform an initial rebalance on startup.
187185
err := p.rebalanceShards(ctx)
188186
if err != nil {
189187
p.logger.Error("initial rebalance failed", tag.Error(err))
190188
}
191189

192-
updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
190+
updateChan, err := p.runRebalanceTriggeringLoop(ctx)
193191
if err != nil {
194-
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
192+
p.logger.Error("failed to start rebalance triggering loop", tag.Error(err))
195193
return
196194
}
197195

@@ -200,22 +198,65 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
200198
case <-ctx.Done():
201199
p.logger.Info("Rebalancing loop cancelled.")
202200
return
201+
202+
case update := <-updateChan:
203+
p.logger.Info("Rebalancing triggered", tag.Dynamic("reason", update))
204+
if err := p.rebalanceShards(ctx); err != nil {
205+
p.logger.Error("rebalance failed", tag.Error(err))
206+
}
207+
}
208+
}
209+
}
210+
211+
// runRebalanceTriggeringLoop monitors for state changes and periodic triggers to initiate rebalancing.
212+
// it doesn't block Subscribe calls to avoid a growing backlog of updates.
213+
func (p *namespaceProcessor) runRebalanceTriggeringLoop(ctx context.Context) (<-chan string, error) {
214+
// Buffered channel to allow one pending rebalance trigger.
215+
triggerChan := make(chan string, 1)
216+
217+
updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
218+
if err != nil {
219+
p.logger.Error("Failed to subscribe to state changes, stopping rebalancing loop.", tag.Error(err))
220+
return nil, err
221+
}
222+
223+
go p.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
224+
return triggerChan, nil
225+
}
226+
227+
func (p *namespaceProcessor) rebalanceTriggeringLoop(ctx context.Context, updateChan <-chan int64, triggerChan chan<- string) {
228+
defer close(triggerChan)
229+
230+
ticker := p.timeSource.NewTicker(p.cfg.Period)
231+
defer ticker.Stop()
232+
233+
tryTriggerRebalancing := func(reason string) {
234+
select {
235+
case triggerChan <- reason:
236+
default:
237+
p.logger.Info("Rebalance already pending, skipping trigger attempt", tag.Dynamic("reason", reason))
238+
}
239+
}
240+
241+
for {
242+
select {
243+
case <-ctx.Done():
244+
p.logger.Info("Rebalance triggering loop cancelled")
245+
return
246+
247+
case <-ticker.Chan():
248+
tryTriggerRebalancing("Periodic reconciliation triggered")
249+
203250
case latestRevision, ok := <-updateChan:
204251
if !ok {
205-
p.logger.Info("Update channel closed, stopping rebalancing loop.")
252+
p.logger.Info("Update channel closed, stopping rebalance triggering loop")
206253
return
207254
}
208255
if latestRevision <= p.lastAppliedRevision {
209256
continue
210257
}
211-
p.logger.Info("State change detected, triggering rebalance.")
212-
err = p.rebalanceShards(ctx)
213-
case <-ticker.Chan():
214-
p.logger.Info("Periodic reconciliation triggered, rebalancing.")
215-
err = p.rebalanceShards(ctx)
216-
}
217-
if err != nil {
218-
p.logger.Error("rebalance failed", tag.Error(err))
258+
259+
tryTriggerRebalancing("State change detected")
219260
}
220261
}
221262
}

service/sharddistributor/leader/process/processor_test.go

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1414,3 +1414,177 @@ func TestEmitOldestExecutorHeartbeatLag(t *testing.T) {
14141414
})
14151415
}
14161416
}
1417+
1418+
func TestRunRebalanceTriggeringLoop(t *testing.T) {
1419+
t.Run("no events from subscribe, trigger from ticker", func(t *testing.T) {
1420+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1421+
defer mocks.ctrl.Finish()
1422+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1423+
1424+
ctx, cancel := context.WithCancel(context.Background())
1425+
defer cancel()
1426+
1427+
updateChan := make(chan int64)
1428+
triggerChan := make(chan string, 1)
1429+
1430+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1431+
1432+
// Wait for ticker to be created
1433+
mocks.timeSource.BlockUntil(1)
1434+
1435+
// Advance time to trigger the ticker
1436+
mocks.timeSource.Advance(processor.cfg.Period)
1437+
1438+
// Expect trigger from periodic reconciliation
1439+
select {
1440+
case reason := <-triggerChan:
1441+
assert.Equal(t, "Periodic reconciliation triggered", reason)
1442+
case <-time.After(time.Second):
1443+
t.Fatal("expected trigger from ticker, but timed out")
1444+
}
1445+
1446+
cancel()
1447+
})
1448+
1449+
t.Run("events from subscribe before period, trigger from state change", func(t *testing.T) {
1450+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1451+
defer mocks.ctrl.Finish()
1452+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1453+
processor.lastAppliedRevision = 0
1454+
1455+
ctx, cancel := context.WithCancel(context.Background())
1456+
defer cancel()
1457+
1458+
updateChan := make(chan int64, 1)
1459+
triggerChan := make(chan string, 1)
1460+
1461+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1462+
1463+
// Wait for ticker to be created
1464+
mocks.timeSource.BlockUntil(1)
1465+
1466+
// Send a state change event before the ticker fires
1467+
updateChan <- 1
1468+
1469+
// Expect trigger from state change
1470+
select {
1471+
case reason := <-triggerChan:
1472+
assert.Equal(t, "State change detected", reason)
1473+
case <-time.After(time.Second):
1474+
t.Fatal("expected trigger from state change, but timed out")
1475+
}
1476+
1477+
cancel()
1478+
})
1479+
1480+
t.Run("triggerChan full, multiple subscribe events, loop not stuck", func(t *testing.T) {
1481+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1482+
defer mocks.ctrl.Finish()
1483+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1484+
processor.lastAppliedRevision = 0
1485+
1486+
ctx, cancel := context.WithCancel(context.Background())
1487+
defer cancel()
1488+
1489+
// Use unbuffered channel for updates to ensure they are processed one at a time
1490+
updateChan := make(chan int64)
1491+
triggerChan := make(chan string, 1)
1492+
1493+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1494+
1495+
// Wait for ticker to be created
1496+
mocks.timeSource.BlockUntil(1)
1497+
1498+
// Don't read from triggerChan yet to keep it full
1499+
// Send multiple state change events
1500+
for i := int64(0); i <= 10; i++ {
1501+
select {
1502+
case updateChan <- i:
1503+
case <-time.After(time.Second):
1504+
// Expect that the loop is not stuck
1505+
t.Fatalf("failed to send update %d, channel blocked", i)
1506+
}
1507+
}
1508+
1509+
// Expect trigger from state change
1510+
select {
1511+
case reason := <-triggerChan:
1512+
assert.Equal(t, "State change detected", reason)
1513+
case <-time.After(time.Second):
1514+
t.Fatal("expected trigger from state change, but timed out")
1515+
}
1516+
1517+
cancel()
1518+
})
1519+
1520+
t.Run("stale revision ignored", func(t *testing.T) {
1521+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1522+
defer mocks.ctrl.Finish()
1523+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1524+
processor.lastAppliedRevision = 5
1525+
1526+
ctx, cancel := context.WithCancel(context.Background())
1527+
defer cancel()
1528+
1529+
updateChan := make(chan int64, 1)
1530+
triggerChan := make(chan string, 1)
1531+
1532+
go processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1533+
1534+
// Wait for ticker to be created
1535+
mocks.timeSource.BlockUntil(1)
1536+
1537+
// Send stale revision (less than or equal to lastAppliedRevision)
1538+
updateChan <- 3
1539+
1540+
// Should not trigger - verify by advancing ticker and getting that trigger instead
1541+
mocks.timeSource.Advance(processor.cfg.Period)
1542+
1543+
select {
1544+
case reason := <-triggerChan:
1545+
assert.Equal(t, "Periodic reconciliation triggered", reason)
1546+
case <-time.After(time.Second):
1547+
t.Fatal("expected trigger from ticker")
1548+
}
1549+
1550+
cancel()
1551+
})
1552+
1553+
t.Run("update channel closed stops loop", func(t *testing.T) {
1554+
mocks := setupProcessorTest(t, config.NamespaceTypeFixed)
1555+
defer mocks.ctrl.Finish()
1556+
processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor)
1557+
1558+
ctx := context.Background()
1559+
1560+
updateChan := make(chan int64)
1561+
triggerChan := make(chan string, 1)
1562+
1563+
var wg sync.WaitGroup
1564+
wg.Add(1)
1565+
go func() {
1566+
defer wg.Done()
1567+
processor.rebalanceTriggeringLoop(ctx, updateChan, triggerChan)
1568+
}()
1569+
1570+
// Wait for ticker to be created
1571+
mocks.timeSource.BlockUntil(1)
1572+
1573+
// Close update channel
1574+
close(updateChan)
1575+
1576+
// Wait for loop to exit
1577+
done := make(chan struct{})
1578+
go func() {
1579+
wg.Wait()
1580+
close(done)
1581+
}()
1582+
1583+
select {
1584+
case <-done:
1585+
// Loop exited as expected
1586+
case <-time.After(time.Second):
1587+
t.Fatal("loop did not exit after updateChan closed")
1588+
}
1589+
})
1590+
}

0 commit comments

Comments
 (0)