27 changes: 27 additions & 0 deletions service/sharddistributor/config/config.go
@@ -28,6 +28,7 @@ import (
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/dynamicconfig"
"github.com/uber/cadence/common/dynamicconfig/dynamicproperties"
"github.com/uber/cadence/common/types"
)

type (
@@ -86,6 +87,32 @@ const (
NamespaceTypeEphemeral = "ephemeral"
)

const (
MigrationModeINVALID = "invalid"
MigrationModeLOCALPASSTHROUGH = "local_pass"
MigrationModeLOCALPASSTHROUGHSHADOW = "local_pass_shadow"
MigrationModeDISTRIBUTEDPASSTHROUGH = "distributed_pass"
MigrationModeONBOARDED = "onboarded"
)

var configMode = map[string]types.MigrationMode{
MigrationModeINVALID: types.MigrationModeINVALID,
MigrationModeLOCALPASSTHROUGH: types.MigrationModeLOCALPASSTHROUGH,
MigrationModeLOCALPASSTHROUGHSHADOW: types.MigrationModeLOCALPASSTHROUGHSHADOW,
MigrationModeDISTRIBUTEDPASSTHROUGH: types.MigrationModeDISTRIBUTEDPASSTHROUGH,
MigrationModeONBOARDED: types.MigrationModeONBOARDED,
}

func (s *ShardDistribution) GetMigrationMode(namespace string) types.MigrationMode {
for _, ns := range s.Namespaces {
if ns.Name == namespace {
return configMode[ns.Type]
}
}
// TODO: set up a default value in the dynamic configuration
return configMode[MigrationModeONBOARDED]
}

// NewConfig returns new service config with default values
func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
return &Config{
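GetMigrationMode resolves a namespace's configured type string to a types.MigrationMode, falling back to onboarded for namespaces that are not listed. A minimal usage sketch - the config.Namespace literal assumes the Name and Type field names that GetMigrationMode reads, since the struct itself is outside this diff:

func ExampleShardDistribution_GetMigrationMode() {
	cfg := config.ShardDistribution{
		Namespaces: []config.Namespace{
			// Type doubles as the migration mode string, per the configMode table.
			{Name: "orders", Type: config.MigrationModeLOCALPASSTHROUGHSHADOW},
		},
	}

	fmt.Println(cfg.GetMigrationMode("orders"))  // the shadow mode from the table
	fmt.Println(cfg.GetMigrationMode("billing")) // unlisted namespace: falls back to MigrationModeONBOARDED
}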
87 changes: 78 additions & 9 deletions service/sharddistributor/handler/executor.go
@@ -7,7 +7,10 @@ import (
"time"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
)

@@ -16,16 +19,23 @@ const (
)

type executor struct {
-timeSource clock.TimeSource
-storage store.Store
+logger log.Logger
+timeSource clock.TimeSource
+storage store.Store
+shardDistributionCfg config.ShardDistribution
}

-func NewExecutorHandler(storage store.Store,
+func NewExecutorHandler(
+logger log.Logger,
+storage store.Store,
timeSource clock.TimeSource,
+shardDistributionCfg config.ShardDistribution,
) Executor {
return &executor{
-timeSource: timeSource,
-storage: storage,
+logger: logger,
+timeSource: timeSource,
+storage: storage,
+shardDistributionCfg: shardDistributionCfg,
}
}

@@ -38,12 +48,29 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe

now := h.timeSource.Now().UTC()

mode := h.shardDistributionCfg.GetMigrationMode(request.Namespace)

switch mode {
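// Summarizing the cases below: invalid and local-passthrough namespaces may
// not heartbeat at all; the shadow modes accept the heartbeat and mirror the
// executor's own report back as the assignment; only onboarded namespaces
// follow the normal distributor-driven path.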
case types.MigrationModeINVALID:
h.logger.Warn("Migration mode is invalid", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
return nil, fmt.Errorf("migration mode is invalid")
case types.MigrationModeLOCALPASSTHROUGH:
h.logger.Warn("Migration mode is local passthrough, no calls to heartbeat allowed", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
return nil, fmt.Errorf("migration mode is local passthrough")
// From the SD's perspective the behaviour is the same
case types.MigrationModeLOCALPASSTHROUGHSHADOW, types.MigrationModeDISTRIBUTEDPASSTHROUGH:
assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request, previousHeartbeat, assignedShards)
if err != nil {
return nil, err
}
}

// If the state has changed we need to update heartbeat data.
// Otherwise, we want to do it with controlled frequency - at most every _heartbeatRefreshRate.
-if previousHeartbeat != nil && request.Status == previousHeartbeat.Status {
+if previousHeartbeat != nil && request.Status == previousHeartbeat.Status && mode == types.MigrationModeONBOARDED {
lastHeartbeatTime := time.Unix(previousHeartbeat.LastHeartbeat, 0)
if now.Sub(lastHeartbeatTime) < _heartbeatRefreshRate {
-return _convertResponse(assignedShards), nil
+return _convertResponse(assignedShards, mode), nil
}
}

@@ -58,14 +85,56 @@
return nil, fmt.Errorf("record heartbeat: %w", err)
}

-return _convertResponse(assignedShards), nil
+return _convertResponse(assignedShards, mode), nil
}

// assignShardsInCurrentHeartbeat is used during the migration phase to assign shards to executors according to what they report in their heartbeats
func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *types.ExecutorHeartbeatRequest, previousHeartbeat *store.HeartbeatState, previousAssignedShards *store.AssignedState) (*store.AssignedState, error) {
changeInShardAssignment := false
assignedShards := previousAssignedShards
// Check whether the shards in the new report exist and have the same status in the old assignment
for shardAssigned, status := range request.GetShardStatusReports() {
shardStatusPreviousHeartbeat, ok := previousHeartbeat.ReportedShards[shardAssigned]
if !ok {
changeInShardAssignment = true
break
}
if status.GetStatus() != shardStatusPreviousHeartbeat.GetStatus() {
changeInShardAssignment = true
}
}
// Check if some shards from previous assignment should be deleted
if len(request.GetShardStatusReports()) != len(previousHeartbeat.ReportedShards) {
changeInShardAssignment = true
}
if changeInShardAssignment {
assignedShards = &store.AssignedState{
AssignedShards: make(map[string]*types.ShardAssignment),
LastUpdated: h.timeSource.Now().Unix(),
ModRevision: previousAssignedShards.ModRevision,
}
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, nil)
if err != nil {
return nil, fmt.Errorf("delete executors: %w", err)
}
for shard := range request.GetShardStatusReports() {
err = h.storage.AssignShard(ctx, request.GetNamespace(), request.GetExecutorID(), shard)

Review comment (Member): This is OK for now, but slightly worried about transactionality - maybe we should have an "assignShardsToExecutor" function in the storage?

Review comment (Member): Since we just deleted the executor from the store, won't this call just fail because the executor is not there? I think a new store function that's transactional sounds good.

if err != nil {
return nil, fmt.Errorf("assign shard: %w", err)
}
assignedShards.AssignedShards[shard] = &types.ShardAssignment{
Status: types.AssignmentStatusREADY,
}
}
}

return assignedShards, nil
}
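Both review comments point at the same gap: DeleteExecutors followed by per-shard AssignShard calls is not atomic, and the re-assignment can even fail outright because the executor record was just removed. One possible shape for the transactional store function suggested above - purely a sketch, assuming an etcd-backed store implementation, an invented key layout, and the hypothetical name AssignShardsToExecutor:

package store

import (
	"context"
	"fmt"

	clientv3 "go.etcd.io/etcd/client/v3"
)

// etcdStore stands in for the real store implementation.
type etcdStore struct {
	client *clientv3.Client
}

// AssignShardsToExecutor writes a full set of shard assignments in one etcd
// transaction, guarded on the executor still being registered, so readers
// never observe a half-applied assignment and the "executor was just
// deleted" failure mode disappears.
func (s *etcdStore) AssignShardsToExecutor(ctx context.Context, namespace, executorID string, shards []string) error {
	executorKey := fmt.Sprintf("/%s/executors/%s/heartbeat", namespace, executorID)
	ops := make([]clientv3.Op, 0, len(shards))
	for _, shard := range shards {
		key := fmt.Sprintf("/%s/executors/%s/shards/%s", namespace, executorID, shard)
		ops = append(ops, clientv3.OpPut(key, "READY"))
	}
	resp, err := s.client.Txn(ctx).
		If(clientv3.Compare(clientv3.CreateRevision(executorKey), ">", 0)). // executor must still exist
		Then(ops...).
		Commit()
	if err != nil {
		return fmt.Errorf("assign shards to executor: %w", err)
	}
	if !resp.Succeeded {
		return fmt.Errorf("executor %q is not registered in namespace %q", executorID, namespace)
	}
	return nil
}

The shadow path in assignShardsInCurrentHeartbeat could then make one store call per heartbeat instead of a delete plus N assigns.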

-func _convertResponse(shards *store.AssignedState) *types.ExecutorHeartbeatResponse {
+func _convertResponse(shards *store.AssignedState, mode types.MigrationMode) *types.ExecutorHeartbeatResponse {
res := &types.ExecutorHeartbeatResponse{}
if shards == nil {
return res
}
res.ShardAssignments = shards.AssignedShards
res.MigrationMode = mode
return res
}
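The response now carries the migration mode, so an executor can tell whether the returned assignments are authoritative. A hypothetical consumer - none of this client-side code is in the PR, and the heartbeater interface exists only to keep the sketch self-contained:

// heartbeater stands in for whatever transport the executor uses.
type heartbeater interface {
	Heartbeat(ctx context.Context, req *types.ExecutorHeartbeatRequest) (*types.ExecutorHeartbeatResponse, error)
}

func ownedShards(ctx context.Context, c heartbeater, req *types.ExecutorHeartbeatRequest) (map[string]*types.ShardAssignment, error) {
	resp, err := c.Heartbeat(ctx, req)
	if err != nil {
		return nil, err
	}
	if resp.MigrationMode == types.MigrationModeONBOARDED {
		// Fully migrated: the shard distributor owns assignment.
		return resp.ShardAssignments, nil
	}
	// Passthrough/shadow modes: keep locally derived ownership and treat the
	// distributor's echo of our own report as validation data only.
	return nil, nil
}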
19 changes: 13 additions & 6 deletions service/sharddistributor/handler/executor_test.go
@@ -10,7 +10,9 @@ import (
"go.uber.org/mock/gomock"

"github.com/uber/cadence/common/clock"
"github.com/uber/cadence/common/log/testlogger"
"github.com/uber/cadence/common/types"
"github.com/uber/cadence/service/sharddistributor/config"
"github.com/uber/cadence/service/sharddistributor/store"
)

@@ -25,7 +27,8 @@ func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSourceAt(now)
-handler := NewExecutorHandler(mockStore, mockTimeSource)
+shardDistributionCfg := config.ShardDistribution{}
+handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
@@ -48,7 +51,8 @@ func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSourceAt(now)
-handler := NewExecutorHandler(mockStore, mockTimeSource)
+shardDistributionCfg := config.ShardDistribution{}
+handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
@@ -72,7 +76,8 @@ func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSourceAt(now)
-handler := NewExecutorHandler(mockStore, mockTimeSource)
+shardDistributionCfg := config.ShardDistribution{}
+handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
@@ -103,7 +108,8 @@ func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSourceAt(now)
-handler := NewExecutorHandler(mockStore, mockTimeSource)
+shardDistributionCfg := config.ShardDistribution{}
+handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
@@ -131,7 +137,8 @@ func TestHeartbeat(t *testing.T) {
ctrl := gomock.NewController(t)
mockStore := store.NewMockStore(ctrl)
mockTimeSource := clock.NewMockedTimeSource()
-handler := NewExecutorHandler(mockStore, mockTimeSource)
+shardDistributionCfg := config.ShardDistribution{}
+handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg)

req := &types.ExecutorHeartbeatRequest{
Namespace: namespace,
@@ -194,7 +201,7 @@ func TestConvertResponse(t *testing.T) {
if tc.expectedResp.ShardAssignments == nil {
tc.expectedResp.ShardAssignments = make(map[string]*types.ShardAssignment)
}
-res := _convertResponse(tc.input)
+res := _convertResponse(tc.input, types.MigrationModeONBOARDED)

// Ensure ShardAssignments is not nil for comparison purposes
if res.ShardAssignments == nil {
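A test for the new shadow-assignment path could follow the pattern of the cases above. This is a sketch, not part of the PR: the types.ShardStatusReport name, the ShardStatusREADY constant, and the value type of store.HeartbeatState.ReportedShards are inferred from the handler code, and the file's existing context/testify imports are assumed:

func TestAssignShardsInCurrentHeartbeat(t *testing.T) {
	ctrl := gomock.NewController(t)
	mockStore := store.NewMockStore(ctrl)
	mockTimeSource := clock.NewMockedTimeSource()
	handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, config.ShardDistribution{})
	h := handler.(*executor)

	req := &types.ExecutorHeartbeatRequest{
		Namespace:  "test-namespace",
		ExecutorID: "executor-1",
		ShardStatusReports: map[string]*types.ShardStatusReport{
			"shard-1": {Status: types.ShardStatusREADY},
		},
	}

	// "shard-1" was not reported before, so the handler wipes the executor's
	// stored state and re-assigns the reported shard.
	mockStore.EXPECT().DeleteExecutors(gomock.Any(), "test-namespace", []string{"executor-1"}, gomock.Any()).Return(nil)
	mockStore.EXPECT().AssignShard(gomock.Any(), "test-namespace", "executor-1", "shard-1").Return(nil)

	assigned, err := h.assignShardsInCurrentHeartbeat(
		context.Background(),
		req,
		&store.HeartbeatState{ReportedShards: map[string]*types.ShardStatusReport{}},
		&store.AssignedState{},
	)
	require.NoError(t, err)
	assert.Contains(t, assigned.AssignedShards, "shard-1")
}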
12 changes: 9 additions & 3 deletions service/sharddistributor/leader/process/processor.go
@@ -172,9 +172,11 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
defer ticker.Stop()

// Perform an initial rebalance on startup.
-err := p.rebalanceShards(ctx)
-if err != nil {
-p.logger.Error("initial rebalance failed", tag.Error(err))
+if p.namespaceCfg.Mode == config.MigrationModeONBOARDED {
+err := p.rebalanceShards(ctx)
+if err != nil {
+p.logger.Error("initial rebalance failed", tag.Error(err))
+}
}

updateChan, err := p.shardStore.Subscribe(ctx, p.namespaceCfg.Name)
@@ -196,6 +198,10 @@ func (p *namespaceProcessor) runRebalancingLoop(ctx context.Context) {
if latestRevision <= p.lastAppliedRevision {
continue
}
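// Namespaces that are not fully onboarded are observed but never rebalanced;
// their executors keep the self-reported assignments from the heartbeat path.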
if p.namespaceCfg.Mode != config.MigrationModeONBOARDED {
p.logger.Info("Namespace not onboarded, rebalance not triggered", tag.ShardNamespace(p.namespaceCfg.Name))

Review comment (Member): This might log a lot, but we can always adjust if it's too much.

break
}
p.logger.Info("State change detected, triggering rebalance.")
err = p.rebalanceShards(ctx)
case <-ticker.Chan():
@@ -72,7 +72,7 @@ func registerHandlers(params registerHandlersParams) error {
rawHandler := handler.NewHandler(params.Logger, params.ShardDistributionCfg, params.Store)
wrappedHandler := metered.NewMetricsHandler(rawHandler, params.Logger, params.MetricsClient)

-executorHandler := handler.NewExecutorHandler(params.Store, params.TimeSource)
+executorHandler := handler.NewExecutorHandler(params.Logger, params.Store, params.TimeSource, params.ShardDistributionCfg)
wrappedExecutor := metered.NewExecutorMetricsExecutor(executorHandler, params.Logger, params.MetricsClient)

grpcHandler := grpc.NewGRPCHandler(wrappedHandler)