Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
56 commits
Select commit Hold shift + click to select a range
2de12d8
feat(shard distributor): add shard key helpers and metrics state
AndreasHolt Oct 19, 2025
5d95067
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
6e57536
fix(shard distributor): update LastMoveTime in the case where a shard…
AndreasHolt Oct 19, 2025
595d320
test(shard distributor): add tests for shard metrics
AndreasHolt Oct 19, 2025
d9ba54d
fix(shard distributor): modify comment
AndreasHolt Oct 19, 2025
32d2ecd
fix(shard distributor): add atomic check to prevent metrics race
AndreasHolt Oct 19, 2025
b624a00
fix(shard distributor): apply shard metric updates in a second phase …
AndreasHolt Oct 19, 2025
aad7b2e
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
6360f8a
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
1536d0a
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
f316fbf
test(shard distributor): BuildShardPrefix, BuildShardKey, ParseShardKey
AndreasHolt Oct 22, 2025
4524da9
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
126f725
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc53f68
test(shard distributor): small changes to shard key tests s.t. they l…
AndreasHolt Oct 25, 2025
733bbcb
fix(shard distributor): no longer check for key type ShardStatisticsK…
AndreasHolt Oct 25, 2025
6816b8e
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
f97e0cf
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
513e88c
feat(shard distributor): clean up the shard statistics
AndreasHolt Oct 29, 2025
9833525
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
0332fe5
fix(shard distributor): add mapping (new metric)
AndreasHolt Oct 29, 2025
d5a13d9
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
634bc02
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
812e854
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
b9813e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
dfb7448
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
36ec08f
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
38a6e81
Merge branch 'master' into heartbeat-shard-statistics
Theis-Mathiassen Nov 6, 2025
af733e6
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
a52e86f
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
abfc80e
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
df0feaf
feat(shard distributor): persist shard metrics in etcd store
AndreasHolt Oct 19, 2025
8546a26
feat(shard distributor): move shard metric updates out of AssignShard…
AndreasHolt Oct 19, 2025
dde87ef
fix(shard distributor): keep NamespaceState revisions tied to assignm…
AndreasHolt Oct 20, 2025
415e80c
refactor(shard distributor): use shard cache and clock for preparing …
AndreasHolt Oct 22, 2025
c67d5c3
feat(shard distributor): simplify shard metrics updates
AndreasHolt Oct 23, 2025
9ffcefb
refactor(shard distributor): ShardMetrics renamed to ShardStatistics.…
AndreasHolt Oct 24, 2025
cc769bf
refactor(shard distributor): found a place where I forgot to rename t…
AndreasHolt Oct 27, 2025
8c22663
fix(shard distributor): move non-exported helpers to end of file to f…
AndreasHolt Oct 27, 2025
5ac3c5d
test(shard distributor): add test case for when shard stats are deleted
AndreasHolt Oct 29, 2025
3973b82
feat(shard distributor): retain shard stats while shards are within h…
AndreasHolt Oct 30, 2025
3830d5e
feat: function to update shard statistics from heartbeat (currently n…
AndreasHolt Oct 27, 2025
443c0b1
test(shard distributor): add tests to verify statistics are updated a…
AndreasHolt Oct 27, 2025
9d159e7
feat(shard distributor): calculate smoothed load (ewma) using the Sha…
AndreasHolt Oct 27, 2025
18e63b7
fix(shard distributor): log invalid shard load
AndreasHolt Oct 27, 2025
e08a286
chore: added logger warning and simplified ewma calculation
Theis-Mathiassen Nov 2, 2025
08eb635
fix: remove duplicate test introduced in merge
AndreasHolt Nov 6, 2025
f63664a
chore: consistent error checking, and rename function
Theis-Mathiassen Nov 11, 2025
10e2ffa
chore: added decompress to unmarshal
Theis-Mathiassen Nov 11, 2025
8c6b0c8
chore: removed an old struct that appeared during rebase
Theis-Mathiassen Nov 11, 2025
158e030
feat(shard distributor): throttle shard-stat writes
AndreasHolt Nov 13, 2025
dd45ff0
Merge branch 'heartbeat-shard-statistics' of github.com:AndreasHolt/c…
AndreasHolt Nov 13, 2025
05e0d1d
fix(shard distributor): linter error
AndreasHolt Nov 13, 2025
e0779ec
feat(shard distributor): decouple shard stats write-throttling decisi…
AndreasHolt Nov 18, 2025
9546f24
Merge branch 'master' into heartbeat-shard-statistics
AndreasHolt Nov 19, 2025
db70702
fix(shard-distributor): inverted condition in shard stats cleanup loop
AndreasHolt Nov 19, 2025
481f9c6
chore(shard-distributor): did some formatting, and use current load i…
Theis-Mathiassen Nov 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math"
"strconv"
"time"

Expand Down Expand Up @@ -153,9 +154,108 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
if err != nil {
return fmt.Errorf("record heartbeat: %w", err)
}

s.recordShardStatistics(ctx, namespace, executorID, request.ReportedShards)

return nil
}

func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace, executorID string, reported map[string]*types.ShardStatusReport) {
if len(reported) == 0 {
return
}

now := s.timeSource.Now().Unix()

for shardID, report := range reported {
if report == nil {
s.logger.Warn("empty report; skipping EWMA update",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
)
continue
}

load := report.ShardLoad
if math.IsNaN(load) || math.IsInf(load, 0) {
s.logger.Warn(
"invalid shard load reported; skipping EWMA update",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
)
continue
}

shardStatsKey, err := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey)
if err != nil {
s.logger.Warn(
"failed to build shard statistics key from heartbeat",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
tag.Error(err),
)
continue
}

statsResp, err := s.client.Get(ctx, shardStatsKey)
if err != nil {
s.logger.Warn(
"failed to read shard statistics for heartbeat update",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
tag.Error(err),
)
continue
}

var stats store.ShardStatistics
if len(statsResp.Kvs) > 0 {
err := common.DecompressAndUnmarshal(statsResp.Kvs[0].Value, &stats)
if err != nil {
s.logger.Warn(
"failed to unmarshal shard statistics for heartbeat update",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
tag.Error(err),
)
continue
}
}

// Update smoothed load via EWMA.
stats.SmoothedLoad = ewmaSmoothedLoad(stats.SmoothedLoad, load, stats.LastUpdateTime, now)
stats.LastUpdateTime = now

payload, err := json.Marshal(stats)
if err != nil {
s.logger.Warn(
"failed to marshal shard statistics after heartbeat",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
tag.Error(err),
)
continue
}

_, err = s.client.Put(ctx, shardStatsKey, string(payload))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am worried that we are generating too much writes load to etcd,do you think we can avoid writing if the the load is stable?

Copy link
Author

@Theis-Mathiassen Theis-Mathiassen Nov 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think that is possible, so as an example if load from heartbeat is within +/- 5% of the currently stored value, we do not update it?

Some problems with this approach might be:

Because of how ewmaSmoothedLoad calculates the new load, if there have been a longer time since last load change, the new load will have greater impact.

So I fear if we have a very stable shard load with some spikes in the load, the smoothing effect is lost, excuse my drawing, but i hope it illustrates the idea:
image

It might still be possible, we will probably just have to rethink the update function, how we can avoid this.
We will look into how to solve this, and comment again when we have something.

Copy link
Contributor

@AndreasHolt AndreasHolt Nov 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have implemented something that can help throttle the writes in 158e030.
We now only write

  1. If load has fluctuated enough
  2. If enough time has passed since last write (we don't want to cause a stats cleanup just because load hasn't fluctuated)

Edit:
e0779ec decouples the stats cleanup, which before this change was coupled to heartbeat TTL when determining whether a shard stat is stale.

if err != nil {
s.logger.Warn(
"failed to persist shard statistics from heartbeat",
tag.ShardNamespace(namespace),
tag.ShardExecutor(executorID),
tag.ShardKey(shardID),
tag.Error(err),
)
}
}
}

// GetHeartbeat retrieves the last known heartbeat state for a single executor.
func (s *executorStoreImpl) GetHeartbeat(ctx context.Context, namespace string, executorID string) (*store.HeartbeatState, *store.AssignedState, error) {
// The prefix for all keys related to a single executor.
Expand Down Expand Up @@ -717,3 +817,13 @@ func (s *executorStoreImpl) applyShardStatisticsUpdates(ctx context.Context, nam
}
}
}

func ewmaSmoothedLoad(prev, current float64, lastUpdate, now int64) float64 {
const tauSeconds = 30.0 // smaller = more responsive, larger = smoother
if lastUpdate <= 0 || tauSeconds <= 0 {
return current
}
dt := max(now-lastUpdate, 0)
alpha := 1 - math.Exp(-float64(dt)/tauSeconds)
return (1-alpha)*prev + alpha*current
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,93 @@ func TestRecordHeartbeat(t *testing.T) {
assert.Equal(t, "value-2", string(resp.Kvs[0].Value))
}

func TestRecordHeartbeatUpdatesShardStatistics(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

executorID := "executor-shard-stats"
shardID := "shard-with-load"

initialStats := store.ShardStatistics{
SmoothedLoad: 1.23,
LastUpdateTime: 10,
LastMoveTime: 123,
}

shardStatsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey)
require.NoError(t, err)
payload, err := json.Marshal(initialStats)
require.NoError(t, err)
_, err = tc.Client.Put(ctx, shardStatsKey, string(payload))
require.NoError(t, err)

nowTS := time.Now().Unix()

req := store.HeartbeatState{
LastHeartbeat: nowTS,
Status: types.ExecutorStatusACTIVE,
ReportedShards: map[string]*types.ShardStatusReport{
shardID: {
Status: types.ShardStatusREADY,
ShardLoad: 45.6,
},
},
}

require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req))

nsState, err := executorStore.GetState(ctx, tc.Namespace)
require.NoError(t, err)

require.Contains(t, nsState.ShardStats, shardID)
updated := nsState.ShardStats[shardID]

assert.InDelta(t, 45.6, updated.SmoothedLoad, 1e-9)
assert.GreaterOrEqual(t, updated.LastUpdateTime, nowTS)
assert.Equal(t, initialStats.LastMoveTime, updated.LastMoveTime)
}

func TestRecordHeartbeatSkipsShardStatisticsWithNilReport(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

executorID := "executor-missing-load"
validShardID := "shard-with-valid-load"
skippedShardID := "shard-missing-load"

nowTS := time.Now().Unix()

req := store.HeartbeatState{
LastHeartbeat: nowTS,
Status: types.ExecutorStatusACTIVE,
ReportedShards: map[string]*types.ShardStatusReport{
validShardID: {
Status: types.ShardStatusREADY,
ShardLoad: 3.21,
},
skippedShardID: nil,
},
}

require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, req))

nsState, err := executorStore.GetState(ctx, tc.Namespace)
require.NoError(t, err)

require.Contains(t, nsState.ShardStats, validShardID)
validStats := nsState.ShardStats[validShardID]
assert.InDelta(t, 3.21, validStats.SmoothedLoad, 1e-9)
assert.Greater(t, validStats.LastUpdateTime, int64(0))

assert.NotContains(t, nsState.ShardStats, skippedShardID)
}

func TestGetHeartbeat(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)
Expand Down