Skip to content

Commit 57f0d8d

Browse files
authored
feat(shard-distributor): implement WatchNamespaceState streaming RPC (#7432)
**What changed?** Implemented the `WatchNamespaceState` streaming RPC endpoint for the shard distributor service, including a pub/sub mechanism for real-time assignment change notifications. **Why?** The `WatchNamespaceState` endpoint was previously unimplemented. This enables executors and spectators to receive real-time updates about shard assignment changes without polling, improving responsiveness and reducing load on the storage layer. **How did you test it?** Added unit tests for the handler's streaming behavior and the pub/sub mechanism. **Potential risks** Low - this is a new feature in an experimental service. The pub/sub implementation includes non-blocking publish to prevent slow subscribers from blocking the system. **Release notes** N/A - shard distributor is experimental **Documentation Changes** None required --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent 1dd074e commit 57f0d8d

File tree

11 files changed

+397
-15
lines changed

11 files changed

+397
-15
lines changed

common/metrics/defs.go

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1481,6 +1481,7 @@ const (
14811481
ShardDistributorStoreGetStateScope
14821482
ShardDistributorStoreRecordHeartbeatScope
14831483
ShardDistributorStoreSubscribeScope
1484+
ShardDistributorStoreSubscribeToAssignmentChangesScope
14841485

14851486
// The scope for the shard distributor executor
14861487
ShardDistributorExecutorScope
@@ -2153,20 +2154,21 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
21532154
DiagnosticsWorkflowScope: {operation: "DiagnosticsWorkflow"},
21542155
},
21552156
ShardDistributor: {
2156-
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
2157-
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
2158-
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
2159-
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
2160-
ShardDistributorExecutorScope: {operation: "Executor"},
2161-
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
2162-
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
2163-
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
2164-
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
2165-
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
2166-
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
2167-
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
2168-
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
2169-
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
2157+
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
2158+
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
2159+
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
2160+
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
2161+
ShardDistributorExecutorScope: {operation: "Executor"},
2162+
ShardDistributorStoreGetShardOwnerScope: {operation: "StoreGetShardOwner"},
2163+
ShardDistributorStoreAssignShardScope: {operation: "StoreAssignShard"},
2164+
ShardDistributorStoreAssignShardsScope: {operation: "StoreAssignShards"},
2165+
ShardDistributorStoreDeleteExecutorsScope: {operation: "StoreDeleteExecutors"},
2166+
ShardDistributorStoreDeleteShardStatsScope: {operation: "StoreDeleteShardStats"},
2167+
ShardDistributorStoreGetHeartbeatScope: {operation: "StoreGetHeartbeat"},
2168+
ShardDistributorStoreGetStateScope: {operation: "StoreGetState"},
2169+
ShardDistributorStoreRecordHeartbeatScope: {operation: "StoreRecordHeartbeat"},
2170+
ShardDistributorStoreSubscribeScope: {operation: "StoreSubscribe"},
2171+
ShardDistributorStoreSubscribeToAssignmentChangesScope: {operation: "StoreSubscribeToAssignmentChanges"},
21702172
},
21712173
}
21722174

service/sharddistributor/handler/handler.go

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,5 +142,78 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string
142142
}
143143

144144
func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequest, server WatchNamespaceStateServer) error {
145-
return fmt.Errorf("not implemented")
145+
h.startWG.Wait()
146+
147+
// Subscribe to state changes from storage
148+
assignmentChangesChan, unSubscribe, err := h.storage.SubscribeToAssignmentChanges(server.Context(), request.Namespace)
149+
defer unSubscribe()
150+
if err != nil {
151+
return fmt.Errorf("subscribe to namespace state: %w", err)
152+
}
153+
154+
// Send initial state immediately so client doesn't have to wait for first update
155+
state, err := h.storage.GetState(server.Context(), request.Namespace)
156+
if err != nil {
157+
return fmt.Errorf("get initial state: %w", err)
158+
}
159+
response := toWatchNamespaceStateResponse(state)
160+
if err := server.Send(response); err != nil {
161+
return fmt.Errorf("send initial state: %w", err)
162+
}
163+
164+
// Stream subsequent updates
165+
for {
166+
select {
167+
case <-server.Context().Done():
168+
return server.Context().Err()
169+
case assignmentChanges, ok := <-assignmentChangesChan:
170+
if !ok {
171+
return fmt.Errorf("unexpected close of updates channel")
172+
}
173+
response := &types.WatchNamespaceStateResponse{
174+
Executors: make([]*types.ExecutorShardAssignment, 0, len(state.ShardAssignments)),
175+
}
176+
for executor, shardIDs := range assignmentChanges {
177+
response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
178+
ExecutorID: executor.ExecutorID,
179+
AssignedShards: WrapShards(shardIDs),
180+
Metadata: executor.Metadata,
181+
})
182+
}
183+
184+
err = server.Send(response)
185+
if err != nil {
186+
return fmt.Errorf("send response: %w", err)
187+
}
188+
}
189+
}
190+
}
191+
192+
func toWatchNamespaceStateResponse(state *store.NamespaceState) *types.WatchNamespaceStateResponse {
193+
response := &types.WatchNamespaceStateResponse{
194+
Executors: make([]*types.ExecutorShardAssignment, 0, len(state.ShardAssignments)),
195+
}
196+
197+
for executorID, assignment := range state.ShardAssignments {
198+
// Extract shard IDs from the assigned shards map
199+
shardIDs := make([]string, 0, len(assignment.AssignedShards))
200+
for shardID := range assignment.AssignedShards {
201+
shardIDs = append(shardIDs, shardID)
202+
}
203+
204+
response.Executors = append(response.Executors, &types.ExecutorShardAssignment{
205+
ExecutorID: executorID,
206+
AssignedShards: WrapShards(shardIDs),
207+
Metadata: state.Executors[executorID].Metadata,
208+
})
209+
}
210+
return response
211+
}
212+
213+
func WrapShards(shardIDs []string) []*types.Shard {
214+
shards := make([]*types.Shard, 0, len(shardIDs))
215+
for _, shardID := range shardIDs {
216+
shards = append(shards, &types.Shard{ShardKey: shardID})
217+
}
218+
return shards
146219
}

service/sharddistributor/handler/handler_test.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@ package handler
2525
import (
2626
"context"
2727
"errors"
28+
"sync"
2829
"testing"
30+
"time"
2931

3032
"github.com/stretchr/testify/require"
3133
"go.uber.org/mock/gomock"
@@ -215,3 +217,82 @@ func TestGetShardOwner(t *testing.T) {
215217
})
216218
}
217219
}
220+
221+
func TestWatchNamespaceState(t *testing.T) {
222+
ctrl := gomock.NewController(t)
223+
logger := testlogger.New(t)
224+
mockStorage := store.NewMockStore(ctrl)
225+
mockServer := NewMockWatchNamespaceStateServer(ctrl)
226+
227+
cfg := config.ShardDistribution{
228+
Namespaces: []config.Namespace{
229+
{Name: "test-ns", Type: config.NamespaceTypeFixed, ShardNum: 2},
230+
},
231+
}
232+
233+
handler := &handlerImpl{
234+
logger: logger,
235+
shardDistributionCfg: cfg,
236+
storage: mockStorage,
237+
startWG: sync.WaitGroup{},
238+
}
239+
240+
t.Run("successful streaming", func(t *testing.T) {
241+
ctx, cancel := context.WithCancel(context.Background())
242+
243+
initialState := &store.NamespaceState{
244+
ShardAssignments: map[string]store.AssignedState{
245+
"executor-1": {
246+
AssignedShards: map[string]*types.ShardAssignment{
247+
"shard-1": {},
248+
},
249+
},
250+
},
251+
}
252+
253+
updatesChan := make(chan map[*store.ShardOwner][]string, 1)
254+
unsubscribe := func() { close(updatesChan) }
255+
256+
mockServer.EXPECT().Context().Return(ctx).AnyTimes()
257+
mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(initialState, nil)
258+
mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(updatesChan, unsubscribe, nil)
259+
260+
// Expect initial state send
261+
mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
262+
require.Len(t, resp.Executors, 1)
263+
require.Equal(t, "executor-1", resp.Executors[0].ExecutorID)
264+
return nil
265+
})
266+
267+
// Expect update send
268+
mockServer.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *types.WatchNamespaceStateResponse) error {
269+
require.Len(t, resp.Executors, 1)
270+
require.Equal(t, "executor-2", resp.Executors[0].ExecutorID)
271+
return nil
272+
})
273+
274+
// Send update, then cancel
275+
go func() {
276+
time.Sleep(10 * time.Millisecond)
277+
updatesChan <- map[*store.ShardOwner][]string{
278+
{ExecutorID: "executor-2", Metadata: map[string]string{}}: {"shard-2"},
279+
}
280+
cancel()
281+
}()
282+
283+
err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
284+
require.Error(t, err)
285+
require.ErrorIs(t, err, context.Canceled)
286+
})
287+
288+
t.Run("storage error on initial state", func(t *testing.T) {
289+
ctx := context.Background()
290+
mockServer.EXPECT().Context().Return(ctx).AnyTimes()
291+
mockStorage.EXPECT().GetState(gomock.Any(), "test-ns").Return(nil, errors.New("storage error"))
292+
mockStorage.EXPECT().SubscribeToAssignmentChanges(gomock.Any(), "test-ns").Return(make(chan map[*store.ShardOwner][]string), func() {}, nil)
293+
294+
err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
295+
require.Error(t, err)
296+
require.Contains(t, err.Error(), "get initial state")
297+
})
298+
}

service/sharddistributor/store/etcd/executorstore/etcdstore.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -288,6 +288,10 @@ func (s *executorStoreImpl) GetState(ctx context.Context, namespace string) (*st
288288
}, nil
289289
}
290290

291+
func (s *executorStoreImpl) SubscribeToAssignmentChanges(ctx context.Context, namespace string) (<-chan map[*store.ShardOwner][]string, func(), error) {
292+
return s.shardCache.Subscribe(ctx, namespace)
293+
}
294+
291295
func (s *executorStoreImpl) Subscribe(ctx context.Context, namespace string) (<-chan int64, error) {
292296
revisionChan := make(chan int64, 1)
293297
watchPrefix := etcdkeys.BuildExecutorPrefix(s.prefix, namespace)

service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,15 @@ type namespaceShardToExecutor struct {
1919
sync.RWMutex
2020

2121
shardToExecutor map[string]*store.ShardOwner
22+
executorState map[*store.ShardOwner][]string // executor -> shardIDs
2223
executorRevision map[string]int64
2324
namespace string
2425
etcdPrefix string
2526
changeUpdateChannel clientv3.WatchChan
2627
stopCh chan struct{}
2728
logger log.Logger
2829
client *clientv3.Client
30+
pubSub *executorStatePubSub
2931
}
3032

3133
func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.Client, stopCh chan struct{}, logger log.Logger) (*namespaceShardToExecutor, error) {
@@ -35,13 +37,15 @@ func newNamespaceShardToExecutor(etcdPrefix, namespace string, client *clientv3.
3537

3638
return &namespaceShardToExecutor{
3739
shardToExecutor: make(map[string]*store.ShardOwner),
40+
executorState: make(map[*store.ShardOwner][]string),
3841
executorRevision: make(map[string]int64),
3942
namespace: namespace,
4043
etcdPrefix: etcdPrefix,
4144
changeUpdateChannel: watchChan,
4245
stopCh: stopCh,
4346
logger: logger,
4447
client: client,
48+
pubSub: newExecutorStatePubSub(logger, namespace),
4549
}, nil
4650
}
4751

@@ -94,6 +98,10 @@ func (n *namespaceShardToExecutor) GetExecutorModRevisionCmp() ([]clientv3.Cmp,
9498
return comparisons, nil
9599
}
96100

101+
func (n *namespaceShardToExecutor) Subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
102+
return n.pubSub.subscribe(ctx)
103+
}
104+
97105
func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
98106
for {
99107
select {
@@ -124,7 +132,24 @@ func (n *namespaceShardToExecutor) nameSpaceRefreashLoop() {
124132
}
125133

126134
func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
135+
err := n.refreshExecutorState(ctx)
136+
if err != nil {
137+
return fmt.Errorf("refresh executor state: %w", err)
138+
}
139+
140+
n.RLock()
141+
executorState := make(map[*store.ShardOwner][]string)
142+
for executor, shardIDs := range n.executorState {
143+
executorState[executor] = make([]string, len(shardIDs))
144+
copy(executorState[executor], shardIDs)
145+
}
146+
n.RUnlock()
147+
148+
n.pubSub.publish(n.executorState)
149+
return nil
150+
}
127151

152+
func (n *namespaceShardToExecutor) refreshExecutorState(ctx context.Context) error {
128153
executorPrefix := etcdkeys.BuildExecutorPrefix(n.etcdPrefix, n.namespace)
129154

130155
resp, err := n.client.Get(ctx, executorPrefix, clientv3.WithPrefix())
@@ -136,6 +161,7 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
136161
defer n.Unlock()
137162
// Clear the cache, so we don't have any stale data
138163
n.shardToExecutor = make(map[string]*store.ShardOwner)
164+
n.executorState = make(map[*store.ShardOwner][]string)
139165
n.executorRevision = make(map[string]int64)
140166

141167
shardOwners := make(map[string]*store.ShardOwner)
@@ -154,10 +180,15 @@ func (n *namespaceShardToExecutor) refresh(ctx context.Context) error {
154180
if err != nil {
155181
return fmt.Errorf("parse assigned state: %w", err)
156182
}
183+
184+
// Build both shard->executor and executor->shards mappings
185+
shardIDs := make([]string, 0, len(assignedState.AssignedShards))
157186
for shardID := range assignedState.AssignedShards {
158187
n.shardToExecutor[shardID] = shardOwner
188+
shardIDs = append(shardIDs, shardID)
159189
n.executorRevision[executorID] = kv.ModRevision
160190
}
191+
n.executorState[shardOwner] = shardIDs
161192

162193
case etcdkeys.ExecutorMetadataKey:
163194
shardOwner := getOrCreateShardOwner(shardOwners, executorID)
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package shardcache
2+
3+
import (
4+
"context"
5+
"sync"
6+
7+
"github.com/google/uuid"
8+
9+
"github.com/uber/cadence/common/log"
10+
"github.com/uber/cadence/common/log/tag"
11+
"github.com/uber/cadence/service/sharddistributor/store"
12+
)
13+
14+
// executorStatePubSub manages subscriptions to executor state changes
15+
type executorStatePubSub struct {
16+
mu sync.RWMutex
17+
subscribers map[string]chan<- map[*store.ShardOwner][]string
18+
logger log.Logger
19+
namespace string
20+
}
21+
22+
func newExecutorStatePubSub(logger log.Logger, namespace string) *executorStatePubSub {
23+
return &executorStatePubSub{
24+
subscribers: make(map[string]chan<- map[*store.ShardOwner][]string),
25+
logger: logger,
26+
namespace: namespace,
27+
}
28+
}
29+
30+
// Subscribe returns a channel that receives executor state updates.
31+
func (p *executorStatePubSub) subscribe(ctx context.Context) (<-chan map[*store.ShardOwner][]string, func()) {
32+
ch := make(chan map[*store.ShardOwner][]string)
33+
uniqueID := uuid.New().String()
34+
35+
p.mu.Lock()
36+
defer p.mu.Unlock()
37+
p.subscribers[uniqueID] = ch
38+
39+
unSub := func() {
40+
p.unSubscribe(uniqueID)
41+
}
42+
43+
return ch, unSub
44+
}
45+
46+
func (p *executorStatePubSub) unSubscribe(uniqueID string) {
47+
p.mu.Lock()
48+
defer p.mu.Unlock()
49+
delete(p.subscribers, uniqueID)
50+
}
51+
52+
// Publish sends the state to all subscribers (non-blocking)
53+
func (p *executorStatePubSub) publish(state map[*store.ShardOwner][]string) {
54+
p.mu.RLock()
55+
defer p.mu.RUnlock()
56+
57+
for _, sub := range p.subscribers {
58+
select {
59+
case sub <- state:
60+
default:
61+
// Subscriber is not reading fast enough, skip this update
62+
p.logger.Warn("Subscriber not keeping up with state updates, dropping update", tag.ShardNamespace(p.namespace))
63+
}
64+
}
65+
}

0 commit comments

Comments
 (0)