diff --git a/service/sharddistributor/canary/handler/ping_handler.go b/service/sharddistributor/canary/handler/ping_handler.go new file mode 100644 index 00000000000..910c65641f2 --- /dev/null +++ b/service/sharddistributor/canary/handler/ping_handler.go @@ -0,0 +1,118 @@ +package handler + +import ( + "context" + + "go.uber.org/fx" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/service/sharddistributor/canary/processor" + "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" +) + +// PingHandler handles ping requests to verify executor ownership of shards +type PingHandler struct { + logger *zap.Logger + executorsFixed map[string]executorclient.Executor[*processor.ShardProcessor] // namespace -> executor + executorsEphemeral map[string]executorclient.Executor[*processorephemeral.ShardProcessor] // namespace -> executor +} + +// Params are the parameters for creating a PingHandler +type Params struct { + fx.In + + Logger *zap.Logger + + ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"` + ExecutorsEphemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"` +} + +// NewPingHandler creates a new PingHandler +func NewPingHandler(params Params) *PingHandler { + // Create maps of executors for quick lookup + executorsFixed := make(map[string]executorclient.Executor[*processor.ShardProcessor]) + for _, executor := range params.ExecutorsFixed { + executorsFixed[executor.GetNamespace()] = executor + } + executorsEphemeral := make(map[string]executorclient.Executor[*processorephemeral.ShardProcessor]) + for _, executor := range params.ExecutorsEphemeral { + executorsEphemeral[executor.GetNamespace()] = executor + } + + // Return the handler + return &PingHandler{ + logger: params.Logger, + executorsFixed: executorsFixed, + executorsEphemeral: executorsEphemeral, + } +} + +// Ping handles ping requests to check shard ownership +func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) { + h.logger.Info("Received ping request", + zap.String("shard_key", request.GetShardKey()), + zap.String("namespace", request.GetNamespace())) + + namespace := request.GetNamespace() + + // Check fixed executors first + if executor, found := h.executorsFixed[namespace]; found { + processor, err := executor.GetShardProcess(ctx, request.GetShardKey()) + ownshard := err == nil && processor != nil + metadata := executor.GetMetadata() + executorID := getExecutorID(metadata) + + response := &sharddistributorv1.PingResponse{ + ExecutorId: executorID, + OwnsShard: ownshard, + ShardKey: request.GetShardKey(), + } + + h.logger.Info("Responding to ping from fixed executor", + zap.String("shard_key", request.GetShardKey()), + zap.Bool("owns_shard", ownshard), + zap.String("executor_id", executorID)) + + return response, nil + } + + // Check ephemeral executors + if executor, found := h.executorsEphemeral[namespace]; found { + processor, err := executor.GetShardProcess(ctx, request.GetShardKey()) + ownshard := err == nil && processor != nil + metadata := executor.GetMetadata() + executorID := getExecutorID(metadata) + + response := &sharddistributorv1.PingResponse{ + ExecutorId: executorID, + OwnsShard: ownshard, + ShardKey: request.GetShardKey(), + } + + h.logger.Info("Responding to ping from ephemeral executor", + zap.String("shard_key", request.GetShardKey()), + zap.Bool("owns_shard", ownshard), + zap.String("executor_id", executorID)) + + return response, nil + } + + // Namespace not found in any executor + h.logger.Warn("Namespace not found in executors", + zap.String("namespace", namespace)) + + return &sharddistributorv1.PingResponse{ + ExecutorId: "unknown", + OwnsShard: false, + ShardKey: request.GetShardKey(), + }, nil +} + +func getExecutorID(metadata map[string]string) string { + if addr, ok := metadata["grpc_address"]; ok && addr != "" { + return addr + } + return "" +} diff --git a/service/sharddistributor/canary/handler/ping_handler_test.go b/service/sharddistributor/canary/handler/ping_handler_test.go new file mode 100644 index 00000000000..91fcff10b84 --- /dev/null +++ b/service/sharddistributor/canary/handler/ping_handler_test.go @@ -0,0 +1,95 @@ +package handler + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/service/sharddistributor/canary/processor" + "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" +) + +func TestPingHandler_Ping(t *testing.T) { + tests := []struct { + name string + namespace string + shardKey string + setup func(*gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) + wantID string + wantOwns bool + }{ + { + name: "fixed executor owns shard", + namespace: "ns1", + shardKey: "shard-1", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns1").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-1").Return(&processor.ShardProcessor{}, nil) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7953"}) + return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil + }, + wantID: "127.0.0.1:7953", + wantOwns: true, + }, + { + name: "fixed executor does not own shard", + namespace: "ns1", + shardKey: "shard-2", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns1").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-2").Return(nil, errors.New("not found")) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7954"}) + return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil + }, + wantID: "127.0.0.1:7954", + wantOwns: false, + }, + { + name: "ephemeral executor owns shard", + namespace: "ns2", + shardKey: "shard-3", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns2").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-3").Return(&processorephemeral.ShardProcessor{}, nil) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7955"}) + return nil, []executorclient.Executor[*processorephemeral.ShardProcessor]{exec} + }, + wantID: "127.0.0.1:7955", + wantOwns: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + fixed, ephemeral := tt.setup(ctrl) + handler := NewPingHandler(Params{ + Logger: zap.NewNop(), + ExecutorsFixed: fixed, + ExecutorsEphemeral: ephemeral, + }) + + resp, err := handler.Ping(context.Background(), &sharddistributorv1.PingRequest{ + Namespace: tt.namespace, + ShardKey: tt.shardKey, + }) + + require.NoError(t, err) + assert.Equal(t, tt.wantID, resp.ExecutorId) + assert.Equal(t, tt.wantOwns, resp.OwnsShard) + assert.Equal(t, tt.shardKey, resp.ShardKey) + }) + } +}