Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 118 additions & 0 deletions service/sharddistributor/canary/handler/ping_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package handler

import (
"context"

"go.uber.org/fx"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/service/sharddistributor/canary/processor"
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
)

// PingHandler handles ping requests to verify executor ownership of shards
type PingHandler struct {
logger *zap.Logger
executorsFixed map[string]executorclient.Executor[*processor.ShardProcessor] // namespace -> executor
executorsEphemeral map[string]executorclient.Executor[*processorephemeral.ShardProcessor] // namespace -> executor
}

// Params are the parameters for creating a PingHandler
type Params struct {
fx.In

Logger *zap.Logger

ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"`
ExecutorsEphemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
}

// NewPingHandler creates a new PingHandler
func NewPingHandler(params Params) *PingHandler {
// Create maps of executors for quick lookup
executorsFixed := make(map[string]executorclient.Executor[*processor.ShardProcessor])
for _, executor := range params.ExecutorsFixed {
executorsFixed[executor.GetNamespace()] = executor
}
executorsEphemeral := make(map[string]executorclient.Executor[*processorephemeral.ShardProcessor])
for _, executor := range params.ExecutorsEphemeral {
executorsEphemeral[executor.GetNamespace()] = executor
}

// Return the handler
return &PingHandler{
logger: params.Logger,
executorsFixed: executorsFixed,
executorsEphemeral: executorsEphemeral,
}
}

// Ping handles ping requests to check shard ownership
func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) {
h.logger.Info("Received ping request",
zap.String("shard_key", request.GetShardKey()),
zap.String("namespace", request.GetNamespace()))

namespace := request.GetNamespace()

// Check fixed executors first
if executor, found := h.executorsFixed[namespace]; found {
processor, err := executor.GetShardProcess(ctx, request.GetShardKey())
ownshard := err == nil && processor != nil
metadata := executor.GetMetadata()
executorID := getExecutorID(metadata)

response := &sharddistributorv1.PingResponse{
ExecutorId: executorID,
OwnsShard: ownshard,
ShardKey: request.GetShardKey(),
}

h.logger.Info("Responding to ping from fixed executor",
zap.String("shard_key", request.GetShardKey()),
zap.Bool("owns_shard", ownshard),
zap.String("executor_id", executorID))

return response, nil
}

// Check ephemeral executors
if executor, found := h.executorsEphemeral[namespace]; found {
processor, err := executor.GetShardProcess(ctx, request.GetShardKey())
ownshard := err == nil && processor != nil
metadata := executor.GetMetadata()
executorID := getExecutorID(metadata)

response := &sharddistributorv1.PingResponse{
ExecutorId: executorID,
OwnsShard: ownshard,
ShardKey: request.GetShardKey(),
}

h.logger.Info("Responding to ping from ephemeral executor",
zap.String("shard_key", request.GetShardKey()),
zap.Bool("owns_shard", ownshard),
zap.String("executor_id", executorID))

return response, nil
}

// Namespace not found in any executor
h.logger.Warn("Namespace not found in executors",
zap.String("namespace", namespace))

return &sharddistributorv1.PingResponse{
ExecutorId: "unknown",
OwnsShard: false,
ShardKey: request.GetShardKey(),
}, nil
}

func getExecutorID(metadata map[string]string) string {
if addr, ok := metadata["grpc_address"]; ok && addr != "" {
return addr
}
return ""
}
95 changes: 95 additions & 0 deletions service/sharddistributor/canary/handler/ping_handler_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package handler

import (
"context"
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"
"go.uber.org/zap"

sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
"github.com/uber/cadence/service/sharddistributor/canary/processor"
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
)

func TestPingHandler_Ping(t *testing.T) {
tests := []struct {
name string
namespace string
shardKey string
setup func(*gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor])
wantID string
wantOwns bool
}{
{
name: "fixed executor owns shard",
namespace: "ns1",
shardKey: "shard-1",
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl)
exec.EXPECT().GetNamespace().Return("ns1").AnyTimes()
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-1").Return(&processor.ShardProcessor{}, nil)
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7953"})
return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil
},
wantID: "127.0.0.1:7953",
wantOwns: true,
},
{
name: "fixed executor does not own shard",
namespace: "ns1",
shardKey: "shard-2",
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl)
exec.EXPECT().GetNamespace().Return("ns1").AnyTimes()
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-2").Return(nil, errors.New("not found"))
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7954"})
return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil
},
wantID: "127.0.0.1:7954",
wantOwns: false,
},
{
name: "ephemeral executor owns shard",
namespace: "ns2",
shardKey: "shard-3",
setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) {
exec := executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl)
exec.EXPECT().GetNamespace().Return("ns2").AnyTimes()
exec.EXPECT().GetShardProcess(gomock.Any(), "shard-3").Return(&processorephemeral.ShardProcessor{}, nil)
exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7955"})
return nil, []executorclient.Executor[*processorephemeral.ShardProcessor]{exec}
},
wantID: "127.0.0.1:7955",
wantOwns: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

fixed, ephemeral := tt.setup(ctrl)
handler := NewPingHandler(Params{
Logger: zap.NewNop(),
ExecutorsFixed: fixed,
ExecutorsEphemeral: ephemeral,
})

resp, err := handler.Ping(context.Background(), &sharddistributorv1.PingRequest{
Namespace: tt.namespace,
ShardKey: tt.shardKey,
})

require.NoError(t, err)
assert.Equal(t, tt.wantID, resp.ExecutorId)
assert.Equal(t, tt.wantOwns, resp.OwnsShard)
assert.Equal(t, tt.shardKey, resp.ShardKey)
})
}
}
Loading