From 03bbab72252e126125cfd77cd65ef26c64301ef6 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Thu, 20 Nov 2025 14:17:34 +0100 Subject: [PATCH 1/2] Add canary ping handler for shard ownership verification Implement the server-side handler for canary ping requests. The handler receives ping requests for specific shards and responds with whether this executor owns the shard. The handler: 1. Looks up the executor for the requested namespace 2. Checks if this executor owns the requested shard 3. Returns ownership status and executor ID This allows other executors to verify shard ownership and test executor-to-executor communication. Dependencies: - Requires ShardDistributorExecutorCanaryAPI proto definition - Requires Executor.GetMetadata() to identify the executor Signed-off-by: Jakob Haahr Taankvist --- .../canary/handler/ping_handler.go | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 service/sharddistributor/canary/handler/ping_handler.go diff --git a/service/sharddistributor/canary/handler/ping_handler.go b/service/sharddistributor/canary/handler/ping_handler.go new file mode 100644 index 00000000000..8b8a33ab1c6 --- /dev/null +++ b/service/sharddistributor/canary/handler/ping_handler.go @@ -0,0 +1,122 @@ +package handler + +import ( + "context" + "fmt" + + "go.uber.org/fx" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/service/sharddistributor/canary/processor" + "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" +) + +// PingHandler handles ping requests to verify executor ownership of shards +type PingHandler struct { + logger *zap.Logger + executorsFixed map[string]executorclient.Executor[*processor.ShardProcessor] // namespace -> executor + executorsEphemeral map[string]executorclient.Executor[*processorephemeral.ShardProcessor] // namespace -> executor +} + +// Params are the parameters for creating a PingHandler +type Params struct { + fx.In + + Logger *zap.Logger + + ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"` + ExecutorsEphemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"` +} + +// NewPingHandler creates a new PingHandler +func NewPingHandler(params Params) *PingHandler { + // Create maps of executors for quick lookup + executorsFixed := make(map[string]executorclient.Executor[*processor.ShardProcessor]) + for _, executor := range params.ExecutorsFixed { + executorsFixed[executor.GetNamespace()] = executor + } + executorsEphemeral := make(map[string]executorclient.Executor[*processorephemeral.ShardProcessor]) + for _, executor := range params.ExecutorsEphemeral { + executorsEphemeral[executor.GetNamespace()] = executor + } + + // Return the handler + return &PingHandler{ + logger: params.Logger, + executorsFixed: executorsFixed, + executorsEphemeral: executorsEphemeral, + } +} + +// Ping handles ping requests to check shard ownership +func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) { + h.logger.Info("Received ping request", + zap.String("shard_key", request.GetShardKey()), + zap.String("namespace", request.GetNamespace())) + + namespace := request.GetNamespace() + + // Check fixed executors first + if executor, found := h.executorsFixed[namespace]; found { + processor, err := executor.GetShardProcess(ctx, request.GetShardKey()) + ownshard := err == nil && processor != nil + metadata := executor.GetMetadata() + executorID := getExecutorID(metadata) + + response := &sharddistributorv1.PingResponse{ + ExecutorId: executorID, + OwnsShard: ownshard, + ShardKey: request.GetShardKey(), + } + + h.logger.Info("Responding to ping from fixed executor", + zap.String("shard_key", request.GetShardKey()), + zap.Bool("owns_shard", ownshard), + zap.String("executor_id", executorID)) + + return response, nil + } + + // Check ephemeral executors + if executor, found := h.executorsEphemeral[namespace]; found { + processor, err := executor.GetShardProcess(ctx, request.GetShardKey()) + ownshard := err == nil && processor != nil + metadata := executor.GetMetadata() + executorID := getExecutorID(metadata) + + response := &sharddistributorv1.PingResponse{ + ExecutorId: executorID, + OwnsShard: ownshard, + ShardKey: request.GetShardKey(), + } + + h.logger.Info("Responding to ping from ephemeral executor", + zap.String("shard_key", request.GetShardKey()), + zap.Bool("owns_shard", ownshard), + zap.String("executor_id", executorID)) + + return response, nil + } + + // Namespace not found in any executor + h.logger.Warn("Namespace not found in executors", + zap.String("namespace", namespace)) + + return &sharddistributorv1.PingResponse{ + ExecutorId: "unknown", + OwnsShard: false, + ShardKey: request.GetShardKey(), + }, nil +} + +func getExecutorID(metadata map[string]string) string { + if id, ok := metadata["executor_id"]; ok && id != "" { + return id + } + if addr, ok := metadata["grpc_address"]; ok && addr != "" { + return fmt.Sprintf("executor@%s", addr) + } + return "unknown" +} From f6480e403564a73a02b60c70957dc1bc69caf6a1 Mon Sep 17 00:00:00 2001 From: Jakob Haahr Taankvist Date: Tue, 25 Nov 2025 09:40:14 +0100 Subject: [PATCH 2/2] Add tests for canary ping handler Add table-driven tests covering: - Fixed executor shard ownership verification - Ephemeral executor shard ownership verification - Executor ID extraction from grpc_address metadata Achieves 93.3% test coverage. Signed-off-by: Jakob Haahr Taankvist --- .../canary/handler/ping_handler.go | 8 +- .../canary/handler/ping_handler_test.go | 95 +++++++++++++++++++ 2 files changed, 97 insertions(+), 6 deletions(-) create mode 100644 service/sharddistributor/canary/handler/ping_handler_test.go diff --git a/service/sharddistributor/canary/handler/ping_handler.go b/service/sharddistributor/canary/handler/ping_handler.go index 8b8a33ab1c6..910c65641f2 100644 --- a/service/sharddistributor/canary/handler/ping_handler.go +++ b/service/sharddistributor/canary/handler/ping_handler.go @@ -2,7 +2,6 @@ package handler import ( "context" - "fmt" "go.uber.org/fx" "go.uber.org/zap" @@ -112,11 +111,8 @@ func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.Ping } func getExecutorID(metadata map[string]string) string { - if id, ok := metadata["executor_id"]; ok && id != "" { - return id - } if addr, ok := metadata["grpc_address"]; ok && addr != "" { - return fmt.Sprintf("executor@%s", addr) + return addr } - return "unknown" + return "" } diff --git a/service/sharddistributor/canary/handler/ping_handler_test.go b/service/sharddistributor/canary/handler/ping_handler_test.go new file mode 100644 index 00000000000..91fcff10b84 --- /dev/null +++ b/service/sharddistributor/canary/handler/ping_handler_test.go @@ -0,0 +1,95 @@ +package handler + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/mock/gomock" + "go.uber.org/zap" + + sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1" + "github.com/uber/cadence/service/sharddistributor/canary/processor" + "github.com/uber/cadence/service/sharddistributor/canary/processorephemeral" + "github.com/uber/cadence/service/sharddistributor/client/executorclient" +) + +func TestPingHandler_Ping(t *testing.T) { + tests := []struct { + name string + namespace string + shardKey string + setup func(*gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) + wantID string + wantOwns bool + }{ + { + name: "fixed executor owns shard", + namespace: "ns1", + shardKey: "shard-1", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns1").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-1").Return(&processor.ShardProcessor{}, nil) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7953"}) + return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil + }, + wantID: "127.0.0.1:7953", + wantOwns: true, + }, + { + name: "fixed executor does not own shard", + namespace: "ns1", + shardKey: "shard-2", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processor.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns1").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-2").Return(nil, errors.New("not found")) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7954"}) + return []executorclient.Executor[*processor.ShardProcessor]{exec}, nil + }, + wantID: "127.0.0.1:7954", + wantOwns: false, + }, + { + name: "ephemeral executor owns shard", + namespace: "ns2", + shardKey: "shard-3", + setup: func(ctrl *gomock.Controller) ([]executorclient.Executor[*processor.ShardProcessor], []executorclient.Executor[*processorephemeral.ShardProcessor]) { + exec := executorclient.NewMockExecutor[*processorephemeral.ShardProcessor](ctrl) + exec.EXPECT().GetNamespace().Return("ns2").AnyTimes() + exec.EXPECT().GetShardProcess(gomock.Any(), "shard-3").Return(&processorephemeral.ShardProcessor{}, nil) + exec.EXPECT().GetMetadata().Return(map[string]string{"grpc_address": "127.0.0.1:7955"}) + return nil, []executorclient.Executor[*processorephemeral.ShardProcessor]{exec} + }, + wantID: "127.0.0.1:7955", + wantOwns: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + fixed, ephemeral := tt.setup(ctrl) + handler := NewPingHandler(Params{ + Logger: zap.NewNop(), + ExecutorsFixed: fixed, + ExecutorsEphemeral: ephemeral, + }) + + resp, err := handler.Ping(context.Background(), &sharddistributorv1.PingRequest{ + Namespace: tt.namespace, + ShardKey: tt.shardKey, + }) + + require.NoError(t, err) + assert.Equal(t, tt.wantID, resp.ExecutorId) + assert.Equal(t, tt.wantOwns, resp.OwnsShard) + assert.Equal(t, tt.shardKey, resp.ShardKey) + }) + } +}