Skip to content

Commit 1dd074e

Browse files
authored
feat(shard-distributor): add streaming WatchNamespaceState endpoint (#7426)
**What changed?** Added a new streaming gRPC endpoint `WatchNamespaceState` to the shard distributor service that allows spectators to observe namespace state changes in real-time. - Added `WatchNamespaceState` RPC to shard distributor proto with streaming response - Implemented types and mappers for `WatchNamespaceStateRequest`, `WatchNamespaceStateResponse`, and `ExecutorShardAssignment` - Updated grpc.tmpl template to support server-side streaming endpoints - Updated metered.tmpl to dynamically detect request parameter for namespace tagging - Added metrics scope for `WatchNamespaceState` operation - Handler implementation returns "not implemented" (stub for future work) **Why?** To enable real-time monitoring of shard distribution state across executors without polling, providing better observability for debugging and operational awareness. **How did you test it?** Unit tests **Potential risks** Low risk - the endpoint is not yet implemented (returns error), only the API surface and supporting infrastructure are added. **Release notes** - Added experimental streaming endpoint `WatchNamespaceState` to shard distributor (not yet implemented) **Documentation Changes** None required at this stage. --------- Signed-off-by: Jakob Haahr Taankvist <[email protected]>
1 parent b9d79ed commit 1dd074e

File tree

15 files changed

+1640
-229
lines changed

15 files changed

+1640
-229
lines changed

.gen/proto/sharddistributor/v1/service.pb.go

Lines changed: 1173 additions & 203 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

.gen/proto/sharddistributor/v1/service.pb.yarpc.go

Lines changed: 129 additions & 25 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

common/metrics/defs.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1468,6 +1468,7 @@ const (
14681468
const (
14691469
// ShardDistributorGetShardOwnerScope tracks GetShardOwner API calls received by service
14701470
ShardDistributorGetShardOwnerScope = iota + NumWorkerScopes
1471+
ShardDistributorWatchNamespaceStateScope
14711472
ShardDistributorHeartbeatScope
14721473
ShardDistributorAssignLoopScope
14731474

@@ -2153,6 +2154,7 @@ var ScopeDefs = map[ServiceIdx]map[ScopeIdx]scopeDefinition{
21532154
},
21542155
ShardDistributor: {
21552156
ShardDistributorGetShardOwnerScope: {operation: "GetShardOwner"},
2157+
ShardDistributorWatchNamespaceStateScope: {operation: "WatchNamespaceState"},
21562158
ShardDistributorHeartbeatScope: {operation: "ExecutorHeartbeat"},
21572159
ShardDistributorAssignLoopScope: {operation: "ShardAssignLoop"},
21582160
ShardDistributorExecutorScope: {operation: "Executor"},

common/types/mapper/proto/sharddistributor.go

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,3 +279,82 @@ func toMigrationMode(modeSD types.MigrationMode) sharddistributorv1.MigrationMod
279279
}
280280
return mode
281281
}
282+
283+
// FromShardDistributorWatchNamespaceStateRequest converts a types.WatchNamespaceStateRequest to a sharddistributor.WatchNamespaceStateRequest
284+
func FromShardDistributorWatchNamespaceStateRequest(t *types.WatchNamespaceStateRequest) *sharddistributorv1.WatchNamespaceStateRequest {
285+
if t == nil {
286+
return nil
287+
}
288+
return &sharddistributorv1.WatchNamespaceStateRequest{
289+
Namespace: t.GetNamespace(),
290+
}
291+
}
292+
293+
// ToShardDistributorWatchNamespaceStateRequest converts a sharddistributor.WatchNamespaceStateRequest to a types.WatchNamespaceStateRequest
294+
func ToShardDistributorWatchNamespaceStateRequest(t *sharddistributorv1.WatchNamespaceStateRequest) *types.WatchNamespaceStateRequest {
295+
if t == nil {
296+
return nil
297+
}
298+
return &types.WatchNamespaceStateRequest{
299+
Namespace: t.GetNamespace(),
300+
}
301+
}
302+
303+
// FromShardDistributorWatchNamespaceStateResponse converts a types.WatchNamespaceStateResponse to a sharddistributor.WatchNamespaceStateResponse
304+
func FromShardDistributorWatchNamespaceStateResponse(t *types.WatchNamespaceStateResponse) *sharddistributorv1.WatchNamespaceStateResponse {
305+
if t == nil {
306+
return nil
307+
}
308+
309+
var executors []*sharddistributorv1.ExecutorInfo
310+
311+
for _, executor := range t.GetExecutors() {
312+
// Convert the Shards
313+
shards := make([]*sharddistributorv1.Shard, 0, len(executor.GetAssignedShards()))
314+
for _, shard := range executor.GetAssignedShards() {
315+
shards = append(shards, &sharddistributorv1.Shard{
316+
ShardKey: shard.GetShardKey(),
317+
})
318+
}
319+
executors = append(executors, &sharddistributorv1.ExecutorInfo{
320+
ExecutorId: executor.GetExecutorID(),
321+
Metadata: executor.GetMetadata(),
322+
Shards: shards,
323+
})
324+
}
325+
326+
return &sharddistributorv1.WatchNamespaceStateResponse{
327+
Executors: executors,
328+
}
329+
}
330+
331+
// ToShardDistributorWatchNamespaceStateResponse converts a sharddistributor.WatchNamespaceStateResponse to a types.WatchNamespaceStateResponse
332+
func ToShardDistributorWatchNamespaceStateResponse(t *sharddistributorv1.WatchNamespaceStateResponse) *types.WatchNamespaceStateResponse {
333+
if t == nil {
334+
return nil
335+
}
336+
337+
var executors []*types.ExecutorShardAssignment
338+
if t.GetExecutors() != nil {
339+
executors = make([]*types.ExecutorShardAssignment, 0, len(t.GetExecutors()))
340+
for _, executor := range t.GetExecutors() {
341+
// Convert the Shards
342+
shards := make([]*types.Shard, 0, len(executor.GetShards()))
343+
for _, shard := range executor.GetShards() {
344+
shards = append(shards, &types.Shard{
345+
ShardKey: shard.GetShardKey(),
346+
})
347+
}
348+
349+
executors = append(executors, &types.ExecutorShardAssignment{
350+
ExecutorID: executor.GetExecutorId(),
351+
Metadata: executor.GetMetadata(),
352+
AssignedShards: shards,
353+
})
354+
}
355+
}
356+
357+
return &types.WatchNamespaceStateResponse{
358+
Executors: executors,
359+
}
360+
}

common/types/mapper/proto/sharddistributor_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,3 +54,15 @@ func TestToShardDistributorExecutorHeartbeatResponse(t *testing.T) {
5454
assert.Equal(t, item, ToShardDistributorExecutorHeartbeatResponse(FromShardDistributorExecutorHeartbeatResponse(item)))
5555
}
5656
}
57+
58+
func TestFromShardDistributorWatchNamespaceStateRequest(t *testing.T) {
59+
for _, item := range []*types.WatchNamespaceStateRequest{nil, {}, &testdata.ShardDistributorWatchNamespaceStateRequest} {
60+
assert.Equal(t, item, ToShardDistributorWatchNamespaceStateRequest(FromShardDistributorWatchNamespaceStateRequest(item)))
61+
}
62+
}
63+
64+
func TestFromShardDistributorWatchNamespaceStateResponse(t *testing.T) {
65+
for _, item := range []*types.WatchNamespaceStateResponse{nil, {}, &testdata.ShardDistributorWatchNamespaceStateResponse} {
66+
assert.Equal(t, item, ToShardDistributorWatchNamespaceStateResponse(FromShardDistributorWatchNamespaceStateResponse(item)))
67+
}
68+
}

common/types/sharddistributor.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,3 +226,63 @@ const (
226226
MigrationModeDISTRIBUTEDPASSTHROUGH MigrationMode = 3
227227
MigrationModeONBOARDED MigrationMode = 4
228228
)
229+
230+
type WatchNamespaceStateRequest struct {
231+
Namespace string
232+
}
233+
234+
func (v *WatchNamespaceStateRequest) GetNamespace() (o string) {
235+
if v != nil {
236+
return v.Namespace
237+
}
238+
return
239+
}
240+
241+
type WatchNamespaceStateResponse struct {
242+
Executors []*ExecutorShardAssignment
243+
}
244+
245+
func (v *WatchNamespaceStateResponse) GetExecutors() (o []*ExecutorShardAssignment) {
246+
if v != nil {
247+
return v.Executors
248+
}
249+
return
250+
}
251+
252+
type ExecutorShardAssignment struct {
253+
ExecutorID string
254+
AssignedShards []*Shard
255+
Metadata map[string]string
256+
}
257+
258+
func (v *ExecutorShardAssignment) GetExecutorID() (o string) {
259+
if v != nil {
260+
return v.ExecutorID
261+
}
262+
return
263+
}
264+
265+
func (v *ExecutorShardAssignment) GetAssignedShards() (o []*Shard) {
266+
if v != nil {
267+
return v.AssignedShards
268+
}
269+
return
270+
}
271+
272+
func (v *ExecutorShardAssignment) GetMetadata() (o map[string]string) {
273+
if v != nil {
274+
return v.Metadata
275+
}
276+
return
277+
}
278+
279+
type Shard struct {
280+
ShardKey string
281+
}
282+
283+
func (v *Shard) GetShardKey() (o string) {
284+
if v != nil {
285+
return v.ShardKey
286+
}
287+
return
288+
}

0 commit comments

Comments
 (0)