Skip to content

Commit 34d56e5

Browse files
committed
Add canary pinger for periodic shard ownership verification
Implement the client-side pinger that periodically pings random shard owners to verify:
1. Executors can route to each other based on shard ownership
2. Shard ownership information is accurate
3. The shard distributor is functioning correctly

The pinger:
- Selects random shards at regular intervals (1s with 10% jitter)
- Sends ping requests to the executor owning each shard
- Validates that the receiving executor actually owns the shard
- Logs warnings when ownership is incorrect

Dependencies:
- Requires ShardDistributorExecutorCanaryAPI proto and client
- Will use SpectatorPeerChooser for routing (wired in later commit)

Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 71df42c commit 34d56e5

File tree

3 files changed

+320
-0
lines changed

3 files changed

+320
-0
lines changed

service/sharddistributor/canary/pinger/canary_client_mock.go

Lines changed: 63 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"sync"
8+
"time"
9+
10+
"go.uber.org/fx"
11+
"go.uber.org/yarpc"
12+
"go.uber.org/zap"
13+
14+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
15+
"github.com/uber/cadence/common/backoff"
16+
"github.com/uber/cadence/common/clock"
17+
"github.com/uber/cadence/service/sharddistributor/client/spectatorclient"
18+
)
19+
20+
//go:generate mockgen -package $GOPACKAGE -destination canary_client_mock.go github.com/uber/cadence/.gen/proto/sharddistributor/v1 ShardDistributorExecutorCanaryAPIYARPCClient
21+
22+
// Timing parameters for the canary ping loop.
const (
	// pingInterval is the base delay between two consecutive pings.
	pingInterval = 1 * time.Second
	// pingJitterCoeff is the jitter coefficient applied to pingInterval (10% jitter).
	pingJitterCoeff = 0.1
	// pingTimeout is the deadline given to a single ping RPC.
	pingTimeout = 5 * time.Second
)
27+
28+
// Pinger periodically pings shard owners in the fixed namespace
29+
type Pinger struct {
30+
logger *zap.Logger
31+
timeSource clock.TimeSource
32+
canaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
33+
namespace string
34+
numShards int
35+
ctx context.Context
36+
cancel context.CancelFunc
37+
wg sync.WaitGroup
38+
}
39+
40+
// Params are the parameters for creating a Pinger
41+
type Params struct {
42+
fx.In
43+
44+
Logger *zap.Logger
45+
TimeSource clock.TimeSource
46+
CanaryClient sharddistributorv1.ShardDistributorExecutorCanaryAPIYARPCClient
47+
}
48+
49+
// NewPinger creates a new Pinger for the fixed namespace
50+
func NewPinger(params Params, namespace string, numShards int) *Pinger {
51+
return &Pinger{
52+
logger: params.Logger,
53+
timeSource: params.TimeSource,
54+
canaryClient: params.CanaryClient,
55+
namespace: namespace,
56+
numShards: numShards,
57+
}
58+
}
59+
60+
// Start begins the periodic ping loop
61+
func (p *Pinger) Start(ctx context.Context) {
62+
p.logger.Info("Starting canary pinger", zap.String("namespace", p.namespace), zap.Int("num_shards", p.numShards))
63+
p.ctx, p.cancel = context.WithCancel(context.WithoutCancel(ctx))
64+
p.wg.Add(1)
65+
go p.pingLoop()
66+
}
67+
68+
// Stop stops the ping loop
69+
func (p *Pinger) Stop() {
70+
if p.cancel != nil {
71+
p.cancel()
72+
}
73+
p.wg.Wait()
74+
}
75+
76+
func (p *Pinger) pingLoop() {
77+
defer p.wg.Done()
78+
79+
ticker := p.timeSource.NewTicker(backoff.JitDuration(pingInterval, pingJitterCoeff))
80+
defer ticker.Stop()
81+
82+
p.logger.Info("Starting canary pinger",
83+
zap.String("namespace", p.namespace),
84+
zap.Int("num_shards", p.numShards))
85+
86+
for {
87+
select {
88+
case <-p.ctx.Done():
89+
p.logger.Info("Pinger context done, stopping")
90+
return
91+
case <-ticker.Chan():
92+
p.pingRandomShard()
93+
ticker.Reset(backoff.JitDuration(pingInterval, pingJitterCoeff))
94+
}
95+
}
96+
}
97+
98+
func (p *Pinger) pingRandomShard() {
99+
// Pick a random shard number
100+
shardNum := rand.Intn(p.numShards)
101+
shardKey := fmt.Sprintf("%d", shardNum)
102+
103+
if err := p.pingShard(shardKey); err != nil {
104+
p.logger.Error("Failed to ping shard",
105+
zap.String("namespace", p.namespace),
106+
zap.String("shard_key", shardKey),
107+
zap.Error(err))
108+
}
109+
}
110+
111+
func (p *Pinger) pingShard(shardKey string) error {
112+
// Create ping request
113+
request := &sharddistributorv1.PingRequest{
114+
ShardKey: shardKey,
115+
Namespace: p.namespace,
116+
}
117+
118+
// Create context with deadline for the RPC call
119+
ctx, cancel := context.WithTimeout(p.ctx, pingTimeout)
120+
defer cancel()
121+
122+
response, err := p.canaryClient.Ping(ctx, request, yarpc.WithShardKey(shardKey), yarpc.WithHeader(spectatorclient.NamespaceHeader, p.namespace))
123+
if err != nil {
124+
return fmt.Errorf("ping rpc failed: %w", err)
125+
}
126+
127+
// Verify response
128+
if !response.GetOwnsShard() {
129+
p.logger.Warn("Executor does not own shard",
130+
zap.String("namespace", p.namespace),
131+
zap.String("shard_key", shardKey),
132+
zap.String("executor_id", response.GetExecutorId()))
133+
return fmt.Errorf("executor %s does not own shard %s", response.GetExecutorId(), shardKey)
134+
}
135+
136+
p.logger.Info("Successfully pinged shard owner",
137+
zap.String("namespace", p.namespace),
138+
zap.String("shard_key", shardKey),
139+
zap.String("executor_id", response.GetExecutorId()))
140+
141+
return nil
142+
}
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package pinger
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/stretchr/testify/assert"
9+
"github.com/stretchr/testify/require"
10+
"go.uber.org/goleak"
11+
"go.uber.org/mock/gomock"
12+
"go.uber.org/zap"
13+
14+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
15+
"github.com/uber/cadence/common/clock"
16+
)
17+
18+
func TestPingerStartStop(t *testing.T) {
19+
defer goleak.VerifyNone(t)
20+
21+
ctrl := gomock.NewController(t)
22+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
23+
24+
pinger := NewPinger(Params{
25+
Logger: zap.NewNop(),
26+
TimeSource: clock.NewRealTimeSource(),
27+
CanaryClient: mockClient,
28+
}, "test-ns", 10)
29+
30+
pinger.Start(context.Background())
31+
pinger.Stop()
32+
}
33+
34+
func TestPingShard_Success(t *testing.T) {
35+
ctrl := gomock.NewController(t)
36+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
37+
38+
pinger := NewPinger(Params{
39+
Logger: zap.NewNop(),
40+
TimeSource: clock.NewRealTimeSource(),
41+
CanaryClient: mockClient,
42+
}, "test-ns", 10)
43+
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
44+
defer pinger.cancel()
45+
46+
mockClient.EXPECT().
47+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
48+
Return(&sharddistributorv1.PingResponse{
49+
OwnsShard: true,
50+
ExecutorId: "127.0.0.1:7953",
51+
}, nil)
52+
53+
err := pinger.pingShard("5")
54+
assert.NoError(t, err)
55+
}
56+
57+
func TestPingShard_DoesNotOwnShard(t *testing.T) {
58+
ctrl := gomock.NewController(t)
59+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
60+
61+
pinger := NewPinger(Params{
62+
Logger: zap.NewNop(),
63+
TimeSource: clock.NewRealTimeSource(),
64+
CanaryClient: mockClient,
65+
}, "test-ns", 10)
66+
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
67+
defer pinger.cancel()
68+
69+
mockClient.EXPECT().
70+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
71+
Return(&sharddistributorv1.PingResponse{
72+
OwnsShard: false,
73+
ExecutorId: "127.0.0.1:7953",
74+
}, nil)
75+
76+
err := pinger.pingShard("5")
77+
assert.Error(t, err)
78+
assert.Contains(t, err.Error(), "does not own shard")
79+
}
80+
81+
func TestPingShard_RPCError(t *testing.T) {
82+
ctrl := gomock.NewController(t)
83+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
84+
85+
pinger := NewPinger(Params{
86+
Logger: zap.NewNop(),
87+
TimeSource: clock.NewRealTimeSource(),
88+
CanaryClient: mockClient,
89+
}, "test-ns", 10)
90+
pinger.ctx, pinger.cancel = context.WithCancel(context.Background())
91+
defer pinger.cancel()
92+
93+
mockClient.EXPECT().
94+
Ping(gomock.Any(), gomock.Any(), gomock.Any()).
95+
Return(nil, errors.New("network error"))
96+
97+
err := pinger.pingShard("5")
98+
assert.Error(t, err)
99+
assert.Contains(t, err.Error(), "ping rpc failed")
100+
}
101+
102+
func TestNewPinger(t *testing.T) {
103+
ctrl := gomock.NewController(t)
104+
mockClient := NewMockShardDistributorExecutorCanaryAPIYARPCClient(ctrl)
105+
106+
pinger := NewPinger(Params{
107+
Logger: zap.NewNop(),
108+
TimeSource: clock.NewRealTimeSource(),
109+
CanaryClient: mockClient,
110+
}, "test-ns", 100)
111+
112+
require.NotNil(t, pinger)
113+
assert.Equal(t, "test-ns", pinger.namespace)
114+
assert.Equal(t, 100, pinger.numShards)
115+
}

0 commit comments

Comments
 (0)