Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
143 commits
Select commit Hold shift + click to select a range
2de12d8
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5d95067
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
6e57536
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
595d320
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
d9ba54d
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
32d2ecd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
b624a00
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
aad7b2e
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
6360f8a
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
1536d0a
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
f316fbf
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
4524da9
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
126f725
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc53f68
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
733bbcb
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
6816b8e
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
f97e0cf
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
513e88c
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
9833525
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
0332fe5
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
d5a13d9
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
634bc02
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
812e854
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
b9813e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
dfb7448
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
36ec08f
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
38a6e81
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Nov 6, 2025
af733e6
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
a52e86f
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
abfc80e
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
df0feaf
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
8546a26
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
dde87ef
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
415e80c
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
c67d5c3
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
9ffcefb
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc769bf
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
8c22663
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
5ac3c5d
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
3973b82
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
3830d5e
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
443c0b1
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
9d159e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
18e63b7
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
e08a286
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
08eb635
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
f63664a
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
10e2ffa
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
8c6b0c8
chore: removed an old struct that appeared during rebase
Theis-Mathiassen Nov 11, 2025
158e030
feat(shard distributor): throttle shard-stat writes
AndreasHolt Nov 13, 2025
dd45ff0
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Nov 13, 2025
05e0d1d
fix(shard distributor): linter error
AndreasHolt Nov 13, 2025
e0779ec
feat(shard distributor): decouple shard stats write-throttling decisi…
AndreasHolt Nov 18, 2025
9546f24
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Nov 19, 2025
db70702
fix(shard-distributor): inverted condition in shard stats cleanup loop
AndreasHolt Nov 19, 2025
481f9c6
chore(shard-distributor): did some formatting, and use current load i…
Theis-Mathiassen Nov 19, 2025
f754dd6
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Dec 2, 2025
3366828
fix(shard distributor): decouple shard assignment from stats writes
AndreasHolt Dec 2, 2025
1c01604
feat: add new ExecutorKeyType for stats, add case to GetState and rem…
AndreasHolt Dec 3, 2025
e3294b9
feat: update etcdstore.go to support new way of storing stats
AndreasHolt Dec 4, 2025
915da0f
test: persistence of new stats
AndreasHolt Dec 4, 2025
e3bd964
test: update test, remove batching
AndreasHolt Dec 4, 2025
352df29
chore: ExecutorShardStatisticsKey is not significant
AndreasHolt Dec 4, 2025
a22908e
fix: linter
AndreasHolt Dec 4, 2025
49ed5cc
chore: delete shard key related helpers and tests
AndreasHolt Dec 4, 2025
1ddad41
chore: add logs to cleanup
AndreasHolt Dec 4, 2025
c0bc604
fix: comment
AndreasHolt Dec 5, 2025
2e81f28
chore: remove debug logs
AndreasHolt Dec 8, 2025
e36f2a1
CI rerun
AndreasHolt Dec 8, 2025
3753640
fix: remove "len(ops) == 0" check
AndreasHolt Dec 8, 2025
e11b0d9
Merge remote-tracking branch 'origin/stats-etcd-refactor' into heartb…
Theis-Mathiassen Dec 8, 2025
b88ff69
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 9, 2025
e943951
feat: changed heartbeat record statistics in new format
Theis-Mathiassen Dec 9, 2025
03a655a
fix: potential fix to smothed load not working as expected
AndreasHolt Dec 9, 2025
1c0b0fb
feat: implemented getting shard statistics, using cache
Theis-Mathiassen Dec 9, 2025
84976c3
test: added and altered tests to work with statistics from cache
Theis-Mathiassen Dec 9, 2025
c779090
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
Theis-Mathiassen Dec 9, 2025
cb27553
feat: locks and optimization
AndreasHolt Dec 10, 2025
746095a
chore: comments
AndreasHolt Dec 10, 2025
0ca3bda
chore: added comment describing a complicated function
Theis-Mathiassen Dec 10, 2025
bfd1973
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Dec 10, 2025
7975639
Merge remote-tracking branch 'origin/master' into heartbeat-shard-sta…
Theis-Mathiassen Dec 10, 2025
ece4cd4
chore: refactored a function, only calling 1 other function
Theis-Mathiassen Dec 10, 2025
7b82b9e
chore: set LastMoveTime to zero for new shards
AndreasHolt Dec 10, 2025
daf4aa4
Merge branches 'heartbeat-shard-statistics' and 'heartbeat-shard-stat…
AndreasHolt Dec 10, 2025
51e9480
fix: event type
AndreasHolt Dec 10, 2025
90cfc58
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 11, 2025
8daa183
chore: removed unused test helper functions causing build errors
Theis-Mathiassen Dec 11, 2025
c03c9b3
chore: consistent naming
AndreasHolt Dec 11, 2025
4aa7ab2
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Dec 11, 2025
8938199
chore: return error directly, reference ewma as smoothed load
AndreasHolt Dec 11, 2025
8ae5baa
chore: Removed no longer needed statistics from test, and magic number
Theis-Mathiassen Dec 11, 2025
2a7e86d
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
Theis-Mathiassen Dec 11, 2025
1558cd4
chore: moved large switch case into separate function
Theis-Mathiassen Dec 12, 2025
5a9404c
chore: refactored GetExecutorStatistics, fixed some concurrency probl…
Theis-Mathiassen Dec 12, 2025
afd2755
chore: moved statistics relevant data to separate struct
Theis-Mathiassen Dec 12, 2025
fef9cb8
chore: moved and renamed ewmasmoothload calculation
Theis-Mathiassen Dec 12, 2025
aa9553e
chore: renamed mutable variable to something more descriptive, and ma…
Theis-Mathiassen Dec 12, 2025
9e944e5
chore: returning error from applyStatistics as we now treat statistic…
Theis-Mathiassen Dec 12, 2025
718c161
chore: responding on applyShardStatistics now returning error
Theis-Mathiassen Dec 12, 2025
b38fa88
chore: continues best effort to update statistics, and return multipl…
Theis-Mathiassen Dec 12, 2025
3f0e893
chore: renamed recordShardStatistics in order to reflect what is happ…
Theis-Mathiassen Dec 12, 2025
0ae823b
chore: moved the load check to calculate smoothload function
Theis-Mathiassen Dec 12, 2025
4f649dd
chore: extract statistic update from etcdstore
Theis-Mathiassen Dec 12, 2025
eba5943
chore: added a timesource to namespaceShardToExecutor and ShardToExec…
Theis-Mathiassen Dec 12, 2025
e9461bf
chore: refactored refreshExecutorState to adhere closer to MVC archit…
Theis-Mathiassen Dec 12, 2025
2e51154
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 12, 2025
dff7f07
chore: kept the lock while cloning, and moved right before ok created
Theis-Mathiassen Dec 14, 2025
4625397
fix: changed the way we wait using timesource in tests
Theis-Mathiassen Dec 15, 2025
ecbab71
fix: changed some time.Now to use mocktime source
Theis-Mathiassen Dec 15, 2025
fb58ce6
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 15, 2025
0e783e0
fix: removed duplicate timesource
Theis-Mathiassen Dec 15, 2025
465c722
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 15, 2025
7f3b9a5
chore: moved CalculateSmoothedLoad out of executor store
Theis-Mathiassen Dec 16, 2025
83af565
chore: added alias to metadata
Theis-Mathiassen Dec 16, 2025
441bdd2
chore: refactored GetExecutorStatistics
Theis-Mathiassen Dec 16, 2025
abcb6d0
chore: added error logging statements to parse executor key
Theis-Mathiassen Dec 16, 2025
f7c9d77
chore: removed functions only used once
Theis-Mathiassen Dec 16, 2025
671661c
chore: simplified return in RecordHeartbeat
Theis-Mathiassen Dec 16, 2025
20456d6
chore: refactored calcUpdatedStatistics
Theis-Mathiassen Dec 16, 2025
92fafe5
chore: fixed duplicate error messages
Theis-Mathiassen Dec 16, 2025
38cf3d9
chore: added logging to parseExecutorData
Theis-Mathiassen Dec 16, 2025
5d3b545
chore: refactored executorData to be for a single executor
Theis-Mathiassen Dec 16, 2025
09a4a40
chore: added some spacing
Theis-Mathiassen Dec 16, 2025
260af7e
feat: added helper functions for namespaceExecutorStatistics
Theis-Mathiassen Dec 16, 2025
6db1110
chore: refactored introducing maps.clone
Theis-Mathiassen Dec 16, 2025
537dab9
chore: refactored switch in watch in namespaceshardcache
Theis-Mathiassen Dec 16, 2025
7e93d0d
chore: made reason for deleting statistics more clear in handleExecut…
Theis-Mathiassen Dec 16, 2025
457997a
chore: refactored namespaceShardsCache
Theis-Mathiassen Dec 17, 2025
c5bb576
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 17, 2025
3fdc134
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Dec 17, 2025
d3e660e
fix: tests failing
AndreasHolt Dec 18, 2025
4389eab
fix: same fix but for shardcache
AndreasHolt Dec 18, 2025
c40dac8
Merge remote-tracking branch 'origin/master' into heartbeat-shard-sta…
AndreasHolt Dec 18, 2025
c6e5cfe
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Jan 12, 2026
f4fab6b
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Feb 2, 2026
a874d8d
fix: test match functionality, and watch error not checked before pro…
Theis-Mathiassen Feb 3, 2026
f83952e
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Feb 6, 2026
8142dab
fix: initial write of statistics would fail
Theis-Mathiassen Feb 6, 2026
fc8f9fb
fix: performance holding locks for less time and bug where executorSt…
Theis-Mathiassen Feb 6, 2026
4063c59
fix: properly return executor not found without wrapper to satisfy test
Theis-Mathiassen Feb 6, 2026
3847095
fix: correct handling of executor not found, and more useful error me…
Theis-Mathiassen Feb 6, 2026
3550766
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Feb 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func NewProcessorFactory(
cfg config.ShardDistribution,
sdConfig *config.Config,
) Factory {
if cfg.Process.Period == 0 {
if cfg.Process.Period <= 0 {
cfg.Process.Period = _defaultPeriod
}
if cfg.Process.HeartbeatTTL == 0 {
if cfg.Process.HeartbeatTTL <= 0 {
cfg.Process.HeartbeatTTL = _defaultHeartbeatTTL
}
if cfg.Process.Timeout == 0 {
if cfg.Process.Timeout <= 0 {
cfg.Process.Timeout = _defaultTimeout
}
if cfg.Process.RebalanceCooldown == 0 {
Expand Down
9 changes: 5 additions & 4 deletions service/sharddistributor/leader/process/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ func TestCleanupStaleExecutors(t *testing.T) {

heartbeats := map[string]store.HeartbeatState{
"exec-active": {LastHeartbeat: now},
"exec-stale": {LastHeartbeat: now.Add(-2 * time.Second)},
"exec-stale": {LastHeartbeat: now.Add(-_defaultHeartbeatTTL).Add(-1 * time.Second)},
}

namespaceState := &store.NamespaceState{Executors: heartbeats}
Expand All @@ -321,7 +321,7 @@ func TestCleanupStaleShardStats(t *testing.T) {

heartbeats := map[string]store.HeartbeatState{
"exec-active": {LastHeartbeat: now, Status: types.ExecutorStatusACTIVE},
"exec-stale": {LastHeartbeat: now.Add(-2 * time.Second)},
"exec-stale": {LastHeartbeat: now.Add(-_defaultHeartbeatTTL).Add(-1 * time.Second)},
}

assignments := map[string]store.AssignedState{
Expand All @@ -338,10 +338,11 @@ func TestCleanupStaleShardStats(t *testing.T) {
},
}

staleCutoff := now.Add(-_defaultHeartbeatTTL).Add(-1 * time.Second)
shardStats := map[string]store.ShardStatistics{
"shard-1": {SmoothedLoad: 1.0, LastUpdateTime: now, LastMoveTime: now},
"shard-2": {SmoothedLoad: 2.0, LastUpdateTime: now, LastMoveTime: now},
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: now.Add(-2 * time.Second), LastMoveTime: now.Add(-2 * time.Second)},
"shard-3": {SmoothedLoad: 3.0, LastUpdateTime: staleCutoff, LastMoveTime: staleCutoff},
}

namespaceState := &store.NamespaceState{
Expand All @@ -361,7 +362,7 @@ func TestCleanupStaleShardStats(t *testing.T) {

now := mocks.timeSource.Now()

expiredExecutor := now.Add(-2 * time.Second)
expiredExecutor := now.Add(-_defaultHeartbeatTTL).Add(-1 * time.Second)
namespaceState := &store.NamespaceState{
Executors: map[string]store.HeartbeatState{
"exec-stale": {LastHeartbeat: expiredExecutor},
Expand Down
22 changes: 22 additions & 0 deletions service/sharddistributor/statistics/stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package statistics

import (
"math"
"time"
)

// CalculateSmoothedLoad folds a new load sample into a running, time-aware
// exponentially weighted moving average and returns the updated average.
//
// The weight of the new sample depends on how long ago the average was last
// updated: alpha = 1 - exp(-dt/tau), where dt = now - lastUpdate and tau is a
// fixed 30 second time constant. The result is (1-alpha)*prev + alpha*current.
//
// Special cases:
//   - A NaN or infinite current is treated as a load of 0.
//   - A zero lastUpdate (no history yet) returns current.
//   - A lastUpdate that lies after now returns current.
//   - A NaN or infinite prev is discarded and current is returned, so that a
//     single corrupted average cannot make every later result non-finite.
func CalculateSmoothedLoad(prev, current float64, lastUpdate, now time.Time) float64 {
	// Smoothing time constant: smaller = more responsive, larger = smoother.
	const tau = 30 * time.Second

	if math.IsNaN(current) || math.IsInf(current, 0) {
		current = 0
	}

	// Without a usable reference point in time there is nothing to decay from.
	if lastUpdate.IsZero() || now.Before(lastUpdate) {
		return current
	}

	// A non-finite previous average would propagate through the weighted sum
	// forever (NaN*x is NaN, Inf*0 is NaN); restart the average instead.
	if math.IsNaN(prev) || math.IsInf(prev, 0) {
		return current
	}

	dt := now.Sub(lastUpdate)
	alpha := 1 - math.Exp(-dt.Seconds()/tau.Seconds())
	return (1-alpha)*prev + alpha*current
}
225 changes: 126 additions & 99 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"encoding/json"
"errors"
"fmt"
"maps"
"time"

clientv3 "go.etcd.io/etcd/client/v3"
"go.uber.org/fx"
Expand All @@ -16,6 +18,7 @@ import (
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/statistics"
"github.com/uber/cadence/service/sharddistributor/store"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdclient"
"github.com/uber/cadence/service/sharddistributor/store/etcd/etcdkeys"
Expand Down Expand Up @@ -70,7 +73,6 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
if err != nil {
return nil, fmt.Errorf("create record writer: %w", err)
}

store := &executorStoreImpl{
client: p.Client,
prefix: p.ETCDConfig.Prefix,
Expand Down Expand Up @@ -139,7 +141,65 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
if err != nil {
return fmt.Errorf("record heartbeat: %w", err)
}
return nil

statsUpdates, err := s.calcUpdatedStatistics(ctx, namespace, executorID, request.ReportedShards)
if err != nil {
return err
}

return s.applyShardStatisticsUpdates(ctx, namespace, statsUpdates)
}

// calcUpdatedStatistics derives the shard statistics that result from the
// shard reports carried by a single executor heartbeat.
//
// It returns nil when nothing was reported. Otherwise it reads the executor's
// previous statistics from the shard cache (an executor that is not found is
// treated as having no previous statistics), stamps the update with the
// store's current UTC time, and computes a new entry per reported shard via
// UpdateShardStatistic. Shards whose report is nil are logged and left out of
// the returned map. The result is always a single-element slice for executorID.
//
// Any cache error other than store.ErrExecutorNotFound is returned as is.
func (s *executorStoreImpl) calcUpdatedStatistics(ctx context.Context, namespace, executorID string, reported map[string]*types.ShardStatusReport) ([]shardStatisticsUpdate, error) {
	if len(reported) == 0 {
		return nil, nil
	}

	previous, err := s.shardCache.GetExecutorStatistics(ctx, namespace, executorID)
	switch {
	case err == nil:
		// Use the cached statistics as the baseline.
	case errors.Is(err, store.ErrExecutorNotFound):
		// First heartbeat for this executor: start from an empty baseline.
		previous = make(map[string]etcdtypes.ShardStatistics)
	default:
		return nil, err
	}

	timestamp := s.timeSource.Now().UTC()
	refreshed := make(map[string]etcdtypes.ShardStatistics)
	for shardID, report := range reported {
		if report != nil {
			refreshed[shardID] = UpdateShardStatistic(shardID, report.ShardLoad, timestamp, previous)
			continue
		}

		s.logger.Warn("empty report; skipping smoothed load update",
			tag.ShardNamespace(namespace),
			tag.ShardExecutor(executorID),
			tag.ShardKey(shardID),
		)
	}

	return []shardStatisticsUpdate{{
		executorID: executorID,
		stats:      refreshed,
	}}, nil
}

// UpdateShardStatistic computes the statistics entry for shardID after a new
// load sample shardLoad observed at now.
//
// The previous entry is looked up in oldStats; a missing entry behaves like a
// zero-valued one. The smoothed load is recomputed with
// statistics.CalculateSmoothedLoad from the previous smoothed load and
// previous update time, LastUpdateTime is set to now, and LastMoveTime is
// carried over from the previous entry.
func UpdateShardStatistic(shardID string, shardLoad float64, now time.Time, oldStats map[string]etcdtypes.ShardStatistics) etcdtypes.ShardStatistics {
	// Indexing a map with an absent key yields the zero value, which is
	// exactly the baseline wanted for a shard seen for the first time.
	previous := oldStats[shardID]

	return etcdtypes.ShardStatistics{
		SmoothedLoad: statistics.CalculateSmoothedLoad(
			previous.SmoothedLoad,
			shardLoad,
			previous.LastUpdateTime.ToTime(),
			now,
		),
		LastUpdateTime: etcdtypes.Time(now),
		LastMoveTime:   previous.LastMoveTime,
	}
}

// GetHeartbeat retrieves the last known heartbeat state for a single executor.
Expand Down Expand Up @@ -791,57 +851,85 @@ func (s *executorStoreImpl) GetExecutor(ctx context.Context, namespace string, e
return s.shardCache.GetExecutor(ctx, namespace, executorID)
}

// prepareShardStatisticsUpdates calculates the necessary changes to shard statistics based on a new shard assignment plan.
// It determines which shards have moved between executors and which are new. It then prepares a list of
// update operations that remove a moved shard's stats from its old owner and add them to its new owner, recording the time of the move.
func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, namespace string, newAssignments map[string]store.AssignedState) ([]shardStatisticsUpdate, error) {
executorStatsCache := make(map[string]map[string]etcdtypes.ShardStatistics)
changedExecutors := make(map[string]struct{})
// This map will store the *new, final* state of statistics for any executor whose stats have changed.
pendingStatChanges := make(map[string]map[string]etcdtypes.ShardStatistics)

for executorID, state := range newAssignments {
for shardID := range state.AssignedShards {
now := s.timeSource.Now().UTC()
existingShardFound := true

oldOwner, err := s.shardCache.GetShardOwner(ctx, namespace, shardID)
if err != nil && !errors.Is(err, store.ErrShardNotFound) {
return nil, fmt.Errorf("lookup cached shard owner: %w", err)
}

if err == nil && oldOwner.ExecutorID == executorID {
continue
if err != nil {
if errors.Is(err, store.ErrShardNotFound) {
existingShardFound = false
} else {
return nil, fmt.Errorf("lookup cached shard owner: %w", err)
}
}

var stats etcdtypes.ShardStatistics
var shardStatToMove etcdtypes.ShardStatistics

if err == nil {
oldStats, err := s.getOrLoadExecutorShardStatistics(ctx, namespace, oldOwner.ExecutorID, executorStatsCache)
if err != nil {
return nil, err
if existingShardFound {
if oldOwner.ExecutorID == executorID {
continue
}

if existing, ok := oldStats[shardID]; ok {
stats = existing
oldOwnerStats, ok := pendingStatChanges[oldOwner.ExecutorID]
if !ok { // Not yet touched in this loop, get from main cache.
oldOwnerStats, err = s.shardCache.GetExecutorStatistics(ctx, namespace, oldOwner.ExecutorID)
if err != nil {
if errors.Is(err, store.ErrExecutorNotFound) {
oldOwnerStats = make(map[string]etcdtypes.ShardStatistics)
} else {
return nil, err
}
}
}

delete(oldStats, shardID)
changedExecutors[oldOwner.ExecutorID] = struct{}{}
} else {
stats.SmoothedLoad = 0
stats.LastUpdateTime = etcdtypes.Time(now)
clonedOldOwnerStats := maps.Clone(oldOwnerStats)

if existing, ok := clonedOldOwnerStats[shardID]; ok {
shardStatToMove = existing
shardStatToMove.LastMoveTime = etcdtypes.Time(now)
delete(clonedOldOwnerStats, shardID)
}
pendingStatChanges[oldOwner.ExecutorID] = clonedOldOwnerStats
}

stats.LastMoveTime = etcdtypes.Time(now)
// If the shard is new or had no previous stats, initialize them.
if shardStatToMove.LastUpdateTime == etcdtypes.Time(time.Time{}) {
shardStatToMove.SmoothedLoad = 0
shardStatToMove.LastUpdateTime = etcdtypes.Time(now)
// Leave LastMoveTime for newly added shards as zero, to not block it from being moved once we have load measurements
shardStatToMove.LastMoveTime = etcdtypes.Time(time.Time{})
}

newStats, err := s.getOrLoadExecutorShardStatistics(ctx, namespace, executorID, executorStatsCache)
if err != nil {
return nil, err
newOwnerStats, ok := pendingStatChanges[executorID]
if !ok {
newOwnerStats, err = s.shardCache.GetExecutorStatistics(ctx, namespace, executorID)
if err != nil {
if errors.Is(err, store.ErrExecutorNotFound) {
newOwnerStats = make(map[string]etcdtypes.ShardStatistics)
} else {
return nil, err
}
}
}

newStats[shardID] = stats
changedExecutors[executorID] = struct{}{}
clonedNewOwnerStats := maps.Clone(newOwnerStats)

clonedNewOwnerStats[shardID] = shardStatToMove
pendingStatChanges[executorID] = clonedNewOwnerStats
}
}

updates := make([]shardStatisticsUpdate, 0, len(changedExecutors))
for executorID := range changedExecutors {
stats := executorStatsCache[executorID]
updates := make([]shardStatisticsUpdate, 0, len(pendingStatChanges))
for executorID, stats := range pendingStatChanges {
updates = append(updates, shardStatisticsUpdate{
executorID: executorID,
stats: stats,
Expand All @@ -852,94 +940,33 @@ func (s *executorStoreImpl) prepareShardStatisticsUpdates(ctx context.Context, n
}

// applyShardStatisticsUpdates updates shard statistics.
// Is intentionally made tolerant of failures since the data is telemetry only.
func (s *executorStoreImpl) applyShardStatisticsUpdates(ctx context.Context, namespace string, updates []shardStatisticsUpdate) {
func (s *executorStoreImpl) applyShardStatisticsUpdates(ctx context.Context, namespace string, updates []shardStatisticsUpdate) error {
var multiError error
for _, update := range updates {
statsKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, update.executorID, etcdkeys.ExecutorShardStatisticsKey)

if len(update.stats) == 0 {
if _, err := s.client.Delete(ctx, statsKey); err != nil {
s.logger.Warn(
"failed to delete executor shard statistics",
tag.ShardNamespace(namespace),
tag.ShardExecutor(update.executorID),
tag.Error(err),
)
multiError = errors.Join(multiError, fmt.Errorf("failed to delete executor shard statistics: %w", err))
}
continue
}

payload, err := json.Marshal(update.stats)
if err != nil {
s.logger.Warn(
"failed to marshal shard statistics after assignment",
tag.ShardNamespace(namespace),
tag.ShardExecutor(update.executorID),
tag.Error(err),
)
multiError = errors.Join(multiError, fmt.Errorf("failed to marshal executor shard statistics: %w", err))
continue
}

compressedPayload, err := s.recordWriter.Write(payload)
if err != nil {
s.logger.Warn(
"failed to compress shard statistics after assignment",
tag.ShardNamespace(namespace),
tag.ShardExecutor(update.executorID),
tag.Error(err),
)
multiError = errors.Join(multiError, fmt.Errorf("failed to compress executor shard statistics: %w", err))
continue
}

if _, err := s.client.Put(ctx, statsKey, string(compressedPayload)); err != nil {
s.logger.Warn(
"failed to update shard statistics",
tag.ShardNamespace(namespace),
tag.ShardExecutor(update.executorID),
tag.Error(err),
)
multiError = errors.Join(multiError, fmt.Errorf("failed to put executor shard statistics: %w", err))
}
}
}

// getExecutorShardStatistics returns the shard statistics for the given executor from etcd.
func (s *executorStoreImpl) getExecutorShardStatistics(ctx context.Context, namespace, executorID string) (map[string]etcdtypes.ShardStatistics, error) {
statsKey := etcdkeys.BuildExecutorKey(s.prefix, namespace, executorID, etcdkeys.ExecutorShardStatisticsKey)
resp, err := s.client.Get(ctx, statsKey)
if err != nil {
return nil, fmt.Errorf("get executor shard statistics: %w", err)
}

stats := make(map[string]etcdtypes.ShardStatistics)
if len(resp.Kvs) == 0 {
return stats, nil
}

if err := common.DecompressAndUnmarshal(resp.Kvs[0].Value, &stats); err != nil {
return nil, fmt.Errorf("parse executor shard statistics: %w", err)
}

return stats, nil
}

// getOrLoadExecutorShardStatistics returns the shard statistics for the given executor.
// If the statistics are not cached, it will fetch them from etcd.
func (s *executorStoreImpl) getOrLoadExecutorShardStatistics(
ctx context.Context,
namespace, executorID string,
cache map[string]map[string]etcdtypes.ShardStatistics,
) (map[string]etcdtypes.ShardStatistics, error) {
// Load from cache if available.
if stats, ok := cache[executorID]; ok {
return stats, nil
}

// Otherwise, load from etcd.
stats, err := s.getExecutorShardStatistics(ctx, namespace, executorID)
if err != nil {
return nil, err
}

cache[executorID] = stats
return stats, nil
return multiError
}
Loading