Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion service/sharddistributor/leader/process/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) {
continue
}
staleShardStats := p.identifyStaleShardStats(namespaceState)
if len(staleShardStats) == 0 {
if len(staleShardStats) > 0 {
// No stale shard stats to delete
continue
}
Expand Down
24 changes: 2 additions & 22 deletions service/sharddistributor/store/etcd/executorstore/etcdstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@ var (
_executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE)
)

const deleteShardStatsBatchSize = 64

type executorStoreImpl struct {
client *clientv3.Client
prefix string
Expand Down Expand Up @@ -599,40 +597,22 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin
return nil
}

// DeleteShardStats deletes shard statistics in batches to avoid hitting etcd transaction limits (128 ops).
// If any batch fails (e.g. due to leadership loss), the operation returns immediately.
// Partial deletions are acceptable as the periodic cleanup loop will retry remaining keys.
func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error {
if len(shardIDs) == 0 {
return nil
}

for start := 0; start < len(shardIDs); start += deleteShardStatsBatchSize {
end := start + deleteShardStatsBatchSize
if end > len(shardIDs) {
end = len(shardIDs)
}

if err := s.deleteShardStatsBatch(ctx, namespace, shardIDs[start:end], guard); err != nil {
return err
}
}
return nil
}

func (s *executorStoreImpl) deleteShardStatsBatch(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error {
ops := make([]clientv3.Op, 0, len(shardIDs))
var ops []clientv3.Op
for _, shardID := range shardIDs {
shardStatsKey := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey)
ops = append(ops, clientv3.OpDelete(shardStatsKey))
}

nativeTxn := s.client.Txn(ctx)
guardedTxn, err := guard(nativeTxn)

if err != nil {
return fmt.Errorf("apply transaction guard: %w", err)
}

etcdGuardedTxn, ok := guardedTxn.(clientv3.Txn)
if !ok {
return fmt.Errorf("guard function returned invalid transaction type")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,42 +573,6 @@ func TestGetShardStatisticsForMissingShard(t *testing.T) {
assert.NotContains(t, st.ShardStats, "unknown")
}

// TestDeleteShardStatsDeletesLargeBatches verifies that shard statistics are correctly deleted in batches.
func TestDeleteShardStatsDeletesLargeBatches(t *testing.T) {
tc := testhelper.SetupStoreTestCluster(t)
executorStore := createStore(t, tc)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

totalShardStats := deleteShardStatsBatchSize*2 + 7 // two batches + 7 extra (remainder)
shardIDs := make([]string, 0, totalShardStats)

// Create stale stats
for i := 0; i < totalShardStats; i++ {
shardID := "stale-stats-" + strconv.Itoa(i)
shardIDs = append(shardIDs, shardID)

statsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey)
require.NoError(t, err)
stats := store.ShardStatistics{
SmoothedLoad: float64(i),
LastUpdateTime: int64(i),
LastMoveTime: int64(i),
}
payload, err := json.Marshal(stats)
require.NoError(t, err)
_, err = tc.Client.Put(ctx, statsKey, string(payload))
require.NoError(t, err)
}

require.NoError(t, executorStore.DeleteShardStats(ctx, tc.Namespace, shardIDs, store.NopGuard()))

nsState, err := executorStore.GetState(ctx, tc.Namespace)
require.NoError(t, err)
assert.Empty(t, nsState.ShardStats)
}

// --- Test Setup ---

func stringStatus(s types.ExecutorStatus) string {
Expand Down
Loading