From 1c8fe114df551b0e2cc37f2de43e52c0da10fa6e Mon Sep 17 00:00:00 2001 From: Felipe Imanishi Date: Mon, 24 Nov 2025 11:07:59 -0800 Subject: [PATCH] =?UTF-8?q?Revert=20"fix(shard-distributor):=20make=20Dele?= =?UTF-8?q?teShardStats=20non-transactional=20and=20f=E2=80=A6"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This reverts commit 125b038a300e27d3d50dfd22d225bace62f0ee72. --- .../leader/process/processor.go | 2 +- .../store/etcd/executorstore/etcdstore.go | 24 ++----------- .../etcd/executorstore/etcdstore_test.go | 36 ------------------- 3 files changed, 3 insertions(+), 59 deletions(-) diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 76abb573900..127324249f8 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -237,7 +237,7 @@ func (p *namespaceProcessor) runShardStatsCleanupLoop(ctx context.Context) { continue } staleShardStats := p.identifyStaleShardStats(namespaceState) - if len(staleShardStats) == 0 { + if len(staleShardStats) > 0 { // No stale shard stats to delete continue } diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore.go b/service/sharddistributor/store/etcd/executorstore/etcdstore.go index a6fb4473f9f..edba090876b 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore.go @@ -29,8 +29,6 @@ var ( _executorStatusRunningJSON = fmt.Sprintf(`"%s"`, types.ExecutorStatusACTIVE) ) -const deleteShardStatsBatchSize = 64 - type executorStoreImpl struct { client *clientv3.Client prefix string @@ -599,29 +597,11 @@ func (s *executorStoreImpl) DeleteExecutors(ctx context.Context, namespace strin return nil } -// DeleteShardStats deletes shard statistics in batches to avoid hitting etcd transaction limits (128 ops). -// If any batch fails (e.g. due to leadership loss), the operation returns immediately. -// Partial deletions are acceptable as the periodic cleanup loop will retry remaining keys. func (s *executorStoreImpl) DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error { if len(shardIDs) == 0 { return nil } - - for start := 0; start < len(shardIDs); start += deleteShardStatsBatchSize { - end := start + deleteShardStatsBatchSize - if end > len(shardIDs) { - end = len(shardIDs) - } - - if err := s.deleteShardStatsBatch(ctx, namespace, shardIDs[start:end], guard); err != nil { - return err - } - } - return nil -} - -func (s *executorStoreImpl) deleteShardStatsBatch(ctx context.Context, namespace string, shardIDs []string, guard store.GuardFunc) error { - ops := make([]clientv3.Op, 0, len(shardIDs)) + var ops []clientv3.Op for _, shardID := range shardIDs { shardStatsKey := etcdkeys.BuildShardKey(s.prefix, namespace, shardID, etcdkeys.ShardStatisticsKey) ops = append(ops, clientv3.OpDelete(shardStatsKey)) @@ -629,10 +609,10 @@ func (s *executorStoreImpl) deleteShardStatsBatch(ctx context.Context, namespace nativeTxn := s.client.Txn(ctx) guardedTxn, err := guard(nativeTxn) + if err != nil { return fmt.Errorf("apply transaction guard: %w", err) } - etcdGuardedTxn, ok := guardedTxn.(clientv3.Txn) if !ok { return fmt.Errorf("guard function returned invalid transaction type") diff --git a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go index 5e5f1688d7b..490a72da967 100644 --- a/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go +++ b/service/sharddistributor/store/etcd/executorstore/etcdstore_test.go @@ -573,42 +573,6 @@ func TestGetShardStatisticsForMissingShard(t *testing.T) { assert.NotContains(t, st.ShardStats, "unknown") } -// TestDeleteShardStatsDeletesLargeBatches verifies that shard statistics are correctly deleted in batches. -func TestDeleteShardStatsDeletesLargeBatches(t *testing.T) { - tc := testhelper.SetupStoreTestCluster(t) - executorStore := createStore(t, tc) - - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - totalShardStats := deleteShardStatsBatchSize*2 + 7 // two batches + 7 extra (remainder) - shardIDs := make([]string, 0, totalShardStats) - - // Create stale stats - for i := 0; i < totalShardStats; i++ { - shardID := "stale-stats-" + strconv.Itoa(i) - shardIDs = append(shardIDs, shardID) - - statsKey, err := etcdkeys.BuildShardKey(tc.EtcdPrefix, tc.Namespace, shardID, etcdkeys.ShardStatisticsKey) - require.NoError(t, err) - stats := store.ShardStatistics{ - SmoothedLoad: float64(i), - LastUpdateTime: int64(i), - LastMoveTime: int64(i), - } - payload, err := json.Marshal(stats) - require.NoError(t, err) - _, err = tc.Client.Put(ctx, statsKey, string(payload)) - require.NoError(t, err) - } - - require.NoError(t, executorStore.DeleteShardStats(ctx, tc.Namespace, shardIDs, store.NopGuard())) - - nsState, err := executorStore.GetState(ctx, tc.Namespace) - require.NoError(t, err) - assert.Empty(t, nsState.ShardStats) -} - // --- Test Setup --- func stringStatus(s types.ExecutorStatus) string {