Skip to content

Commit b3c155c

Browse files
committed
Add canary ping handler for shard ownership verification
Implement the server-side handler for canary ping requests. The handler receives ping requests for specific shards and responds with whether this executor owns the shard. The handler: 1. Looks up the executor for the requested namespace 2. Checks if this executor owns the requested shard 3. Returns ownership status and executor ID This allows other executors to verify shard ownership and test executor-to-executor communication. Dependencies: - Requires ShardDistributorExecutorCanaryAPI proto definition - Requires Executor.GetMetadata() to identify the executor Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 43c9efc commit b3c155c

File tree

1 file changed

+122
-0
lines changed

1 file changed

+122
-0
lines changed
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
package handler
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"go.uber.org/fx"
8+
"go.uber.org/zap"
9+
10+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
11+
"github.com/uber/cadence/service/sharddistributor/canary/processor"
12+
"github.com/uber/cadence/service/sharddistributor/canary/processorephemeral"
13+
"github.com/uber/cadence/service/sharddistributor/client/executorclient"
14+
)
15+
16+
// PingHandler handles ping requests to verify executor ownership of shards
17+
type PingHandler struct {
18+
logger *zap.Logger
19+
executorsFixed map[string]executorclient.Executor[*processor.ShardProcessor] // namespace -> executor
20+
executorsEphemeral map[string]executorclient.Executor[*processorephemeral.ShardProcessor] // namespace -> executor
21+
}
22+
23+
// Params are the parameters for creating a PingHandler
24+
type Params struct {
25+
fx.In
26+
27+
Logger *zap.Logger
28+
29+
ExecutorsFixed []executorclient.Executor[*processor.ShardProcessor] `group:"executor-fixed-proc"`
30+
ExecutorsEphemeral []executorclient.Executor[*processorephemeral.ShardProcessor] `group:"executor-ephemeral-proc"`
31+
}
32+
33+
// NewPingHandler creates a new PingHandler
34+
func NewPingHandler(params Params) *PingHandler {
35+
// Create maps of executors for quick lookup
36+
executorsFixed := make(map[string]executorclient.Executor[*processor.ShardProcessor])
37+
for _, executor := range params.ExecutorsFixed {
38+
executorsFixed[executor.GetNamespace()] = executor
39+
}
40+
executorsEphemeral := make(map[string]executorclient.Executor[*processorephemeral.ShardProcessor])
41+
for _, executor := range params.ExecutorsEphemeral {
42+
executorsEphemeral[executor.GetNamespace()] = executor
43+
}
44+
45+
// Return the handler
46+
return &PingHandler{
47+
logger: params.Logger,
48+
executorsFixed: executorsFixed,
49+
executorsEphemeral: executorsEphemeral,
50+
}
51+
}
52+
53+
// Ping handles ping requests to check shard ownership
54+
func (h *PingHandler) Ping(ctx context.Context, request *sharddistributorv1.PingRequest) (*sharddistributorv1.PingResponse, error) {
55+
h.logger.Info("Received ping request",
56+
zap.String("shard_key", request.GetShardKey()),
57+
zap.String("namespace", request.GetNamespace()))
58+
59+
namespace := request.GetNamespace()
60+
61+
// Check fixed executors first
62+
if executor, found := h.executorsFixed[namespace]; found {
63+
processor, err := executor.GetShardProcess(ctx, request.GetShardKey())
64+
ownshard := err == nil && processor != nil
65+
metadata := executor.GetMetadata()
66+
executorID := getExecutorID(metadata)
67+
68+
response := &sharddistributorv1.PingResponse{
69+
ExecutorId: executorID,
70+
OwnsShard: ownshard,
71+
ShardKey: request.GetShardKey(),
72+
}
73+
74+
h.logger.Info("Responding to ping from fixed executor",
75+
zap.String("shard_key", request.GetShardKey()),
76+
zap.Bool("owns_shard", ownshard),
77+
zap.String("executor_id", executorID))
78+
79+
return response, nil
80+
}
81+
82+
// Check ephemeral executors
83+
if executor, found := h.executorsEphemeral[namespace]; found {
84+
processor, err := executor.GetShardProcess(ctx, request.GetShardKey())
85+
ownshard := err == nil && processor != nil
86+
metadata := executor.GetMetadata()
87+
executorID := getExecutorID(metadata)
88+
89+
response := &sharddistributorv1.PingResponse{
90+
ExecutorId: executorID,
91+
OwnsShard: ownshard,
92+
ShardKey: request.GetShardKey(),
93+
}
94+
95+
h.logger.Info("Responding to ping from ephemeral executor",
96+
zap.String("shard_key", request.GetShardKey()),
97+
zap.Bool("owns_shard", ownshard),
98+
zap.String("executor_id", executorID))
99+
100+
return response, nil
101+
}
102+
103+
// Namespace not found in any executor
104+
h.logger.Warn("Namespace not found in executors",
105+
zap.String("namespace", namespace))
106+
107+
return &sharddistributorv1.PingResponse{
108+
ExecutorId: "unknown",
109+
OwnsShard: false,
110+
ShardKey: request.GetShardKey(),
111+
}, nil
112+
}
113+
114+
func getExecutorID(metadata map[string]string) string {
115+
if id, ok := metadata["executor_id"]; ok && id != "" {
116+
return id
117+
}
118+
if addr, ok := metadata["grpc_address"]; ok && addr != "" {
119+
return fmt.Sprintf("executor@%s", addr)
120+
}
121+
return "unknown"
122+
}

0 commit comments

Comments
 (0)