Skip to content

Commit 158e030

Browse files
committed
feat(shard distributor): throttle shard-stat writes
Signed-off-by: Andreas Holt <[email protected]>
1 parent abfc80e commit 158e030

File tree

2 files changed

+129
-6
lines changed

2 files changed

+129
-6
lines changed

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 44 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,15 @@ type executorStoreImpl struct {
3636
logger log.Logger
3737
shardCache *shardcache.ShardToExecutorCache
3838
timeSource clock.TimeSource
39+
// Max interval (seconds) before we force a shard-stat persist.
40+
maxStatsPersistIntervalSeconds int64
3941
}
4042

43+
// Constants for gating shard statistics writes to reduce etcd load.
44+
const (
45+
shardStatsEpsilon = 0.05
46+
)
47+
4148
// shardStatisticsUpdate holds the staged statistics for a shard so we can write them
4249
// to etcd after the main AssignShards transaction commits.
4350
type shardStatisticsUpdate struct {
@@ -90,11 +97,12 @@ func NewStore(p ExecutorStoreParams) (store.Store, error) {
9097
}
9198

9299
store := &executorStoreImpl{
93-
client: etcdClient,
94-
prefix: etcdCfg.Prefix,
95-
logger: p.Logger,
96-
shardCache: shardCache,
97-
timeSource: timeSource,
100+
client: etcdClient,
101+
prefix: etcdCfg.Prefix,
102+
logger: p.Logger,
103+
shardCache: shardCache,
104+
timeSource: timeSource,
105+
maxStatsPersistIntervalSeconds: deriveStatsPersistInterval(p.Cfg.Process.HeartbeatTTL),
98106
}
99107

100108
p.Lifecycle.Append(fx.StartStopHook(store.Start, store.Stop))
@@ -160,6 +168,15 @@ func (s *executorStoreImpl) RecordHeartbeat(ctx context.Context, namespace, exec
160168
return nil
161169
}
162170

171+
func deriveStatsPersistInterval(heartbeatTTL time.Duration) int64 {
172+
ttlSeconds := int64(heartbeatTTL.Seconds())
173+
interval := ttlSeconds - 1
174+
if interval < 1 {
175+
interval = 1
176+
}
177+
return interval
178+
}
179+
163180
func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace, executorID string, reported map[string]*types.ShardStatusReport) {
164181
if len(reported) == 0 {
165182
return
@@ -228,7 +245,28 @@ func (s *executorStoreImpl) recordShardStatistics(ctx context.Context, namespace
228245
}
229246

230247
// Update smoothed load via EWMA.
231-
stats.SmoothedLoad = ewmaSmoothedLoad(stats.SmoothedLoad, load, stats.LastUpdateTime, now)
248+
prevSmoothed := stats.SmoothedLoad
249+
prevUpdate := stats.LastUpdateTime
250+
newSmoothed := ewmaSmoothedLoad(prevSmoothed, load, prevUpdate, now)
251+
252+
// Decide whether to persist this update. We always persist if this is the
253+
// first observation (prevUpdate == 0). Otherwise, if the change is small
254+
// and the previous persist is recent, skip the write to reduce etcd load.
255+
shouldPersist := true
256+
if prevUpdate > 0 {
257+
age := now - prevUpdate
258+
delta := math.Abs(newSmoothed - prevSmoothed)
259+
if delta < shardStatsEpsilon && age < s.maxStatsPersistIntervalSeconds {
260+
shouldPersist = false
261+
}
262+
}
263+
264+
if !shouldPersist {
265+
// Skip persisting, proceed to next shard.
266+
continue
267+
}
268+
269+
stats.SmoothedLoad = newSmoothed
232270
stats.LastUpdateTime = now
233271

234272
payload, err := json.Marshal(stats)

service/sharddistributor/store/etcd/executorstore/etcdstore_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"github.com/stretchr/testify/require"
1212
"go.uber.org/fx/fxtest"
1313

14+
"github.com/uber/cadence/common/clock"
1415
"github.com/uber/cadence/common/log/testlogger"
1516
"github.com/uber/cadence/common/types"
1617
"github.com/uber/cadence/service/sharddistributor/store"
@@ -178,6 +179,66 @@ func TestRecordHeartbeatSkipsShardStatisticsWithNilReport(t *testing.T) {
178179
assert.NotContains(t, nsState.ShardStats, skippedShardID)
179180
}
180181

182+
func TestRecordHeartbeatShardStatisticsThrottlesWrites(t *testing.T) {
183+
tc := testhelper.SetupStoreTestCluster(t)
184+
tc.LeaderCfg.Process.HeartbeatTTL = 10 * time.Second
185+
mockTS := clock.NewMockedTimeSourceAt(time.Unix(1000, 0))
186+
executorStore := createStoreWithTimeSource(t, tc, mockTS)
187+
esImpl, ok := executorStore.(*executorStoreImpl)
188+
require.True(t, ok, "unexpected store implementation")
189+
190+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
191+
defer cancel()
192+
193+
executorID := "executor-shard-stats-throttle"
194+
shardID := "shard-stats-throttle"
195+
196+
baseLoad := 0.40
197+
smallDelta := shardStatsEpsilon / 2
198+
intervalSeconds := esImpl.maxStatsPersistIntervalSeconds
199+
halfIntervalSeconds := intervalSeconds / 2
200+
if halfIntervalSeconds == 0 {
201+
halfIntervalSeconds = 1
202+
}
203+
204+
// First heartbeat should always persist stats.
205+
require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{
206+
LastHeartbeat: mockTS.Now().Unix(),
207+
Status: types.ExecutorStatusACTIVE,
208+
ReportedShards: map[string]*types.ShardStatusReport{
209+
shardID: {Status: types.ShardStatusREADY, ShardLoad: baseLoad},
210+
},
211+
}))
212+
statsAfterFirst := getShardStats(t, executorStore, ctx, tc.Namespace, shardID)
213+
require.NotNil(t, statsAfterFirst)
214+
215+
// Advance time by less than the persist interval and provide a small delta: should skip the write.
216+
mockTS.Advance(time.Duration(halfIntervalSeconds) * time.Second)
217+
require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{
218+
LastHeartbeat: mockTS.Now().Unix(),
219+
Status: types.ExecutorStatusACTIVE,
220+
ReportedShards: map[string]*types.ShardStatusReport{
221+
shardID: {Status: types.ShardStatusREADY, ShardLoad: baseLoad + smallDelta},
222+
},
223+
}))
224+
statsAfterSkip := getShardStats(t, executorStore, ctx, tc.Namespace, shardID)
225+
require.NotNil(t, statsAfterSkip)
226+
assert.Equal(t, statsAfterFirst.LastUpdateTime, statsAfterSkip.LastUpdateTime, "small recent deltas should not trigger a persist")
227+
228+
// Advance time beyond the max persist interval, even small deltas should now persist.
229+
mockTS.Advance(time.Duration(intervalSeconds) * time.Second)
230+
require.NoError(t, executorStore.RecordHeartbeat(ctx, tc.Namespace, executorID, store.HeartbeatState{
231+
LastHeartbeat: mockTS.Now().Unix(),
232+
Status: types.ExecutorStatusACTIVE,
233+
ReportedShards: map[string]*types.ShardStatusReport{
234+
shardID: {Status: types.ShardStatusREADY, ShardLoad: baseLoad + smallDelta/2},
235+
},
236+
}))
237+
statsAfterForce := getShardStats(t, executorStore, ctx, tc.Namespace, shardID)
238+
require.NotNil(t, statsAfterForce)
239+
assert.Greater(t, statsAfterForce.LastUpdateTime, statsAfterSkip.LastUpdateTime, "stale stats must be refreshed even if delta is small")
240+
}
241+
181242
func TestGetHeartbeat(t *testing.T) {
182243
tc := testhelper.SetupStoreTestCluster(t)
183244
executorStore := createStore(t, tc)
@@ -695,3 +756,27 @@ func createStore(t *testing.T, tc *testhelper.StoreTestCluster) store.Store {
695756
require.NoError(t, err)
696757
return store
697758
}
759+
760+
func createStoreWithTimeSource(t *testing.T, tc *testhelper.StoreTestCluster, ts clock.TimeSource) store.Store {
761+
t.Helper()
762+
store, err := NewStore(ExecutorStoreParams{
763+
Client: tc.Client,
764+
Cfg: tc.LeaderCfg,
765+
Lifecycle: fxtest.NewLifecycle(t),
766+
Logger: testlogger.New(t),
767+
TimeSource: ts,
768+
})
769+
require.NoError(t, err)
770+
return store
771+
}
772+
773+
func getShardStats(t *testing.T, s store.Store, ctx context.Context, namespace, shardID string) *store.ShardStatistics {
774+
t.Helper()
775+
nsState, err := s.GetState(ctx, namespace)
776+
require.NoError(t, err)
777+
stats, ok := nsState.ShardStats[shardID]
778+
if !ok {
779+
return nil
780+
}
781+
return &stats
782+
}

0 commit comments

Comments
 (0)