diff --git a/common/metrics/defs.go b/common/metrics/defs.go index 557f3bc571d..8bbd28e413e 100644 --- a/common/metrics/defs.go +++ b/common/metrics/defs.go @@ -1479,6 +1479,7 @@ const ( ShardDistributorStoreAssignShardScope ShardDistributorStoreAssignShardsScope ShardDistributorStoreDeleteExecutorsScope + ShardDistributorStoreGetShardStatsScope ShardDistributorStoreDeleteShardStatsScope ShardDistributorStoreGetHeartbeatScope ShardDistributorStoreGetStateScope @@ -2167,6 +2168,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{ ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"}, ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"}, ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"}, + ShardDistributorStoreGetShardStatsScope: {operation: "StoreGetShardStats"}, ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"}, ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"}, ShardDistributorStoreGetStateScope: {operation: "StoreGetState"}, @@ -2968,6 +2970,13 @@ const ( ShardDistributorStoreRequestsPerNamespace ShardDistributorStoreLatencyHistogramPerNamespace + // ShardDistributorShardAssignmentDistributionLatency measures the time taken between assignment of a shard + // and the time it is fully distributed to executors + ShardDistributorShardAssignmentDistributionLatency + + // ShardDistributorShardHandoverLatency measures the time taken to hand over a shard from one executor to another + ShardDistributorShardHandoverLatency + NumShardDistributorMetrics ) @@ -3753,6 +3762,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{ ShardDistributorStoreFailuresPerNamespace: {metricName: "shard_distributor_store_failures_per_namespace", metricType: Counter}, ShardDistributorStoreRequestsPerNamespace: {metricName: "shard_distributor_store_requests_per_namespace", metricType: Counter}, ShardDistributorStoreLatencyHistogramPerNamespace: {metricName: "shard_distributor_store_latency_histogram_per_namespace", metricType: Histogram, buckets: ShardDistributorExecutorStoreLatencyBuckets}, + + ShardDistributorShardAssignmentDistributionLatency: {metricName: "shard_distributor_shard_assignment_distribution_latency", metricType: Histogram, buckets: Default1ms100s.buckets()}, + ShardDistributorShardHandoverLatency: {metricName: "shard_distributor_shard_handover_latency", metricType: Histogram, buckets: Default1ms100s.buckets()}, }, } diff --git a/common/metrics/tags.go b/common/metrics/tags.go index 2060c871cda..09d88a9ee10 100644 --- a/common/metrics/tags.go +++ b/common/metrics/tags.go @@ -352,6 +352,10 @@ func NamespaceTypeTag(namespaceType string) Tag { return metricWithUnknown("namespace_type", namespaceType) } +func HandoverTypeTag(handoverType string) Tag { + return metricWithUnknown("handover_type", handoverType) +} + func TaskCategoryTag(category string) Tag { return metricWithUnknown("task_category", category) } diff --git a/common/ptr/ptr.go b/common/ptr/ptr.go new file mode 100644 index 00000000000..7097e4f370d --- /dev/null +++ b/common/ptr/ptr.go @@ -0,0 +1,6 @@ +package ptr + +// ToPtr returns a pointer to the given value. +func ToPtr[T any](v T) *T { + return &v +} diff --git a/common/types/sharddistributor.go b/common/types/sharddistributor.go index 5bbddf478f0..a120c7a9e89 100644 --- a/common/types/sharddistributor.go +++ b/common/types/sharddistributor.go @@ -22,9 +22,11 @@ package types -import "fmt" +import ( + "fmt" +) -//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go +//go:generate enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode,HandoverType -json -output sharddistributor_statuses_enumer_generated.go type GetShardOwnerRequest struct { ShardKey string @@ -198,6 +200,7 @@ func (v *ExecutorHeartbeatResponse) GetMigrationPhase() (o MigrationMode) { } type ShardAssignment struct { + // Status indicates the current assignment status of the shard. Status AssignmentStatus `json:"status"` } @@ -217,6 +220,30 @@ const ( AssignmentStatusREADY AssignmentStatus = 1 ) +// HandoverType is used to indicate the type of handover that occurred during shard reassignment. +// Type is persisted to the DB with a string value mapping. +// Beware - if we want to change the name - it should be backward compatible and should be done in two steps. +type HandoverType int32 + +const ( + HandoverTypeINVALID HandoverType = 0 + + // HandoverTypeGRACEFUL + // Graceful handover indicates that the shard was transferred in a way that allowed + // the previous owner had a chance to finish processing before the shard was reassigned. + HandoverTypeGRACEFUL HandoverType = 1 + + // HandoverTypeEMERGENCY + // Emergency handover indicates that the shard was transferred abruptly without + // allowing the previous owner to finish processing. + HandoverTypeEMERGENCY HandoverType = 2 +) + +// Ptr returns a pointer to the HandoverType value. +func (t HandoverType) Ptr() *HandoverType { + return &t +} + type MigrationMode int32 const ( diff --git a/common/types/sharddistributor_statuses_enumer_generated.go b/common/types/sharddistributor_statuses_enumer_generated.go index 9e1f0f873e0..022a9519212 100644 --- a/common/types/sharddistributor_statuses_enumer_generated.go +++ b/common/types/sharddistributor_statuses_enumer_generated.go @@ -1,4 +1,4 @@ -// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. +// Code generated by "enumer -type=ExecutorStatus,ShardStatus,AssignmentStatus,MigrationMode,HandoverType -json -output sharddistributor_statuses_enumer_generated.go"; DO NOT EDIT. package types @@ -379,3 +379,94 @@ func (i *MigrationMode) UnmarshalJSON(data []byte) error { *i, err = MigrationModeString(s) return err } + +const _HandoverTypeName = "HandoverTypeINVALIDHandoverTypeGRACEFULHandoverTypeEMERGENCY" + +var _HandoverTypeIndex = [...]uint8{0, 19, 39, 60} + +const _HandoverTypeLowerName = "handovertypeinvalidhandovertypegracefulhandovertypeemergency" + +func (i HandoverType) String() string { + if i < 0 || i >= HandoverType(len(_HandoverTypeIndex)-1) { + return fmt.Sprintf("HandoverType(%d)", i) + } + return _HandoverTypeName[_HandoverTypeIndex[i]:_HandoverTypeIndex[i+1]] +} + +// An "invalid array index" compiler error signifies that the constant values have changed. +// Re-run the stringer command to generate them again. +func _HandoverTypeNoOp() { + var x [1]struct{} + _ = x[HandoverTypeINVALID-(0)] + _ = x[HandoverTypeGRACEFUL-(1)] + _ = x[HandoverTypeEMERGENCY-(2)] +} + +var _HandoverTypeValues = []HandoverType{HandoverTypeINVALID, HandoverTypeGRACEFUL, HandoverTypeEMERGENCY} + +var _HandoverTypeNameToValueMap = map[string]HandoverType{ + _HandoverTypeName[0:19]: HandoverTypeINVALID, + _HandoverTypeLowerName[0:19]: HandoverTypeINVALID, + _HandoverTypeName[19:39]: HandoverTypeGRACEFUL, + _HandoverTypeLowerName[19:39]: HandoverTypeGRACEFUL, + _HandoverTypeName[39:60]: HandoverTypeEMERGENCY, + _HandoverTypeLowerName[39:60]: HandoverTypeEMERGENCY, +} + +var _HandoverTypeNames = []string{ + _HandoverTypeName[0:19], + _HandoverTypeName[19:39], + _HandoverTypeName[39:60], +} + +// HandoverTypeString retrieves an enum value from the enum constants string name. +// Throws an error if the param is not part of the enum. +func HandoverTypeString(s string) (HandoverType, error) { + if val, ok := _HandoverTypeNameToValueMap[s]; ok { + return val, nil + } + + if val, ok := _HandoverTypeNameToValueMap[strings.ToLower(s)]; ok { + return val, nil + } + return 0, fmt.Errorf("%s does not belong to HandoverType values", s) +} + +// HandoverTypeValues returns all values of the enum +func HandoverTypeValues() []HandoverType { + return _HandoverTypeValues +} + +// HandoverTypeStrings returns a slice of all String values of the enum +func HandoverTypeStrings() []string { + strs := make([]string, len(_HandoverTypeNames)) + copy(strs, _HandoverTypeNames) + return strs +} + +// IsAHandoverType returns "true" if the value is listed in the enum definition. "false" otherwise +func (i HandoverType) IsAHandoverType() bool { + for _, v := range _HandoverTypeValues { + if i == v { + return true + } + } + return false +} + +// MarshalJSON implements the json.Marshaler interface for HandoverType +func (i HandoverType) MarshalJSON() ([]byte, error) { + return json.Marshal(i.String()) +} + +// UnmarshalJSON implements the json.Unmarshaler interface for HandoverType +func (i *HandoverType) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return fmt.Errorf("HandoverType should be a string, got %s", data) + } + + var err error + *i, err = HandoverTypeString(s) + return err +} diff --git a/service/sharddistributor/handler/executor.go b/service/sharddistributor/handler/executor.go index f288177db7d..7ea1647d2a3 100644 --- a/service/sharddistributor/handler/executor.go +++ b/service/sharddistributor/handler/executor.go @@ -9,6 +9,7 @@ import ( "github.com/uber/cadence/common/clock" "github.com/uber/cadence/common/log" "github.com/uber/cadence/common/log/tag" + "github.com/uber/cadence/common/metrics" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" @@ -28,6 +29,7 @@ type executor struct { storage store.Store shardDistributionCfg config.ShardDistribution migrationConfiguration *config.MigrationConfig + metricsClient metrics.Client } func NewExecutorHandler( @@ -36,6 +38,7 @@ func NewExecutorHandler( timeSource clock.TimeSource, shardDistributionCfg config.ShardDistribution, migrationConfig *config.MigrationConfig, + metricsClient metrics.Client, ) Executor { return &executor{ logger: logger, @@ -43,6 +46,7 @@ func NewExecutorHandler( storage: storage, shardDistributionCfg: shardDistributionCfg, migrationConfiguration: migrationConfig, + metricsClient: metricsClient, } } @@ -53,8 +57,9 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get heartbeat: %v", err)} } - now := h.timeSource.Now().UTC() + heartbeatTime := h.timeSource.Now().UTC() mode := h.migrationConfiguration.GetMigrationMode(request.Namespace) + shardAssignedInBackground := true switch mode { case types.MigrationModeINVALID: @@ -68,19 +73,20 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe if err != nil { return nil, err } + shardAssignedInBackground = false } // If the state has changed we need to update heartbeat data. // Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate. if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED { lastHeartbeatTime := previousHeartbeat.LastHeartbeat - if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate { + if heartbeatTime.Sub(lastHeartbeatTime) < _heartbeatRefreshRate { return _convertResponse(assignedShards, mode), nil } } newHeartbeat := store.HeartbeatState{ - LastHeartbeat: now, + LastHeartbeat: heartbeatTime, Status: request.Status, ReportedShards: request.ShardStatusReports, Metadata: request.GetMetadata(), @@ -95,9 +101,58 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to record heartbeat: %v", err)} } + // emit shard assignment metrics only if shards are assigned in the background + // shard assignment in heartbeat doesn't involve any assignment changes happening in the background + // thus there was no shard handover and no assignment distribution latency + // to measure, so don't need to emit metrics in that case + if shardAssignedInBackground { + // emits metrics in background to not block the heartbeat response + h.emitShardAssignmentMetrics(request.Namespace, heartbeatTime, previousHeartbeat, assignedShards) + } + return _convertResponse(assignedShards, mode), nil } +// emitShardAssignmentMetrics emits the following metrics for newly assigned shards: +// - ShardAssignmentDistributionLatency: time taken since the shard was assigned to heartbeat time +// - ShardHandoverLatency: time taken since the previous executor's last heartbeat to heartbeat time +func (h *executor) emitShardAssignmentMetrics(namespace string, heartbeatTime time.Time, previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) { + // find newly assigned shards, if there are none, no handovers happened + newAssignedShardIDs := filterNewlyAssignedShardIDs(previousHeartbeat, assignedState) + if len(newAssignedShardIDs) == 0 { + // no handovers happened, nothing to do + return + } + + metricsScope := h.metricsClient.Scope(metrics.ShardDistributorHeartbeatScope). + Tagged(metrics.NamespaceTag(namespace)) + + distributionLatency := heartbeatTime.Sub(assignedState.LastUpdated) + metricsScope.RecordHistogramDuration(metrics.ShardDistributorShardAssignmentDistributionLatency, distributionLatency) + + // check if handover stats exist at all + isShardHandoverStatsExists := assignedState.ShardHandoverStats != nil + + for _, shardID := range newAssignedShardIDs { + if !isShardHandoverStatsExists { + // no handover stats at all, means no handovers happened before + continue + } + + handoverStats, ok := assignedState.ShardHandoverStats[shardID] + if !ok { + // no handover stats for this shard, means it was never handed over before + // so no handover latency metric to emit + continue + } + + handoverLatency := heartbeatTime.Sub(handoverStats.PreviousExecutorLastHeartbeatTime) + metricsScope.Tagged(metrics.HandoverTypeTag(handoverStats.HandoverType.String())). + RecordHistogramDuration(metrics.ShardDistributorShardHandoverLatency, handoverLatency) + + } +} + // assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest) (*store.AssignedState, error) { assignedShards := store.AssignedState{ @@ -155,3 +210,33 @@ func validateMetadata(metadata map[string]string) error { return nil } + +func filterNewlyAssignedShardIDs(previousHeartbeat *store.HeartbeatState, assignedState *store.AssignedState) []string { + // if assignedState is nil, no shards are assigned + if assignedState == nil || len(assignedState.AssignedShards) == 0 { + return nil + } + + // if previousHeartbeat is nil, all assigned shards are new + if previousHeartbeat == nil { + var newAssignedShardIDs = make([]string, len(assignedState.AssignedShards)) + + var i int + for assignedShardID := range assignedState.AssignedShards { + newAssignedShardIDs[i] = assignedShardID + i++ + } + + return newAssignedShardIDs + } + + // find shards that are assigned now but were not reported in the previous heartbeat + var newAssignedShardIDs []string + for assignedShardID := range assignedState.AssignedShards { + if _, ok := previousHeartbeat.ReportedShards[assignedShardID]; !ok { + newAssignedShardIDs = append(newAssignedShardIDs, assignedShardID) + } + } + + return newAssignedShardIDs +} diff --git a/service/sharddistributor/handler/executor_test.go b/service/sharddistributor/handler/executor_test.go index bb8fe115d82..59aef7012e4 100644 --- a/service/sharddistributor/handler/executor_test.go +++ b/service/sharddistributor/handler/executor_test.go @@ -13,6 +13,9 @@ import ( "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" "github.com/uber/cadence/common/log/testlogger" + "github.com/uber/cadence/common/metrics" + metricmocks "github.com/uber/cadence/common/metrics/mocks" + "github.com/uber/cadence/common/ptr" "github.com/uber/cadence/common/types" "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" @@ -31,7 +34,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSourceAt(now) shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -56,7 +59,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSourceAt(now) shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -82,7 +85,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSourceAt(now) shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -115,7 +118,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSourceAt(now) shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -145,7 +148,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSource() shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -170,7 +173,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeINVALID}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeINVALID}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -199,7 +202,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGH}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGH}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -228,7 +231,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGHSHADOW}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGHSHADOW}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -269,6 +272,7 @@ func TestHeartbeat(t *testing.T) { return nil }, ) + mockStore.EXPECT().RecordHeartbeat(gomock.Any(), namespace, executorID, gomock.AssignableToTypeOf(store.HeartbeatState{})).DoAndReturn( func(_ context.Context, _ string, _ string, hb store.HeartbeatState) error { // Validate status and reported shards, ignore exact timestamp @@ -292,7 +296,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGHSHADOW}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGHSHADOW}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -335,7 +339,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGHSHADOW}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGHSHADOW}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -377,7 +381,7 @@ func TestHeartbeat(t *testing.T) { mockTimeSource := clock.NewMockedTimeSourceAt(now) shardDistributionCfg := config.ShardDistribution{} migrationConfig := newMigrationConfig(t, []configEntry{}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) // Create metadata with more than max allowed keys metadata := make(map[string]string) @@ -408,7 +412,7 @@ func TestHeartbeat(t *testing.T) { Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGH}}, } migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGH}}) - handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig) + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig, metrics.NoopClient) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -606,3 +610,241 @@ func newMigrationConfig(t *testing.T, configEntries []configEntry) *config.Migra migrationConfig := config.NewMigrationConfig(dc) return migrationConfig } + +func TestFilterNewlyAssignedShardIDs(t *testing.T) { + type testCase struct { + name string + previous *store.HeartbeatState + assigned *store.AssignedState + expected []string + } + tests := []testCase{ + { + name: "nil previousHeartbeat returns all assigned", + previous: nil, + assigned: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard1": {}, + "shard2": {}, + }, + }, + expected: []string{"shard1", "shard2"}, + }, + { + name: "no new assigned shards", + previous: &store.HeartbeatState{ + ReportedShards: map[string]*types.ShardStatusReport{ + "shard1": {}, + "shard2": {}, + }, + }, + assigned: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard1": {}, + "shard2": {}, + }, + }, + expected: []string{}, + }, + { + name: "some new assigned shards", + previous: &store.HeartbeatState{ + ReportedShards: map[string]*types.ShardStatusReport{ + "shard1": {}, + }, + }, + assigned: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard1": {}, + "shard2": {}, + "shard3": {}, + }, + }, + expected: []string{"shard2", "shard3"}, + }, + { + name: "empty assigned returns empty", + previous: &store.HeartbeatState{ + ReportedShards: map[string]*types.ShardStatusReport{ + "shard1": {}, + }, + }, + assigned: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{}, + }, + expected: []string{}, + }, + { + name: "nil assignedState returns nil", + previous: &store.HeartbeatState{ + ReportedShards: map[string]*types.ShardStatusReport{ + "shard1": {}, + }, + }, + assigned: nil, + expected: nil, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := filterNewlyAssignedShardIDs(tt.previous, tt.assigned) + require.ElementsMatch(t, tt.expected, result) + }) + } +} + +func TestEmitShardAssignmentMetrics(t *testing.T) { + heartbeatTime := time.Now().UTC() + namespace := "test-namespace" + shardID := "shard-1" + + for name, tc := range map[string]struct { + previousHeartbeat *store.HeartbeatState + assignedState *store.AssignedState + + expectedDistributionLatency *time.Duration + expectedHandoverLatencies []*time.Duration + }{ + "no new assigned shards": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{shardID: {}}}, + assignedState: &store.AssignedState{AssignedShards: map[string]*types.ShardAssignment{shardID: {}}}, + expectedDistributionLatency: nil, + expectedHandoverLatencies: nil, + }, + "newly assigned shard with handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{shardID: {Status: types.AssignmentStatusREADY}}, + LastUpdated: heartbeatTime.Add(-10 * time.Second), + ShardHandoverStats: map[string]store.ShardHandoverStats{ + shardID: { + PreviousExecutorLastHeartbeatTime: heartbeatTime.Add(-20 * time.Second), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + }, + expectedDistributionLatency: ptr.ToPtr(10 * time.Second), + expectedHandoverLatencies: []*time.Duration{ptr.ToPtr(20 * time.Second)}, + }, + "one assigned shard with no handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{shardID: {Status: types.AssignmentStatusREADY}}, + LastUpdated: heartbeatTime.Add(-5 * time.Second), + ShardHandoverStats: map[string]store.ShardHandoverStats{}, + }, + expectedDistributionLatency: ptr.ToPtr(5 * time.Second), + expectedHandoverLatencies: nil, + }, + "one assigned shard with nil handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{shardID: {Status: types.AssignmentStatusREADY}}, + LastUpdated: heartbeatTime.Add(-5 * time.Second), + ShardHandoverStats: nil, + }, + expectedDistributionLatency: ptr.ToPtr(5 * time.Second), + expectedHandoverLatencies: nil, + }, + "multiple newly assigned shards with handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + "shard-2": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: heartbeatTime.Add(-15 * time.Second), + ShardHandoverStats: map[string]store.ShardHandoverStats{ + "shard-1": { + PreviousExecutorLastHeartbeatTime: heartbeatTime.Add(-25 * time.Second), + HandoverType: types.HandoverTypeGRACEFUL, + }, + "shard-2": { + PreviousExecutorLastHeartbeatTime: heartbeatTime.Add(-30 * time.Second), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + }, + expectedDistributionLatency: ptr.ToPtr(15 * time.Second), + expectedHandoverLatencies: []*time.Duration{ptr.ToPtr(30 * time.Second), ptr.ToPtr(25 * time.Second)}, + }, + "multiple newly assigned shards with some handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + "shard-2": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: heartbeatTime.Add(-15 * time.Second), + ShardHandoverStats: map[string]store.ShardHandoverStats{ + "shard-1": { + PreviousExecutorLastHeartbeatTime: heartbeatTime.Add(-25 * time.Second), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + }, + expectedDistributionLatency: ptr.ToPtr(15 * time.Second), + expectedHandoverLatencies: []*time.Duration{ptr.ToPtr(25 * time.Second)}, + }, + "multiple newly assigned shards without handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "shard-1": {Status: types.AssignmentStatusREADY}, + "shard-2": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: heartbeatTime.Add(-15 * time.Second), + }, + expectedDistributionLatency: ptr.ToPtr(15 * time.Second), + expectedHandoverLatencies: nil, + }, + "newly assigned shard without handover stats": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{shardID: {Status: types.AssignmentStatusREADY}}, + LastUpdated: heartbeatTime.Add(-5 * time.Second), + ShardHandoverStats: map[string]store.ShardHandoverStats{}, + }, + expectedDistributionLatency: ptr.ToPtr(5 * time.Second), + expectedHandoverLatencies: nil, + }, + "nil handover stats with new assigned shard": { + previousHeartbeat: &store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}}, + assignedState: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{shardID: {Status: types.AssignmentStatusREADY}}, + LastUpdated: heartbeatTime.Add(-5 * time.Second), + ShardHandoverStats: nil, + }, + expectedDistributionLatency: ptr.ToPtr(5 * time.Second), + expectedHandoverLatencies: nil, + }, + } { + t.Run(name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + metricsClient := &metricmocks.Client{} + metricsScope := &metricmocks.Scope{} + + if tc.expectedDistributionLatency != nil { + metricsClient.On("Scope", metrics.ShardDistributorHeartbeatScope).Return(metricsScope).Once() + metricsScope.On("Tagged", metrics.NamespaceTag(namespace)).Return(metricsScope).Once() + metricsScope.On("RecordHistogramDuration", metrics.ShardDistributorShardAssignmentDistributionLatency, *tc.expectedDistributionLatency).Once() + } + + if tc.expectedHandoverLatencies != nil { + metricsScope.On("Tagged", metrics.HandoverTypeTag(types.HandoverTypeGRACEFUL.String())).Return(metricsScope) + for _, expectedLatency := range tc.expectedHandoverLatencies { + metricsScope.On("RecordHistogramDuration", metrics.ShardDistributorShardHandoverLatency, *expectedLatency).Once() + } + } + + exec := &executor{metricsClient: metricsClient, logger: testlogger.New(t)} + + exec.emitShardAssignmentMetrics(namespace, heartbeatTime, tc.previousHeartbeat, tc.assignedState) + + metricsClient.AssertExpectations(t) + metricsScope.AssertExpectations(t) + }) + } +} diff --git a/service/sharddistributor/handler/handler.go b/service/sharddistributor/handler/handler.go index 3a29be5cee7..5884c12c2c3 100644 --- a/service/sharddistributor/handler/handler.go +++ b/service/sharddistributor/handler/handler.go @@ -119,24 +119,24 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get namespace state: %v", err)} } - var executor string + var executorID string minAssignedShards := math.MaxInt for assignedExecutor, assignment := range state.ShardAssignments { if len(assignment.AssignedShards) < minAssignedShards { minAssignedShards = len(assignment.AssignedShards) - executor = assignedExecutor + executorID = assignedExecutor } } // Assign the shard to the executor with the least assigned shards - err = h.storage.AssignShard(ctx, namespace, shardID, executor) + err = h.storage.AssignShard(ctx, namespace, shardID, executorID) if err != nil { return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to assign ephemeral shard: %v", err)} } return &types.GetShardOwnerResponse{ - Owner: executor, + Owner: executorID, Namespace: namespace, }, nil } diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 76abb573900..64451a40631 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -2,6 +2,7 @@ package process import ( "context" + "errors" "fmt" "maps" "math/rand" @@ -337,7 +338,6 @@ func (p *namespaceProcessor) rebalanceShards(ctx context.Context) (err error) { } func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoopScope metrics.Scope) (err error) { - namespaceState, err := p.shardStore.GetState(ctx, p.namespaceCfg.Name) if err != nil { return fmt.Errorf("get state: %w", err) @@ -378,8 +378,8 @@ func (p *namespaceProcessor) rebalanceShardsImpl(ctx context.Context, metricsLoo } p.addAssignmentsToNamespaceState(namespaceState, currentAssignments) - p.logger.Info("Applying new shard distribution.") + // Use the leader guard for the assign and delete operation. err = p.shardStore.AssignShards(ctx, p.namespaceCfg.Name, store.AssignShardsRequest{ NewState: namespaceState, @@ -473,27 +473,105 @@ func (*namespaceProcessor) updateAssignments(shardsToReassign []string, activeEx } func (p *namespaceProcessor) addAssignmentsToNamespaceState(namespaceState *store.NamespaceState, currentAssignments map[string][]string) { - newState := make(map[string]store.AssignedState) + newState := make(map[string]store.AssignedState, len(currentAssignments)) + for executorID, shards := range currentAssignments { assignedShardsMap := make(map[string]*types.ShardAssignment) + for _, shardID := range shards { assignedShardsMap[shardID] = &types.ShardAssignment{Status: types.AssignmentStatusREADY} } + modRevision := int64(0) // Should be 0 if we have not seen it yet if namespaceAssignments, ok := namespaceState.ShardAssignments[executorID]; ok { modRevision = namespaceAssignments.ModRevision } newState[executorID] = store.AssignedState{ - AssignedShards: assignedShardsMap, - LastUpdated: p.timeSource.Now().UTC(), - ModRevision: modRevision, + AssignedShards: assignedShardsMap, + LastUpdated: p.timeSource.Now().UTC(), + ModRevision: modRevision, + ShardHandoverStats: p.addHandoverStatsToExecutorAssignedState(namespaceState, executorID, shards), } } namespaceState.ShardAssignments = newState } +func (p *namespaceProcessor) addHandoverStatsToExecutorAssignedState( + namespaceState *store.NamespaceState, + executorID string, shardIDs []string, +) map[string]store.ShardHandoverStats { + var newStats = make(map[string]store.ShardHandoverStats) + + // Prepare handover stats for each shard + for _, shardID := range shardIDs { + handoverStats := p.newHandoverStats(namespaceState, shardID, executorID) + + // If there is no handover (first assignment), we skip adding handover stats + if handoverStats != nil { + newStats[shardID] = *handoverStats + } + } + + return newStats +} + +// newHandoverStats creates shard handover statistics if a handover occurred. +func (p *namespaceProcessor) newHandoverStats( + namespaceState *store.NamespaceState, + shardID string, + newExecutorID string, +) *store.ShardHandoverStats { + logger := p.logger.WithTags( + tag.ShardNamespace(p.namespaceCfg.Name), + tag.ShardKey(shardID), + tag.ShardExecutor(newExecutorID), + ) + + // Fetch previous shard owners from cache + prevExecutor, err := p.shardStore.GetShardOwner(context.Background(), p.namespaceCfg.Name, shardID) + if err != nil && !errors.Is(err, store.ErrShardNotFound) { + logger.Warn("failed to get shard owner for shard statistic", tag.Error(err)) + return nil + } + // previous executor is not found in cache + // meaning this is the first assignment of the shard + // so we skip updating handover stats + if prevExecutor == nil { + return nil + } + + // No change in assignment + // meaning no handover occurred + // skip updating handover stats + if prevExecutor.ExecutorID == newExecutorID { + return nil + } + + // previous executor heartbeat is not found in namespace state + // meaning the executor has already been cleaned up + // skip updating handover stats + prevExecutorHeartbeat, ok := namespaceState.Executors[prevExecutor.ExecutorID] + if !ok { + logger.Info("previous executor heartbeat not found, skipping handover stats") + return nil + } + + handoverType := types.HandoverTypeEMERGENCY + + // Consider it a graceful handover if the previous executor was in DRAINING or DRAINED status + // otherwise, it's an emergency handover + if prevExecutorHeartbeat.Status == types.ExecutorStatusDRAINING || prevExecutorHeartbeat.Status == types.ExecutorStatusDRAINED { + handoverType = types.HandoverTypeGRACEFUL + } + + return &store.ShardHandoverStats{ + HandoverType: handoverType, + PreviousExecutorLastHeartbeatTime: prevExecutorHeartbeat.LastHeartbeat, + } +} + func (*namespaceProcessor) getActiveExecutors(namespaceState *store.NamespaceState, staleExecutors map[string]int64) []string { var activeExecutors []string for id, state := range namespaceState.Executors { diff --git a/service/sharddistributor/leader/process/processor_test.go b/service/sharddistributor/leader/process/processor_test.go index f023bb2d5b5..129043d0f15 100644 --- a/service/sharddistributor/leader/process/processor_test.go +++ b/service/sharddistributor/leader/process/processor_test.go @@ -92,12 +92,16 @@ func TestRebalanceShards_InitialDistribution(t *testing.T) { "exec-2": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}, } mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{Executors: state, GlobalRevision: 1}, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "0").Return(nil, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "1").Return(nil, nil) mocks.election.EXPECT().Guard().Return(store.NopGuard()) mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error { assert.Len(t, request.NewState.ShardAssignments, 2) assert.Len(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, 1) assert.Len(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, 1) + assert.Lenf(t, request.NewState.ShardAssignments["exec-1"].ShardHandoverStats, 0, "no handover stats should be present on initial assignment") + assert.Lenf(t, request.NewState.ShardAssignments["exec-2"].ShardHandoverStats, 0, "no handover stats should be present on initial assignment") return nil }, ) @@ -130,11 +134,15 @@ func TestRebalanceShards_ExecutorRemoved(t *testing.T) { ShardAssignments: assignments, GlobalRevision: 1, }, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "0").Return(&store.ShardOwner{ExecutorID: "exec-2"}, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "1").Return(&store.ShardOwner{ExecutorID: "exec-1"}, nil) mocks.election.EXPECT().Guard().Return(store.NopGuard()) mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error { assert.Len(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, 2) assert.Len(t, request.NewState.ShardAssignments["exec-2"].AssignedShards, 0) + assert.Lenf(t, request.NewState.ShardAssignments["exec-1"].ShardHandoverStats, 1, "only shard 0 should have handover stats") + assert.Lenf(t, request.NewState.ShardAssignments["exec-2"].ShardHandoverStats, 0, "no handover stats should be present for drained executor") return nil }, ) @@ -172,11 +180,14 @@ func TestRebalanceShards_ExecutorStale(t *testing.T) { ShardAssignments: assignments, GlobalRevision: 1, }, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "0").Return(&store.ShardOwner{ExecutorID: "exec-1"}, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "1").Return(&store.ShardOwner{ExecutorID: "exec-2"}, nil) mocks.election.EXPECT().Guard().Return(store.NopGuard()) mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).DoAndReturn( func(_ context.Context, _ string, request store.AssignShardsRequest, _ store.GuardFunc) error { assert.Len(t, request.NewState.ShardAssignments, 1) assert.Len(t, request.NewState.ShardAssignments["exec-1"].AssignedShards, 2) + assert.Len(t, request.NewState.ShardAssignments["exec-1"].ShardHandoverStats, 1, "only shard 1 should have handover stats") assert.Equal(t, request.ExecutorsToDelete, map[string]int64{"exec-2": 1}) return nil }, @@ -312,6 +323,8 @@ func TestRebalance_StoreErrors(t *testing.T) { Executors: map[string]store.HeartbeatState{"e": {Status: types.ExecutorStatusACTIVE, LastHeartbeat: now}}, GlobalRevision: 1, }, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "0").Return(nil, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "1").Return(nil, nil) mocks.election.EXPECT().Guard().Return(store.NopGuard()) mocks.store.EXPECT().AssignShards(gomock.Any(), mocks.cfg.Name, gomock.Any(), gomock.Any()).Return(expectedErr) err = processor.rebalanceShards(context.Background()) @@ -431,6 +444,8 @@ func TestRebalanceShards_WithUnassignedShards(t *testing.T) { }, }, } + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "0").Return(nil, nil) + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, "1").Return(nil, nil) mocks.store.EXPECT().GetState(gomock.Any(), mocks.cfg.Name).Return(&store.NamespaceState{ Executors: heartbeats, ShardAssignments: assignments, @@ -589,3 +604,211 @@ func TestAssignShardsToEmptyExecutors(t *testing.T) { }) } } + +func TestNewHandoverStats(t *testing.T) { + mocks := setupProcessorTest(t, config.NamespaceTypeFixed) + defer mocks.ctrl.Finish() + processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) + + now := time.Now().UTC() + shardID := "shard-1" + newExecutorID := "exec-new" + + type testCase struct { + name string + getOwner *store.ShardOwner + getOwnerErr error + executors map[string]store.HeartbeatState + expectShardStats *store.ShardHandoverStats // nil means expect nil result + } + + testCases := []testCase{ + { + name: "error other than shard not found -> stat without handover", + getOwner: nil, + getOwnerErr: errors.New("random error"), + executors: map[string]store.HeartbeatState{}, + expectShardStats: nil, + }, + { + name: "ErrShardNotFound -> stat without handover", + getOwner: nil, + getOwnerErr: store.ErrShardNotFound, + executors: map[string]store.HeartbeatState{}, + expectShardStats: nil, + }, + { + name: "same executor as previous -> nil", + getOwner: &store.ShardOwner{ExecutorID: newExecutorID}, + getOwnerErr: nil, + executors: map[string]store.HeartbeatState{ + newExecutorID: { + Status: types.ExecutorStatusACTIVE, + LastHeartbeat: now.Add(-10 * time.Second)}, + }, + expectShardStats: nil, + }, + { + name: "prev executor different but heartbeat missing -> no handover", + getOwner: &store.ShardOwner{ExecutorID: "old-exec"}, + getOwnerErr: nil, + executors: map[string]store.HeartbeatState{}, + expectShardStats: nil, + }, + { + name: "prev executor ACTIVE -> emergency handover", + getOwner: &store.ShardOwner{ExecutorID: "old-active"}, + getOwnerErr: nil, + executors: map[string]store.HeartbeatState{ + "old-active": { + Status: types.ExecutorStatusACTIVE, + LastHeartbeat: now.Add(-10 * time.Second), + }, + }, + expectShardStats: &store.ShardHandoverStats{ + HandoverType: types.HandoverTypeEMERGENCY, + PreviousExecutorLastHeartbeatTime: now.Add(-10 * time.Second), + }, + }, + { + name: "prev executor DRAINING -> graceful handover", + getOwner: &store.ShardOwner{ExecutorID: "old-draining"}, + getOwnerErr: nil, + executors: map[string]store.HeartbeatState{ + "old-draining": { + Status: types.ExecutorStatusDRAINING, + LastHeartbeat: now.Add(-10 * time.Second), + }, + }, + expectShardStats: &store.ShardHandoverStats{ + HandoverType: types.HandoverTypeGRACEFUL, + PreviousExecutorLastHeartbeatTime: now.Add(-10 * time.Second), + }, + }, + { + name: "prev executor DRAINED -> graceful handover", + getOwner: &store.ShardOwner{ExecutorID: "old-drained"}, + getOwnerErr: nil, + executors: map[string]store.HeartbeatState{ + "old-drained": { + Status: types.ExecutorStatusDRAINING, + LastHeartbeat: now.Add(-10 * time.Second), + }, + }, + expectShardStats: &store.ShardHandoverStats{ + HandoverType: types.HandoverTypeGRACEFUL, + PreviousExecutorLastHeartbeatTime: now.Add(-10 * time.Second), + }, + }, + } + + for _, tc := range testCases { + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, shardID).Return(tc.getOwner, tc.getOwnerErr) + t.Run(tc.name, func(t *testing.T) { + stat := processor.newHandoverStats(&store.NamespaceState{Executors: tc.executors}, shardID, newExecutorID) + if tc.expectShardStats == nil { + require.Nil(t, stat) + return + } + require.NotNil(t, stat) + require.Equal(t, tc.expectShardStats, stat) + }) + } +} +func TestAddHandoverStatsToExecutorAssignedState(t *testing.T) { + + now := time.Now().UTC() + executorID := "exec-1" + shardIDs := []string{"shard-1", "shard-2"} + + for name, tc := range map[string]struct { + name string + executors map[string]store.HeartbeatState + + getOwners map[string]*store.ShardOwner + getOwnerErrs map[string]error + + expected map[string]store.ShardHandoverStats + }{ + "no previous owner for both shards": { + getOwners: map[string]*store.ShardOwner{"shard-1": nil, "shard-2": nil}, + getOwnerErrs: map[string]error{"shard-1": store.ErrShardNotFound, "shard-2": store.ErrShardNotFound}, + executors: map[string]store.HeartbeatState{}, + expected: map[string]store.ShardHandoverStats{}, + }, + "emergency handover for shard-1, no handover for shard-2": { + getOwners: map[string]*store.ShardOwner{ + "shard-1": {ExecutorID: "old-active"}, + "shard-2": nil, + }, + getOwnerErrs: map[string]error{ + "shard-1": nil, + "shard-2": store.ErrShardNotFound, + }, + executors: map[string]store.HeartbeatState{ + "old-active": { + Status: types.ExecutorStatusACTIVE, + LastHeartbeat: now.Add(-10 * time.Second), + }, + }, + expected: map[string]store.ShardHandoverStats{ + "shard-1": { + HandoverType: types.HandoverTypeEMERGENCY, + PreviousExecutorLastHeartbeatTime: now.Add(-10 * time.Second), + }, + }, + }, + "graceful handover for shard-1": { + getOwners: map[string]*store.ShardOwner{ + "shard-1": {ExecutorID: "old-draining"}, + "shard-2": nil, + }, + getOwnerErrs: map[string]error{ + "shard-1": nil, + }, + executors: map[string]store.HeartbeatState{ + "old-draining": { + Status: types.ExecutorStatusDRAINING, + LastHeartbeat: now.Add(-20 * time.Second), + }, + }, + expected: map[string]store.ShardHandoverStats{ + "shard-1": { + HandoverType: types.HandoverTypeGRACEFUL, + PreviousExecutorLastHeartbeatTime: now.Add(-20 * time.Second), + }, + }, + }, + "same executor as previous, no handover": { + getOwners: map[string]*store.ShardOwner{ + "shard-1": {ExecutorID: executorID}, + "shard-2": nil, + }, + getOwnerErrs: map[string]error{ + "shard-1": nil, + }, + executors: map[string]store.HeartbeatState{ + executorID: { + Status: types.ExecutorStatusACTIVE, + LastHeartbeat: now, + }, + }, + expected: map[string]store.ShardHandoverStats{}, + }, + } { + t.Run(name, func(t *testing.T) { + mocks := setupProcessorTest(t, config.NamespaceTypeFixed) + defer mocks.ctrl.Finish() + processor := mocks.factory.CreateProcessor(mocks.cfg, mocks.store, mocks.election).(*namespaceProcessor) + + for _, shardID := range shardIDs { + mocks.store.EXPECT().GetShardOwner(gomock.Any(), mocks.cfg.Name, shardID).Return(tc.getOwners[shardID], tc.getOwnerErrs[shardID]).AnyTimes() + } + namespaceState := &store.NamespaceState{ + Executors: tc.executors, + } + stats := processor.addHandoverStatsToExecutorAssignedState(namespaceState, executorID, shardIDs) + assert.Equal(t, tc.expected, stats) + }) + } +} diff --git a/service/sharddistributor/sharddistributorfx/sharddistributorfx.go b/service/sharddistributor/sharddistributorfx/sharddistributorfx.go index 9e25a683d51..b59e608b855 100644 --- a/service/sharddistributor/sharddistributorfx/sharddistributorfx.go +++ b/service/sharddistributor/sharddistributorfx/sharddistributorfx.go @@ -73,7 +73,7 @@ func registerHandlers(params registerHandlersParams) error { wrappedHandler := metered.NewMetricsHandler(rawHandler, params.Logger, params.MetricsClient) migrationConfig := config.NewMigrationConfig(params.DynamicCollection) - executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg, migrationConfig) + executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg, migrationConfig, params.MetricsClient) wrappedExecutor := metered.NewExecutorMetricsExecutor(executorHandler, params.Logger, params.MetricsClient) grpcHandler := grpc.NewGRPCHandler(wrappedHandler) diff --git a/service/sharddistributor/store/etcd/etcdtypes/state.go b/service/sharddistributor/store/etcd/etcdtypes/state.go index cf9890cafc6..ee1e0d6e0ca 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/state.go +++ b/service/sharddistributor/store/etcd/etcdtypes/state.go @@ -6,9 +6,10 @@ import ( ) type AssignedState struct { - AssignedShards map[string]*types.ShardAssignment `json:"assigned_shards"` - LastUpdated Time `json:"last_updated"` - ModRevision int64 `json:"mod_revision"` + AssignedShards map[string]*types.ShardAssignment `json:"assigned_shards"` + ShardHandoverStats map[string]ShardHandoverStats `json:"shard_handover_stats,omitempty"` + LastUpdated Time `json:"last_updated"` + ModRevision int64 `json:"mod_revision"` } // ToAssignedState converts the current AssignedState to store.AssignedState. @@ -18,9 +19,10 @@ func (s *AssignedState) ToAssignedState() *store.AssignedState { } return &store.AssignedState{ - AssignedShards: s.AssignedShards, - LastUpdated: s.LastUpdated.ToTime(), - ModRevision: s.ModRevision, + AssignedShards: s.AssignedShards, + ShardHandoverStats: convertMap(s.ShardHandoverStats, ToShardHandoverStats), + LastUpdated: s.LastUpdated.ToTime(), + ModRevision: s.ModRevision, } } @@ -31,9 +33,39 @@ func FromAssignedState(src *store.AssignedState) *AssignedState { } return &AssignedState{ - AssignedShards: src.AssignedShards, - LastUpdated: Time(src.LastUpdated), - ModRevision: src.ModRevision, + AssignedShards: src.AssignedShards, + LastUpdated: Time(src.LastUpdated), + ShardHandoverStats: convertMap(src.ShardHandoverStats, FromShardHandoverStats), + ModRevision: src.ModRevision, + } +} + +type ShardHandoverStats struct { + PreviousExecutorLastHeartbeatTime Time `json:"previous_executor_last_heartbeat_time"` + HandoverType types.HandoverType `json:"handover_type"` +} + +// FromShardHandoverStats creates an ShardHandoverStats from a store.ShardHandoverStats. +func FromShardHandoverStats(src *store.ShardHandoverStats) *ShardHandoverStats { + if src == nil { + return nil + } + + return &ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: Time(src.PreviousExecutorLastHeartbeatTime), + HandoverType: src.HandoverType, + } +} + +// ToShardHandoverStats converts the current ShardHandoverStats to store.ShardHandoverStats. +func ToShardHandoverStats(src *ShardHandoverStats) *store.ShardHandoverStats { + if src == nil { + return nil + } + + return &store.ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: src.PreviousExecutorLastHeartbeatTime.ToTime(), + HandoverType: src.HandoverType, } } @@ -68,3 +100,15 @@ func FromShardStatistics(src *store.ShardStatistics) *ShardStatistics { LastMoveTime: Time(src.LastMoveTime), } } + +// ConvertMap converts a map[K]SrcType to map[K]DstType using a provided converter function. +func convertMap[K comparable, SrcType any, DstType any](src map[K]SrcType, converter func(*SrcType) *DstType) map[K]DstType { + if src == nil { + return nil + } + dst := make(map[K]DstType, len(src)) + for k, v := range src { + dst[k] = *converter(&v) + } + return dst +} diff --git a/service/sharddistributor/store/etcd/etcdtypes/state_test.go b/service/sharddistributor/store/etcd/etcdtypes/state_test.go index 14f12e1dce1..37490c06cb5 100644 --- a/service/sharddistributor/store/etcd/etcdtypes/state_test.go +++ b/service/sharddistributor/store/etcd/etcdtypes/state_test.go @@ -45,6 +45,34 @@ func TestAssignedState_ToAssignedState(t *testing.T) { ModRevision: 42, }, }, + "success with map": { + input: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + ShardHandoverStats: map[string]ShardHandoverStats{ + "1": { + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + ModRevision: 42, + }, + expect: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + ShardHandoverStats: map[string]store.ShardHandoverStats{ + "1": { + PreviousExecutorLastHeartbeatTime: time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + LastUpdated: time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC), + ModRevision: 42, + }, + }, } for name, c := range tests { @@ -91,6 +119,34 @@ func TestAssignedState_FromAssignedState(t *testing.T) { ModRevision: 77, }, }, + "with map": { + input: &store.AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "9": {Status: types.AssignmentStatusREADY}, + }, + ShardHandoverStats: map[string]store.ShardHandoverStats{ + "9": { + PreviousExecutorLastHeartbeatTime: time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + LastUpdated: time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC), + ModRevision: 77, + }, + expect: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "9": {Status: types.AssignmentStatusREADY}, + }, + ShardHandoverStats: map[string]ShardHandoverStats{ + "9": { + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 13, 0, 0, 987654321, time.UTC)), + ModRevision: 77, + }, + }, } for name, c := range tests { @@ -112,29 +168,62 @@ func TestAssignedState_FromAssignedState(t *testing.T) { } func TestAssignedState_JSONMarshalling(t *testing.T) { - const jsonStr = `{"assigned_shards":{"1":{"status":"AssignmentStatusREADY"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}` - - state := &AssignedState{ - AssignedShards: map[string]*types.ShardAssignment{ - "1": {Status: types.AssignmentStatusREADY}, + tests := map[string]struct { + input *AssignedState + jsonStr string + }{ + "simple": { + input: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + ModRevision: 42, + }, + jsonStr: `{"assigned_shards":{"1":{"status":"AssignmentStatusREADY"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}`, + }, + "with map": { + input: &AssignedState{ + AssignedShards: map[string]*types.ShardAssignment{ + "1": {Status: types.AssignmentStatusREADY}, + }, + ShardHandoverStats: map[string]ShardHandoverStats{ + "1": { + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), + ModRevision: 42, + }, + jsonStr: `{"assigned_shards":{"1":{"status":"AssignmentStatusREADY"}},"shard_handover_stats":{"1":{"previous_executor_last_heartbeat_time":"2025-11-18T12:00:00.123456789Z","handover_type":"HandoverTypeGRACEFUL"}},"last_updated":"2025-11-18T12:00:00.123456789Z","mod_revision":42}`, }, - LastUpdated: Time(time.Date(2025, 11, 18, 12, 0, 0, 123456789, time.UTC)), - ModRevision: 42, } - // Marshal to JSON - b, err := json.Marshal(state) - require.NoError(t, err) - require.JSONEq(t, jsonStr, string(b)) + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + // Marshal to JSON + b, err := json.Marshal(tc.input) + require.NoError(t, err) + require.JSONEq(t, tc.jsonStr, string(b)) - // Unmarshal from JSON - var unmarshalled AssignedState - err = json.Unmarshal([]byte(jsonStr), &unmarshalled) - require.NoError(t, err) - require.Equal(t, state.AssignedShards["1"].Status, unmarshalled.AssignedShards["1"].Status) - require.Equal(t, time.Time(state.LastUpdated).UnixNano(), time.Time(unmarshalled.LastUpdated).UnixNano()) - require.Equal(t, state.ModRevision, unmarshalled.ModRevision) + // Unmarshal from JSON + var unmarshalled AssignedState + err = json.Unmarshal([]byte(tc.jsonStr), &unmarshalled) + require.NoError(t, err) + require.Equal(t, tc.input.AssignedShards["1"].Status, unmarshalled.AssignedShards["1"].Status) + require.Equal(t, time.Time(tc.input.LastUpdated).UnixNano(), time.Time(unmarshalled.LastUpdated).UnixNano()) + require.Equal(t, tc.input.ModRevision, unmarshalled.ModRevision) + if tc.input.ShardHandoverStats != nil { + require.NotNil(t, unmarshalled.ShardHandoverStats) + require.Equal(t, tc.input.ShardHandoverStats["1"].HandoverType, unmarshalled.ShardHandoverStats["1"].HandoverType) + require.Equal(t, time.Time(tc.input.ShardHandoverStats["1"].PreviousExecutorLastHeartbeatTime).UnixNano(), + time.Time(unmarshalled.ShardHandoverStats["1"].PreviousExecutorLastHeartbeatTime).UnixNano()) + } + }) + } } + func TestShardStatistics_FieldNumberMatched(t *testing.T) { require.Equal(t, reflect.TypeOf(ShardStatistics{}).NumField(), @@ -241,3 +330,102 @@ func TestShardStatistics_JSONMarshalling(t *testing.T) { require.Equal(t, time.Time(state.LastUpdateTime).UnixNano(), time.Time(unmarshalled.LastUpdateTime).UnixNano()) require.Equal(t, time.Time(state.LastMoveTime).UnixNano(), time.Time(unmarshalled.LastMoveTime).UnixNano()) } + +func TestShardHandoverStats_FieldNumberMatched(t *testing.T) { + require.Equal(t, + reflect.TypeOf(ShardHandoverStats{}).NumField(), + reflect.TypeOf(store.ShardHandoverStats{}).NumField(), + "ShardHandoverStats field count mismatch with store.ShardHandoverStats; ensure conversion is updated", + ) +} + +func TestShardHandoverStats_ToShardHandoverStats(t *testing.T) { + tests := map[string]struct { + input *ShardHandoverStats + expect *store.ShardHandoverStats + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 18, 0, 0, 555555555, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + }, + expect: &store.ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: time.Date(2025, 11, 18, 18, 0, 0, 555555555, time.UTC), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := ToShardHandoverStats(c.input) + if c.expect == nil { + require.Nil(t, got) + return + } + require.NotNil(t, got) + require.Equal(t, c.input.HandoverType, got.HandoverType) + require.Equal(t, time.Time(c.input.PreviousExecutorLastHeartbeatTime).UnixNano(), got.PreviousExecutorLastHeartbeatTime.UnixNano()) + }) + } +} + +func TestShardHandoverStats_FromShardHandoverStats(t *testing.T) { + tests := map[string]struct { + input *store.ShardHandoverStats + expect *ShardHandoverStats + }{ + "nil": { + input: nil, + expect: nil, + }, + "success": { + input: &store.ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: time.Date(2025, 11, 18, 19, 0, 0, 666666666, time.UTC), + HandoverType: types.HandoverTypeGRACEFUL, + }, + expect: &ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 19, 0, 0, 666666666, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + }, + }, + } + + for name, c := range tests { + t.Run(name, func(t *testing.T) { + got := FromShardHandoverStats(c.input) + if c.expect == nil { + require.Nil(t, got) + return + } + require.NotNil(t, got) + require.Equal(t, c.input.HandoverType, got.HandoverType) + require.Equal(t, c.input.PreviousExecutorLastHeartbeatTime.UnixNano(), time.Time(got.PreviousExecutorLastHeartbeatTime).UnixNano()) + }) + } +} + +func TestShardHandoverStats_JSONMarshalling(t *testing.T) { + const jsonStr = `{"previous_executor_last_heartbeat_time":"2025-11-18T20:00:00.777777777Z","handover_type":"HandoverTypeGRACEFUL"}` + + stats := &ShardHandoverStats{ + PreviousExecutorLastHeartbeatTime: Time(time.Date(2025, 11, 18, 20, 0, 0, 777777777, time.UTC)), + HandoverType: types.HandoverTypeGRACEFUL, + } + + // Marshal to JSON + b, err := json.Marshal(stats) + require.NoError(t, err) + require.JSONEq(t, jsonStr, string(b)) + + // Unmarshal from JSON + var unmarshalled ShardHandoverStats + err = json.Unmarshal([]byte(jsonStr), &unmarshalled) + require.NoError(t, err) + require.Equal(t, stats.HandoverType, unmarshalled.HandoverType) + require.Equal(t, time.Time(stats.PreviousExecutorLastHeartbeatTime).UnixNano(), time.Time(unmarshalled.PreviousExecutorLastHeartbeatTime).UnixNano()) +} diff --git a/service/sharddistributor/store/state.go b/service/sharddistributor/store/state.go index 25f6bb3ee07..ffbbe13820f 100644 --- a/service/sharddistributor/store/state.go +++ b/service/sharddistributor/store/state.go @@ -15,17 +15,43 @@ type HeartbeatState struct { } type AssignedState struct { - // AssignedShards is the map of shard ID to shard assignment + // AssignedShards holds the current assignment of shards to this executor + // Key: ShardID AssignedShards map[string]*types.ShardAssignment - // LastUpdated is the time we last updated this assignment + // ShardHandoverStats holds handover statistics of all shards experienced handovers to this executor + // Mostly all shards in AssignedShards will have corresponding entries here + // But if a shard was assigned but never had a handover (e.g., first assignment), it does not have an entry here + // Key: ShardID + ShardHandoverStats map[string]ShardHandoverStats + + // LastUpdated is the time when this assignment state was last updated + // Used to calculate assignment distribution latency for newly assigned shards LastUpdated time.Time ModRevision int64 } +// ShardHandoverStats holds statistics related to the latest handover of a shard +type ShardHandoverStats struct { + // PreviousExecutorLastHeartbeatTime is the last heartbeat time received + // from the previous executor before the shard was reassigned. + PreviousExecutorLastHeartbeatTime time.Time + + // HandoverType indicates the type of handover that occurred during the last shard reassignment. + HandoverType types.HandoverType +} + type NamespaceState struct { - Executors map[string]HeartbeatState - ShardStats map[string]ShardStatistics + // Executors holds the heartbeat states of all executors in the namespace. + // Key: ExecutorID + Executors map[string]HeartbeatState + + // ShardStats holds the statistics of all shards in the namespace. + // Key: ShardID + ShardStats map[string]ShardStatistics + + // ShardAssignments holds the assignment states of all shards in the namespace. + // Key: ExecutorID ShardAssignments map[string]AssignedState GlobalRevision int64 } diff --git a/service/sharddistributor/store/store.go b/service/sharddistributor/store/store.go index 1320a14a4fd..f2b2d3d0774 100644 --- a/service/sharddistributor/store/store.go +++ b/service/sharddistributor/store/store.go @@ -57,18 +57,29 @@ type AssignShardsRequest struct { ExecutorsToDelete map[string]int64 } -// Store is a composite interface that combines all storage capabilities. type Store interface { + // GetState retrieves the current state of a namespace, including executors, + // shard statistics, and shard assignments. GetState(ctx context.Context, namespace string) (*NamespaceState, error) + + // AssignShards assigns multiple shards to executors within a namespace. + // It also updates shard statistics and deletes specified executors + // The operation is atomic and guarded by the provided GuardFunc. AssignShards(ctx context.Context, namespace string, request AssignShardsRequest, guard GuardFunc) error + + // AssignShard assigns a single shard to an executor within a namespace. + AssignShard(ctx context.Context, namespace string, shardID string, executorID string) error + Subscribe(ctx context.Context, namespace string) (<-chan int64, error) DeleteExecutors(ctx context.Context, namespace string, executorIDs []string, guard GuardFunc) error - DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard GuardFunc) error + // GetShardOwner retrieves the owner of a specific shard within a namespace. + // It returns ErrShardNotFound if the shard does not exist. GetShardOwner(ctx context.Context, namespace, shardID string) (*ShardOwner, error) SubscribeToAssignmentChanges(ctx context.Context, namespace string) (<-chan map[*ShardOwner][]string, func(), error) - AssignShard(ctx context.Context, namespace, shardID, executorID string) error GetHeartbeat(ctx context.Context, namespace string, executorID string) (*HeartbeatState, *AssignedState, error) RecordHeartbeat(ctx context.Context, namespace, executorID string, state HeartbeatState) error + + DeleteShardStats(ctx context.Context, namespace string, shardIDs []string, guard GuardFunc) error }