Skip to content

Commit adb4a98

Browse files
authored
feat(shard-distributor): add spectator client for read-only shard state monitoring (#7438)
**What changed?** - Added spectator client for shard distributor service - Spectator provides read-only access to shard ownership state via streaming updates - Includes local caching with RPC fallback for cache misses - Refactored executor and spectator clients to use `common/backoff.JitDuration()` for consistent retry jitter **Why?** - Enables services to monitor shard ownership without participating in shard distribution - Reduces load on shard distributor by caching state locally - Provides consistent retry behavior across shard distributor clients **How did you test it?** Unit tests - checked for flakiness **Potential risks** None **Release notes** None **Documentation Changes** None required --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent c6959bf commit adb4a98

File tree

7 files changed

+681
-11
lines changed

7 files changed

+681
-11
lines changed

service/sharddistributor/client/executorclient/clientimpl.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,14 @@ import (
44
"context"
55
"errors"
66
"fmt"
7-
"math/rand"
87
"sync"
98
"sync/atomic"
109
"time"
1110

1211
"github.com/uber-go/tally"
1312

1413
"github.com/uber/cadence/client/sharddistributorexecutor"
14+
"github.com/uber/cadence/common/backoff"
1515
"github.com/uber/cadence/common/clock"
1616
"github.com/uber/cadence/common/log"
1717
"github.com/uber/cadence/common/log/tag"
@@ -34,7 +34,7 @@ const (
3434
)
3535

3636
const (
37-
heartbeatJitterMax = 100 * time.Millisecond
37+
heartbeatJitterCoeff = 0.1 // 10% jitter
3838
)
3939

4040
type managedProcessor[SP ShardProcessor] struct {
@@ -185,7 +185,7 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
185185
return
186186
}
187187

188-
heartBeatTimer := e.timeSource.NewTimer(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
188+
heartBeatTimer := e.timeSource.NewTimer(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
189189
defer heartBeatTimer.Stop()
190190

191191
for {
@@ -199,7 +199,7 @@ func (e *executorImpl[SP]) heartbeatloop(ctx context.Context) {
199199
e.stopShardProcessors()
200200
return
201201
case <-heartBeatTimer.Chan():
202-
heartBeatTimer.Reset(getJitteredHeartbeatDuration(e.heartBeatInterval, heartbeatJitterMax))
202+
heartBeatTimer.Reset(backoff.JitDuration(e.heartBeatInterval, heartbeatJitterCoeff))
203203
shardAssignment, err := e.heartbeatAndHandleMigrationMode(ctx)
204204
if errors.Is(err, ErrLocalPassthroughMode) {
205205
e.logger.Info("local passthrough mode: stopping heartbeat loop")
@@ -462,13 +462,6 @@ func (e *executorImpl[SP]) emitMetricsConvergence(converged bool) {
462462
}
463463
}
464464

465-
func getJitteredHeartbeatDuration(interval time.Duration, jitterMax time.Duration) time.Duration {
466-
jitterMaxNanos := int64(jitterMax)
467-
randomJitterNanos := rand.Int63n(jitterMaxNanos)
468-
jitter := time.Duration(randomJitterNanos)
469-
return interval - jitter
470-
}
471-
472465
func (e *executorImpl[SP]) SetMetadata(metadata map[string]string) {
473466
e.metadata.Set(metadata)
474467
}
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
package spectatorclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/uber-go/tally"
8+
"go.uber.org/fx"
9+
10+
sharddistributorv1 "github.com/uber/cadence/.gen/proto/sharddistributor/v1"
11+
"github.com/uber/cadence/client/sharddistributor"
12+
"github.com/uber/cadence/client/wrappers/grpc"
13+
"github.com/uber/cadence/client/wrappers/retryable"
14+
timeoutwrapper "github.com/uber/cadence/client/wrappers/timeout"
15+
"github.com/uber/cadence/common"
16+
"github.com/uber/cadence/common/clock"
17+
"github.com/uber/cadence/common/log"
18+
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
19+
)
20+
21+
//go:generate mockgen -package $GOPACKAGE -source $GOFILE -destination interface_mock.go . Spectator
22+
23+
// Spectator is a read-only view of shard ownership within a namespace.
type Spectator interface {
	// Start begins observing the namespace state.
	Start(ctx context.Context) error

	// Stop ends observation of the namespace state.
	Stop()

	// GetShardOwner resolves the owner of shardKey. The locally cached state
	// is consulted first; on a cache miss the shard distributor is queried
	// directly.
	GetShardOwner(ctx context.Context, shardKey string) (string, error)
}
31+
32+
type Params struct {
33+
fx.In
34+
35+
YarpcClient sharddistributorv1.ShardDistributorAPIYARPCClient
36+
MetricsScope tally.Scope
37+
Logger log.Logger
38+
Config clientcommon.Config
39+
TimeSource clock.TimeSource
40+
}
41+
42+
// NewSpectatorWithNamespace creates a spectator for a specific namespace
43+
func NewSpectatorWithNamespace(params Params, namespace string) (Spectator, error) {
44+
return newSpectatorImpl(params, namespace)
45+
}
46+
47+
// NewSpectator creates a spectator for the single namespace in config
48+
func NewSpectator(params Params) (Spectator, error) {
49+
cfg, err := params.Config.GetSingleConfig()
50+
if err != nil {
51+
return nil, err
52+
}
53+
return newSpectatorImpl(params, cfg.Namespace)
54+
}
55+
56+
func newSpectatorImpl(params Params, namespace string) (Spectator, error) {
57+
// Get config for the specified namespace
58+
namespaceConfig, err := params.Config.GetConfigForNamespace(namespace)
59+
if err != nil {
60+
return nil, fmt.Errorf("get config for namespace %s: %w", namespace, err)
61+
}
62+
63+
return newSpectatorWithConfig(params, namespaceConfig)
64+
}
65+
66+
func newSpectatorWithConfig(params Params, namespaceConfig *clientcommon.NamespaceConfig) (Spectator, error) {
67+
// Create the wrapped shard distributor client
68+
shardDistributorClient, err := createShardDistributorClient(params.YarpcClient, params.MetricsScope)
69+
if err != nil {
70+
return nil, fmt.Errorf("create shard distributor client: %w", err)
71+
}
72+
73+
impl := &spectatorImpl{
74+
namespace: namespaceConfig.Namespace,
75+
config: *namespaceConfig,
76+
client: shardDistributorClient,
77+
logger: params.Logger,
78+
scope: params.MetricsScope,
79+
timeSource: params.TimeSource,
80+
}
81+
// Set WaitGroup to 1 to block until first state is received
82+
impl.firstStateWG.Add(1)
83+
84+
return impl, nil
85+
}
86+
87+
func createShardDistributorClient(yarpcClient sharddistributorv1.ShardDistributorAPIYARPCClient, metricsScope tally.Scope) (sharddistributor.Client, error) {
88+
// Wrap the YARPC client with GRPC wrapper
89+
client := grpc.NewShardDistributorClient(yarpcClient)
90+
91+
// Add timeout wrapper
92+
client = timeoutwrapper.NewShardDistributorClient(client, timeoutwrapper.ShardDistributorDefaultTimeout)
93+
94+
// Add metered wrapper
95+
if metricsScope != nil {
96+
client = NewMeteredShardDistributorClient(client, metricsScope)
97+
}
98+
99+
// Add retry wrapper
100+
client = retryable.NewShardDistributorClient(
101+
client,
102+
common.CreateShardDistributorServiceRetryPolicy(),
103+
common.IsServiceTransientError,
104+
)
105+
106+
return client, nil
107+
}
108+
109+
// Module creates a spectator module using auto-selection (single namespace only)
110+
func Module() fx.Option {
111+
return fx.Module("shard-distributor-spectator-client",
112+
fx.Provide(NewSpectator),
113+
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
114+
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
115+
}),
116+
)
117+
}
118+
119+
// ModuleWithNamespace creates a spectator module for a specific namespace
120+
func ModuleWithNamespace(namespace string) fx.Option {
121+
return fx.Module(fmt.Sprintf("shard-distributor-spectator-client-%s", namespace),
122+
fx.Provide(func(params Params) (Spectator, error) {
123+
return NewSpectatorWithNamespace(params, namespace)
124+
}),
125+
fx.Invoke(func(spectator Spectator, lc fx.Lifecycle) {
126+
lc.Append(fx.StartStopHook(spectator.Start, spectator.Stop))
127+
}),
128+
)
129+
}
Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
package spectatorclient
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"sync"
7+
"time"
8+
9+
"github.com/uber-go/tally"
10+
11+
"github.com/uber/cadence/client/sharddistributor"
12+
"github.com/uber/cadence/common/backoff"
13+
"github.com/uber/cadence/common/clock"
14+
"github.com/uber/cadence/common/log"
15+
"github.com/uber/cadence/common/log/tag"
16+
"github.com/uber/cadence/common/types"
17+
"github.com/uber/cadence/service/sharddistributor/client/clientcommon"
18+
)
19+
20+
const (
	// streamRetryInterval is the base delay before retrying stream creation
	// after a failure.
	streamRetryInterval = 1 * time.Second
	// streamRetryJitterCoeff is the jitter coefficient passed to
	// backoff.JitDuration: 10% jitter (900ms - 1100ms).
	streamRetryJitterCoeff = 0.1
)
24+
25+
// ShardOwner describes the executor that currently owns a shard.
type ShardOwner struct {
	// ExecutorID identifies the owning executor.
	ExecutorID string
	// Metadata is the key/value metadata reported for that executor.
	Metadata map[string]string
}
30+
31+
type spectatorImpl struct {
32+
namespace string
33+
config clientcommon.NamespaceConfig
34+
client sharddistributor.Client
35+
scope tally.Scope
36+
logger log.Logger
37+
timeSource clock.TimeSource
38+
39+
ctx context.Context
40+
cancel context.CancelFunc
41+
stopWG sync.WaitGroup
42+
43+
// State storage with lock for thread-safe access
44+
// Map from shard ID to shard owner (executor ID + metadata)
45+
stateMu sync.RWMutex
46+
shardToOwner map[string]*ShardOwner
47+
48+
// WaitGroup to ensure first state is received before allowing queries
49+
firstStateWG sync.WaitGroup
50+
}
51+
52+
func (s *spectatorImpl) Start(ctx context.Context) error {
53+
// Create a cancellable context for the lifetime of the spectator
54+
// Use context.WithoutCancel to inherit values but not cancellation from fx lifecycle ctx
55+
s.ctx, s.cancel = context.WithCancel(context.WithoutCancel(ctx))
56+
57+
s.stopWG.Add(1)
58+
go func() {
59+
defer s.stopWG.Done()
60+
s.watchLoop()
61+
}()
62+
63+
return nil
64+
}
65+
66+
func (s *spectatorImpl) Stop() {
67+
if s.cancel != nil {
68+
s.cancel()
69+
}
70+
s.stopWG.Wait()
71+
}
72+
73+
func (s *spectatorImpl) watchLoop() {
74+
s.logger.Info("Starting watch loop for namespace", tag.ShardNamespace(s.namespace))
75+
76+
for {
77+
if s.ctx.Err() != nil {
78+
s.logger.Info("Shutting down, stopping watch loop", tag.ShardNamespace(s.namespace))
79+
return
80+
}
81+
82+
// Create new stream
83+
stream, err := s.client.WatchNamespaceState(s.ctx, &types.WatchNamespaceStateRequest{
84+
Namespace: s.namespace,
85+
})
86+
if err != nil {
87+
if s.ctx.Err() != nil {
88+
s.logger.Info("Shutting down during stream creation, exiting watch loop", tag.ShardNamespace(s.namespace))
89+
return
90+
}
91+
92+
s.logger.Error("Failed to create stream, retrying", tag.Error(err), tag.ShardNamespace(s.namespace))
93+
s.timeSource.Sleep(backoff.JitDuration(streamRetryInterval, streamRetryJitterCoeff))
94+
continue
95+
}
96+
97+
s.receiveLoop(stream)
98+
99+
if s.ctx.Err() != nil {
100+
s.logger.Info("Shutting down, exiting watch loop", tag.ShardNamespace(s.namespace))
101+
return
102+
}
103+
104+
// Server shutdown or network issue - recreate stream (load balancer will route to new server)
105+
s.logger.Info("Stream ended, reconnecting", tag.ShardNamespace(s.namespace))
106+
}
107+
}
108+
109+
func (s *spectatorImpl) receiveLoop(stream sharddistributor.WatchNamespaceStateClient) {
110+
defer func() {
111+
if err := stream.CloseSend(); err != nil {
112+
s.logger.Warn("Failed to close stream", tag.Error(err), tag.ShardNamespace(s.namespace))
113+
}
114+
}()
115+
116+
for {
117+
response, err := stream.Recv()
118+
if err != nil {
119+
if s.ctx.Err() != nil {
120+
// Client shutdown - Recv() unblocked due to context cancellation
121+
s.logger.Info("Recv interrupted by client shutdown", tag.ShardNamespace(s.namespace))
122+
} else {
123+
// Server error - io.EOF, network error, server shutdown, etc.
124+
s.logger.Warn("Stream error (server issue), will reconnect", tag.Error(err), tag.ShardNamespace(s.namespace))
125+
}
126+
return // Exit receiveLoop, watchLoop will handle reconnection or shutdown
127+
}
128+
129+
// Process the response
130+
s.handleResponse(response)
131+
}
132+
}
133+
134+
func (s *spectatorImpl) handleResponse(response *types.WatchNamespaceStateResponse) {
135+
// Build inverted map: shard ID -> shard owner (executor ID + metadata)
136+
shardToOwner := make(map[string]*ShardOwner)
137+
for _, executor := range response.Executors {
138+
owner := &ShardOwner{
139+
ExecutorID: executor.ExecutorID,
140+
Metadata: executor.Metadata,
141+
}
142+
for _, shard := range executor.AssignedShards {
143+
shardToOwner[shard.ShardKey] = owner
144+
}
145+
}
146+
147+
// Check if this is the first state we're receiving
148+
isFirstState := false
149+
s.stateMu.Lock()
150+
if s.shardToOwner == nil {
151+
isFirstState = true
152+
}
153+
s.shardToOwner = shardToOwner
154+
s.stateMu.Unlock()
155+
156+
// Signal that first state has been received
157+
if isFirstState {
158+
s.firstStateWG.Done()
159+
}
160+
161+
s.logger.Debug("Received namespace state update",
162+
tag.ShardNamespace(s.namespace),
163+
tag.Counter(len(response.Executors)))
164+
}
165+
166+
// GetShardOwner returns the executor ID for a given shard.
167+
// It first waits for the initial state to be received, then checks the cache.
168+
// If not found in cache, it falls back to querying the shard distributor directly.
169+
func (s *spectatorImpl) GetShardOwner(ctx context.Context, shardKey string) (string, error) {
170+
// Wait for first state to be received to avoid flooding shard distributor on startup
171+
s.firstStateWG.Wait()
172+
173+
// Check cache first
174+
s.stateMu.RLock()
175+
owner := s.shardToOwner[shardKey]
176+
s.stateMu.RUnlock()
177+
178+
if owner != nil {
179+
return owner.ExecutorID, nil
180+
}
181+
182+
// Cache miss - fall back to RPC call
183+
s.logger.Debug("Shard not found in cache, querying shard distributor",
184+
tag.ShardKey(shardKey),
185+
tag.ShardNamespace(s.namespace))
186+
187+
response, err := s.client.GetShardOwner(ctx, &types.GetShardOwnerRequest{
188+
Namespace: s.namespace,
189+
ShardKey: shardKey,
190+
})
191+
if err != nil {
192+
return "", fmt.Errorf("get shard owner from shard distributor: %w", err)
193+
}
194+
195+
return response.Owner, nil
196+
}

0 commit comments

Comments
 (0)