diff --git a/service/sharddistributor/config/config.go b/service/sharddistributor/config/config.go index 46caf62fb4b..9b91141363e 100644 --- a/service/sharddistributor/config/config.go +++ b/service/sharddistributor/config/config.go @@ -28,6 +28,7 @@ import ( "github.com/uber/cadence/common/config" "github.com/uber/cadence/common/dynamicconfig" "github.com/uber/cadence/common/dynamicconfig/dynamicproperties" + "github.com/uber/cadence/common/types" ) type ( @@ -86,6 +87,32 @@ const ( NamespaceTypeEphemeral = "ephemeral" ) +const ( + MigrationModeINVALID = "invalid" + MigrationModeLOCALPASSTHROUGH = "local_pass" + MigrationModeLOCALPASSTHROUGHSHADOW = "local_pass_shadow" + MigrationModeDISTRIBUTEDPASSTHROUGH = "distributed_pass" + MigrationModeONBOARDED = "onboarded" +) + +var configMode = map[string]types.MigrationMode{ + MigrationModeINVALID: types.MigrationModeINVALID, + MigrationModeLOCALPASSTHROUGH: types.MigrationModeLOCALPASSTHROUGH, + MigrationModeLOCALPASSTHROUGHSHADOW: types.MigrationModeLOCALPASSTHROUGHSHADOW, + MigrationModeDISTRIBUTEDPASSTHROUGH: types.MigrationModeDISTRIBUTEDPASSTHROUGH, + MigrationModeONBOARDED: types.MigrationModeONBOARDED, +} + +func (s *ShardDistribution) GetMigrationMode(namespace string) types.MigrationMode { + for _, ns := range s.Namespaces { + if ns.Name == namespace { + return configMode[ns.Mode] + } + } + // TODO: read the default migration mode from dynamic configuration + return configMode[MigrationModeONBOARDED] +} + // NewConfig returns new service config with default values func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config { return &Config{ diff --git a/service/sharddistributor/handler/executor.go b/service/sharddistributor/handler/executor.go index 9bc49b1bf8f..72abfd989e2 100644 --- a/service/sharddistributor/handler/executor.go +++ b/service/sharddistributor/handler/executor.go @@ -7,7 +7,10 @@ import ( "time" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log" + 
"github.com/uber/cadence/common/log/tag" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" ) @@ -16,16 +19,23 @@ const ( ) type executor struct { - timeSource clock.TimeSource - storage store.Store + logger log.Logger + timeSource clock.TimeSource + storage store.Store + shardDistributionCfg config.ShardDistribution } -func NewExecutorHandler(storage store.Store, +func NewExecutorHandler( + logger log.Logger, + storage store.Store, timeSource clock.TimeSource, + shardDistributionCfg config.ShardDistribution, ) Executor { return &executor{ - timeSource: timeSource, - storage: storage, + logger: logger, + timeSource: timeSource, + storage: storage, + shardDistributionCfg: shardDistributionCfg, } } @@ -38,12 +48,29 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe now := h.timeSource.Now().UTC() + mode := h.shardDistributionCfg.GetMigrationMode(request.Namespace) + + switch mode { + case types.MigrationModeINVALID: + h.logger.Warn("Migration mode is invalid", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID)) + return nil, fmt.Errorf("migration mode is invalid") + case types.MigrationModeLOCALPASSTHROUGH: + h.logger.Warn("Migration mode is local passthrough, no calls to heartbeat allowed", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID)) + return nil, fmt.Errorf("migration mode is local passthrough") + // From SD perspective the behaviour is the same + case types.MigrationModeLOCALPASSTHROUGHSHADOW, types.MigrationModeDISTRIBUTEDPASSTHROUGH: + assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request, previousHeartbeat, assignedShards) + if err != nil { + return nil, err + } + } + // If the state has changed we need to update heartbeat data. // Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate. 
- if previousHeartbeat != nil && request.Status == previousHeartbeat.Status { + if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED { lastHeartbeatTime := time.Unix(previousHeartbeat.LastHeartbeat, 0) if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate { - return _convertResponse(assignedShards), nil + return _convertResponse(assignedShards, mode), nil } } @@ -58,14 +85,56 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe return nil, fmt.Errorf("record heartbeat: %w", err) } - return _convertResponse(assignedShards), nil + return _convertResponse(assignedShards, mode), nil +} + +// assignShardsInCurrentHeartbeat is used during the migration phase to assign the shards to the executors according to what is reported during the heartbeat +func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest, previousHeartbeat *store.HeartbeatState, previousAssignedShards *store.AssignedState) (*store.AssignedState, error) { + changeInShardAssignment := false + assignedShards := previousAssignedShards + // Check if the shards of the new assignment exists and have the same status in the old assignment + for shardAssigned, status := range request.GetShardStatusReports() { + shardStatusPreviousHeartbeat, ok := previousHeartbeat.ReportedShards[shardAssigned] + if !ok { + changeInShardAssignment = true + break + } + if status.GetStatus() != shardStatusPreviousHeartbeat.GetStatus() { + changeInShardAssignment = true + } + } + // Check if some shards from previous assignment should be deleted + if len(request.GetShardStatusReports()) != len(previousHeartbeat.ReportedShards) { + changeInShardAssignment = true + } + if changeInShardAssignment { + assignedShards = &store.AssignedState{ + AssignedShards: make(map[string]*types.ShardAssignment), + LastUpdated: h.timeSource.Now().Unix(), + ModRevision: previousAssignedShards.ModRevision, + } + 
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, nil) + if err != nil { + return nil, fmt.Errorf("delete executors: %w", err) + } + for shard := range request.GetShardStatusReports() { + err = h.storage.AssignShard(ctx, request.GetNamespace(), request.GetExecutorID(), shard) + if err != nil { + return nil, fmt.Errorf("assign shard: %w", err) + } + assignedShards.AssignedShards[shard] = &types.ShardAssignment{Status: types.AssignmentStatusREADY} + } + } + + return assignedShards, nil } -func _convertResponse(shards *store.AssignedState) *types.ExecutorHeartbeatResponse { +func _convertResponse(shards *store.AssignedState, mode types.MigrationMode) *types.ExecutorHeartbeatResponse { res := &types.ExecutorHeartbeatResponse{} if shards == nil { return res } res.ShardAssignments = shards.AssignedShards + res.MigrationMode = mode return res } diff --git a/service/sharddistributor/handler/executor_test.go b/service/sharddistributor/handler/executor_test.go index 17b373dbf3e..bfa17a2cba0 100644 --- a/service/sharddistributor/handler/executor_test.go +++ b/service/sharddistributor/handler/executor_test.go @@ -10,7 +10,9 @@ import ( "go.uber.org/mock/gomock" "github.com/uber/cadence/common/clock" + "github.com/uber/cadence/common/log/testlogger" "github.com/uber/cadence/common/types" + "github.com/uber/cadence/service/sharddistributor/config" "github.com/uber/cadence/service/sharddistributor/store" ) @@ -25,7 +27,8 @@ func TestHeartbeat(t *testing.T) { ctrl := gomock.NewController(t) mockStore := store.NewMockStore(ctrl) mockTimeSource := clock.NewMockedTimeSourceAt(now) - handler := NewExecutorHandler(mockStore, mockTimeSource) + shardDistributionCfg := config.ShardDistribution{} + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -48,7 +51,8 @@ func TestHeartbeat(t *testing.T) { ctrl := gomock.NewController(t) mockStore := 
store.NewMockStore(ctrl) mockTimeSource := clock.NewMockedTimeSourceAt(now) - handler := NewExecutorHandler(mockStore, mockTimeSource) + shardDistributionCfg := config.ShardDistribution{} + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -72,7 +76,8 @@ func TestHeartbeat(t *testing.T) { ctrl := gomock.NewController(t) mockStore := store.NewMockStore(ctrl) mockTimeSource := clock.NewMockedTimeSourceAt(now) - handler := NewExecutorHandler(mockStore, mockTimeSource) + shardDistributionCfg := config.ShardDistribution{} + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -103,7 +108,8 @@ func TestHeartbeat(t *testing.T) { ctrl := gomock.NewController(t) mockStore := store.NewMockStore(ctrl) mockTimeSource := clock.NewMockedTimeSourceAt(now) - handler := NewExecutorHandler(mockStore, mockTimeSource) + shardDistributionCfg := config.ShardDistribution{} + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -131,7 +137,8 @@ func TestHeartbeat(t *testing.T) { ctrl := gomock.NewController(t) mockStore := store.NewMockStore(ctrl) mockTimeSource := clock.NewMockedTimeSource() - handler := NewExecutorHandler(mockStore, mockTimeSource) + shardDistributionCfg := config.ShardDistribution{} + handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg) req := &types.ExecutorHeartbeatRequest{ Namespace: namespace, @@ -194,7 +201,7 @@ func TestConvertResponse(t *testing.T) { if tc.expectedResp.ShardAssignments == nil { tc.expectedResp.ShardAssignments = make(map[string]*types.ShardAssignment) } - res := _convertResponse(tc.input) + res := _convertResponse(tc.input, types.MigrationModeONBOARDED) // Ensure 
ShardAssignments is not nil for comparison purposes if res.ShardAssignments == nil { diff --git a/service/sharddistributor/leader/process/processor.go b/service/sharddistributor/leader/process/processor.go index 67db4222695..60421952ec0 100644 --- a/service/sharddistributor/leader/process/processor.go +++ b/service/sharddistributor/leader/process/processor.go @@ -172,9 +172,11 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) { defer ticker.Stop() // Perform an initial rebalance on startup. - err := p.rebalanceShards(ctx) - if err != nil { - p.logger.Error("initial rebalance failed", tag.Error(err)) + if p.namespaceCfg.Mode == config.MigrationModeONBOARDED { + err := p.rebalanceShards(ctx) + if err != nil { + p.logger.Error("initial rebalance failed", tag.Error(err)) + } } updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name) @@ -196,6 +198,10 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) { if latestRevision <= p.lastAppliedRevision { continue } + if p.namespaceCfg.Mode != config.MigrationModeONBOARDED { + p.logger.Info("Namespace not onboarded, rebalance not triggered", tag.ShardNamespace(p.namespaceCfg.Name)) + break + } p.logger.Info("State change detected, triggering rebalance.") err = p.rebalanceShards(ctx) case <-ticker.Chan(): diff --git a/service/sharddistributor/sharddistributorfx/sharddistributorfx.go b/service/sharddistributor/sharddistributorfx/sharddistributorfx.go index 675b5b4542c..88e1bbfb98b 100644 --- a/service/sharddistributor/sharddistributorfx/sharddistributorfx.go +++ b/service/sharddistributor/sharddistributorfx/sharddistributorfx.go @@ -72,7 +72,7 @@ func registerHandlers(params registerHandlersParams) error { rawHandler := handler.NewHandler(params.Logger, params.ShardDistributionCfg, params.Store) wrappedHandler := metered.NewMetricsHandler(rawHandler, params.Logger, params.MetricsClient) - executorHandler := handler.NewExecutorHandler(params.Store, params.TimeSource) + 
executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg) wrappedExecutor := metered.NewExecutorMetricsExecutor(executorHandler, params.Logger, params.MetricsClient) grpcHandler := grpc.NewGRPCHandler(wrappedHandler)