Skip to content

Commit 8cb60cb

Browse files
authored
chore(shard-distributor): classify errors (#7466)
1 parent adb4a98 commit 8cb60cb

File tree

4 files changed

+42
-16
lines changed

4 files changed

+42
-16
lines changed

service/sharddistributor/handler/executor.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -50,19 +50,18 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
5050
previousHeartbeat, assignedShards, err := h.storage.GetHeartbeat(ctx, request.Namespace, request.ExecutorID)
5151
// We ignore executor-not-found errors, since they just mean that this is the executor's first heartbeat.
5252
if err != nil && !errors.Is(err, store.ErrExecutorNotFound) {
53-
return nil, fmt.Errorf("get heartbeat: %w", err)
53+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get heartbeat: %v", err)}
5454
}
5555

5656
now := h.timeSource.Now().UTC()
5757
mode := h.migrationConfiguration.GetMigrationMode(request.Namespace)
5858

5959
switch mode {
6060
case types.MigrationModeINVALID:
61-
h.logger.Warn("Migration mode is invalid", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
62-
return nil, fmt.Errorf("migration mode is invalid")
61+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("namespace's migration mode is invalid: %v", err)}
6362
case types.MigrationModeLOCALPASSTHROUGH:
6463
h.logger.Warn("Migration mode is local passthrough, no calls to heartbeat allowed", tag.ShardNamespace(request.Namespace), tag.ShardExecutor(request.ExecutorID))
65-
return nil, fmt.Errorf("migration mode is local passthrough")
64+
return nil, &types.BadRequestError{Message: "migration mode is local passthrough, no calls to heartbeat allowed"}
6665
// From the shard distributor's (SD) perspective the behaviour is the same for both of these modes
6766
case types.MigrationModeLOCALPASSTHROUGHSHADOW, types.MigrationModeDISTRIBUTEDPASSTHROUGH:
6867
assignedShards, err = h.assignShardsInCurrentHeartbeat(ctx, request)
@@ -88,12 +87,12 @@ func (h *executor) Heartbeat(ctx context.Context, request *types.ExecutorHeartbe
8887
}
8988

9089
if err := validateMetadata(newHeartbeat.Metadata); err != nil {
91-
return nil, fmt.Errorf("validate metadata: %w", err)
90+
return nil, types.BadRequestError{Message: fmt.Sprintf("invalid metadata: %s", err)}
9291
}
9392

9493
err = h.storage.RecordHeartbeat(ctx, request.Namespace, request.ExecutorID, newHeartbeat)
9594
if err != nil {
96-
return nil, fmt.Errorf("record heartbeat: %w", err)
95+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to record heartbeat: %v", err)}
9796
}
9897

9998
return _convertResponse(assignedShards, mode), nil
@@ -108,7 +107,7 @@ func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *
108107
}
109108
err := h.storage.DeleteExecutors(ctx, request.GetNamespace(), []string{request.GetExecutorID()}, store.NopGuard())
110109
if err != nil {
111-
return nil, fmt.Errorf("delete executors: %w", err)
110+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to delete assigned shards: %v", err)}
112111
}
113112
for shard := range request.GetShardStatusReports() {
114113
assignedShards.AssignedShards[shard] = &types.ShardAssignment{
@@ -124,7 +123,7 @@ func (h *executor) assignShardsInCurrentHeartbeat(ctx context.Context, request *
124123
}
125124
err = h.storage.AssignShards(ctx, request.GetNamespace(), assignShardsRequest, store.NopGuard())
126125
if err != nil {
127-
return nil, fmt.Errorf("assign shards in current heartbeat: %w", err)
126+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to assign shards in the current heartbeat: %v", err)}
128127
}
129128
return &assignedShards, nil
130129
}

service/sharddistributor/handler/executor_test.go

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -395,8 +395,35 @@ func TestHeartbeat(t *testing.T) {
395395

396396
_, err := handler.Heartbeat(ctx, req)
397397
require.Error(t, err)
398-
require.Contains(t, err.Error(), "validate metadata")
399-
require.Contains(t, err.Error(), "exceeds the maximum")
398+
require.Contains(t, err.Error(), "invalid metadata: metadata has 33 keys, which exceeds the maximum of 32")
399+
})
400+
401+
// Test Case: Heartbeat with executor associated with MigrationModeLOCALPASSTHROUGH (should error)
402+
t.Run("MigrationModeLOCALPASSTHROUGH", func(t *testing.T) {
403+
ctrl := gomock.NewController(t)
404+
mockStore := store.NewMockStore(ctrl)
405+
mockTimeSource := clock.NewMockedTimeSource()
406+
shardDistributionCfg := config.ShardDistribution{
407+
Namespaces: []config.Namespace{{Name: namespace, Mode: config.MigrationModeLOCALPASSTHROUGH}},
408+
}
409+
migrationConfig := newMigrationConfig(t, []configEntry{{dynamicproperties.MigrationMode, config.MigrationModeLOCALPASSTHROUGH}})
410+
handler := NewExecutorHandler(testlogger.New(t), mockStore, mockTimeSource, shardDistributionCfg, migrationConfig)
411+
412+
req := &types.ExecutorHeartbeatRequest{
413+
Namespace: namespace,
414+
ExecutorID: executorID,
415+
Status: types.ExecutorStatusACTIVE,
416+
}
417+
previousHeartbeat := store.HeartbeatState{
418+
LastHeartbeat: now.Unix(),
419+
Status: types.ExecutorStatusACTIVE,
420+
}
421+
422+
mockStore.EXPECT().GetHeartbeat(gomock.Any(), namespace, executorID).Return(&previousHeartbeat, nil, nil)
423+
424+
_, err := handler.Heartbeat(ctx, req)
425+
require.Error(t, err)
426+
require.Contains(t, err.Error(), "migration mode is local passthrough, no calls to heartbeat allowed")
400427
})
401428
}
402429

service/sharddistributor/handler/handler.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func (h *handlerImpl) GetShardOwner(ctx context.Context, request *types.GetShard
9999
}
100100
}
101101
if err != nil {
102-
return nil, fmt.Errorf("get shard owner: %w", err)
102+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get shard owner: %v", err)}
103103
}
104104

105105
resp = &types.GetShardOwnerResponse{
@@ -116,7 +116,7 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string
116116
// Get the current state of the namespace and find the executor with the least assigned shards
117117
state, err := h.storage.GetState(ctx, namespace)
118118
if err != nil {
119-
return nil, fmt.Errorf("get state: %w", err)
119+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to get namespace state: %v", err)}
120120
}
121121

122122
var executor string
@@ -132,7 +132,7 @@ func (h *handlerImpl) assignEphemeralShard(ctx context.Context, namespace string
132132
// Assign the shard to the executor with the least assigned shards
133133
err = h.storage.AssignShard(ctx, namespace, shardID, executor)
134134
if err != nil {
135-
return nil, fmt.Errorf("assign ephemeral shard: %w", err)
135+
return nil, &types.InternalServiceError{Message: fmt.Sprintf("failed to assign ephemeral shard: %v", err)}
136136
}
137137

138138
return &types.GetShardOwnerResponse{
@@ -148,13 +148,13 @@ func (h *handlerImpl) WatchNamespaceState(request *types.WatchNamespaceStateRequ
148148
assignmentChangesChan, unSubscribe, err := h.storage.SubscribeToAssignmentChanges(server.Context(), request.Namespace)
149149
defer unSubscribe()
150150
if err != nil {
151-
return fmt.Errorf("subscribe to namespace state: %w", err)
151+
return &types.InternalServiceError{Message: fmt.Sprintf("failed to subscribe to namespace state: %v", err)}
152152
}
153153

154154
// Send initial state immediately so client doesn't have to wait for first update
155155
state, err := h.storage.GetState(server.Context(), request.Namespace)
156156
if err != nil {
157-
return fmt.Errorf("get initial state: %w", err)
157+
return &types.InternalServiceError{Message: fmt.Sprintf("failed to get namespace state: %v", err)}
158158
}
159159
response := toWatchNamespaceStateResponse(state)
160160
if err := server.Send(response); err != nil {

service/sharddistributor/handler/handler_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -293,6 +293,6 @@ func TestWatchNamespaceState(t *testing.T) {
293293

294294
err := handler.WatchNamespaceState(&types.WatchNamespaceStateRequest{Namespace: "test-ns"}, mockServer)
295295
require.Error(t, err)
296-
require.Contains(t, err.Error(), "get initial state")
296+
require.Contains(t, err.Error(), "failed to get namespace state: storage error")
297297
})
298298
}

0 commit comments

Comments
 (0)