Skip to content

Commit 2ed8d48

Browse files
authored
Merge pull request #117 from xunxun1982/xxdev
feat: 优化429限速错误的健康度惩罚机制
2 parents 762c683 + 970b2c1 commit 2ed8d48

14 files changed

+336
-76
lines changed

internal/centralizedmgmt/hub_service.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -548,7 +548,9 @@ func (s *HubService) calculateAggregateGroupHealthScore(aggregateGroupID uint) f
548548
// calculateAggregateGroupHealthScoreWithVisited calculates health score with cycle detection.
549549
// Uses path-scoped visited set to prevent infinite recursion on circular aggregate group references.
550550
// The visited set is scoped to the current recursion path, allowing shared sub-groups in DAG structures.
551-
func (s *HubService) calculateAggregateGroupHealthScoreWithVisited(aggregateGroupID uint, visited map[uint]struct{}) float64 {
551+
// NOTE: Currently nested aggregates are not supported (validated at sub-group creation time),
552+
// so the visited parameter is not actively used but kept for future extensibility.
553+
func (s *HubService) calculateAggregateGroupHealthScoreWithVisited(aggregateGroupID uint, _ map[uint]struct{}) float64 {
552554
// Get sub-group relationships
553555
var subGroupRels []models.GroupSubGroup
554556
if err := s.db.Where("group_id = ? AND weight > 0", aggregateGroupID).

internal/centralizedmgmt/hub_service_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func createTestSubGroup(t *testing.T, db *gorm.DB, aggregateGroupID, subGroupID
134134
// setupHubService creates a HubService with test dependencies
135135
// Note: GroupManager is set to nil for most tests since HubService queries DB directly.
136136
// For tests that need SelectGroupForModel, we need to set up GroupManager properly.
137-
func setupHubService(t *testing.T, db *gorm.DB) *HubService {
137+
func setupHubService(_ *testing.T, db *gorm.DB) *HubService {
138138
// Create a mock store
139139
mockStore := store.NewMemoryStore()
140140

@@ -150,7 +150,7 @@ func setupHubService(t *testing.T, db *gorm.DB) *HubService {
150150
// This is needed for tests that use SelectGroupForModel
151151
// Note: This requires the global db.DB to be set, which is complex in unit tests.
152152
// For now, we skip tests that require GroupManager if setup fails.
153-
func setupHubServiceWithGroupManager(t *testing.T, db *gorm.DB) *HubService {
153+
func setupHubServiceWithGroupManager(t *testing.T, _ *gorm.DB) *HubService {
154154
t.Skip("Skipping test that requires GroupManager - complex setup with global db.DB")
155155
return nil
156156
}
@@ -2396,7 +2396,7 @@ func TestCalculateGroupHealthScore(t *testing.T) {
23962396
hubService.dynamicWeightManager.RecordGroupSuccess(group.ID)
23972397
}
23982398
for i := 0; i < 3; i++ {
2399-
hubService.dynamicWeightManager.RecordGroupFailure(group.ID)
2399+
hubService.dynamicWeightManager.RecordGroupFailure(group.ID, false)
24002400
}
24012401

24022402
score := hubService.calculateGroupHealthScore(group)
@@ -2413,7 +2413,7 @@ func TestCalculateGroupHealthScore(t *testing.T) {
24132413
t.Run("consecutive_failures_reduce_health", func(t *testing.T) {
24142414
// Record 5 consecutive failures
24152415
for i := 0; i < 5; i++ {
2416-
hubService.dynamicWeightManager.RecordGroupFailure(group.ID)
2416+
hubService.dynamicWeightManager.RecordGroupFailure(group.ID, false)
24172417
}
24182418

24192419
score := hubService.calculateGroupHealthScore(group)
@@ -2458,15 +2458,15 @@ func TestGroupHealthScoreVsSubGroupMetrics(t *testing.T) {
24582458
for i := 0; i < 9; i++ {
24592459
hubService.dynamicWeightManager.RecordGroupSuccess(standardGroup.ID)
24602460
}
2461-
hubService.dynamicWeightManager.RecordGroupFailure(standardGroup.ID)
2461+
hubService.dynamicWeightManager.RecordGroupFailure(standardGroup.ID, false)
24622462

24632463
// Record poor performance in aggregate group (used for aggregate health score)
24642464
// Record 1 success first, then 8 failures to get low success rate + consecutive failures
24652465
for i := 0; i < 1; i++ {
24662466
hubService.dynamicWeightManager.RecordSubGroupSuccess(aggGroup.ID, standardGroup.ID)
24672467
}
24682468
for i := 0; i < 8; i++ {
2469-
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, standardGroup.ID)
2469+
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, standardGroup.ID, false)
24702470
}
24712471

24722472
// Hub health score should reflect group-level metrics (90% success rate)
@@ -2545,7 +2545,7 @@ func TestCalculateAggregateGroupHealthScore(t *testing.T) {
25452545
hubService.dynamicWeightManager.RecordSubGroupSuccess(aggGroup.ID, subGroup2.ID)
25462546
}
25472547
for i := 0; i < 3; i++ {
2548-
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, subGroup2.ID)
2548+
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, subGroup2.ID, false)
25492549
}
25502550

25512551
// Calculate aggregate health score (weighted average of sub-groups)
@@ -2588,7 +2588,7 @@ func TestHealthScoreInModelPool(t *testing.T) {
25882588
hubService.dynamicWeightManager.RecordGroupSuccess(group.ID)
25892589
}
25902590
for i := 0; i < 2; i++ {
2591-
hubService.dynamicWeightManager.RecordGroupFailure(group.ID)
2591+
hubService.dynamicWeightManager.RecordGroupFailure(group.ID, false)
25922592
}
25932593

25942594
// Manually test health score calculation
@@ -2670,7 +2670,7 @@ func TestAggregateHealthWithSkewedUsage(t *testing.T) {
26702670
hubService.dynamicWeightManager.RecordSubGroupSuccess(aggGroup.ID, subGroupB.ID)
26712671
}
26722672
for i := 0; i < 8; i++ {
2673-
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, subGroupB.ID)
2673+
hubService.dynamicWeightManager.RecordSubGroupFailure(aggGroup.ID, subGroupB.ID, false)
26742674
}
26752675

26762676
// Sub-group C: 0 requests (never used)

internal/proxy/codex_cc_support_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,7 @@ func TestCodexCCThinkingBlockWindowsPathConversion(t *testing.T) {
149149
}
150150

151151
// Helper function to convert Codex response map to Claude response
152-
func convertCodexToClaudeResponseFromMap(codexResp map[string]interface{}, reverseToolNameMap map[string]string) *ClaudeResponse {
152+
func convertCodexToClaudeResponseFromMap(codexResp map[string]interface{}, _ map[string]string) *ClaudeResponse {
153153
// This is a simplified version for testing
154154
claudeResp := &ClaudeResponse{
155155
ID: "msg_test",

internal/proxy/gemini_cc_support.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -400,9 +400,10 @@ func convertClaudeMessageToGemini(msg ClaudeMessage, toolNameShortMap map[string
400400
var userParts []GeminiPart
401401
frIndex := 0 // Track which function response to use (matches order of tool_result blocks)
402402
for _, block := range blocks {
403-
if block.Type == "text" {
403+
switch block.Type {
404+
case "text":
404405
userParts = append(userParts, GeminiPart{Text: block.Text})
405-
} else if block.Type == "tool_result" {
406+
case "tool_result":
406407
// Flush accumulated user text before function response
407408
if len(userParts) > 0 {
408409
result = append(result, GeminiContent{
@@ -421,6 +422,8 @@ func convertClaudeMessageToGemini(msg ClaudeMessage, toolNameShortMap map[string
421422
Parts: []GeminiPart{{FunctionResponse: &fr}},
422423
})
423424
}
425+
default:
426+
// Ignore unknown block types
424427
}
425428
}
426429
// Flush any remaining user text

internal/proxy/server.go

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -757,8 +757,10 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
757757
// direct API calls that should preserve their original query parameters.
758758
// Also check /v1beta/messages for Gemini CC conversion (path may be rewritten to /v1beta/messages)
759759
if isCCSupportEnabled(group) && (strings.HasSuffix(c.Request.URL.Path, "/v1/messages") || strings.HasSuffix(c.Request.URL.Path, "/v1beta/messages")) {
760-
// Handle Codex channel CC support (Claude -> Codex/Responses API)
761-
if group.ChannelType == "codex" {
760+
// Handle channel-specific CC support conversions
761+
switch group.ChannelType {
762+
case "codex":
763+
// Handle Codex channel CC support (Claude -> Codex/Responses API)
762764
convertedBody, converted, ccErr := ps.applyCodexCCRequestConversion(c, group, finalBodyBytes)
763765
if ccErr != nil {
764766
logrus.WithError(ccErr).WithFields(logrus.Fields{
@@ -784,7 +786,7 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
784786
"new_path": c.Request.URL.Path,
785787
}).Debug("Codex CC support: converted Claude request to Codex format")
786788
}
787-
} else if group.ChannelType == "gemini" {
789+
case "gemini":
788790
// Handle Gemini channel CC support (Claude -> Gemini API)
789791
convertedBody, converted, ccErr := ps.applyGeminiCCRequestConversion(c, group, finalBodyBytes)
790792
if ccErr != nil {
@@ -809,7 +811,7 @@ func (ps *ProxyServer) HandleProxy(c *gin.Context) {
809811
"new_path": c.Request.URL.Path,
810812
}).Debug("Gemini CC support: converted Claude request to Gemini format")
811813
}
812-
} else {
814+
default:
813815
// Handle OpenAI channel CC support (Claude -> OpenAI Chat Completions)
814816
convertedBody, converted, ccErr := ps.applyCCRequestConversionDirect(c, group, finalBodyBytes)
815817
if ccErr != nil {
@@ -1377,8 +1379,10 @@ func (ps *ProxyServer) executeRequestWithAggregateRetry(
13771379
isMessagesEndpoint := strings.HasSuffix(c.Request.URL.Path, "/v1/messages") ||
13781380
strings.HasSuffix(c.Request.URL.Path, "/v1beta/messages")
13791381
if isCCSupportEnabled(group) && isMessagesEndpoint {
1380-
// Handle Codex channel CC support (Claude -> Codex/Responses API)
1381-
if group.ChannelType == "codex" {
1382+
// Handle channel-specific CC support conversions
1383+
switch group.ChannelType {
1384+
case "codex":
1385+
// Handle Codex channel CC support (Claude -> Codex/Responses API)
13821386
// Sanitize query parameters for Codex CC (remove Claude-specific params like beta=true)
13831387
// This is needed even if path wasn't /claude/ since Anthropic aggregate may send directly to /v1/messages
13841388
sanitizeCCQueryParams(c.Request.URL)
@@ -1440,7 +1444,7 @@ func (ps *ProxyServer) executeRequestWithAggregateRetry(
14401444
"new_path": c.Request.URL.Path,
14411445
}).Debug("Codex CC support: converted Claude request for sub-group")
14421446
}
1443-
} else if group.ChannelType == "gemini" {
1447+
case "gemini":
14441448
// Handle Gemini channel CC support (Claude -> Gemini API)
14451449
sanitizeCCQueryParams(c.Request.URL)
14461450

@@ -1469,7 +1473,7 @@ func (ps *ProxyServer) executeRequestWithAggregateRetry(
14691473
"new_path": c.Request.URL.Path,
14701474
}).Debug("Gemini CC support: converted Claude request for sub-group")
14711475
}
1472-
} else {
1476+
default:
14731477
// Handle OpenAI channel CC support (Claude -> OpenAI Chat Completions)
14741478
convertedBody, converted, ccErr := ps.applyCCRequestConversionDirect(c, group, finalBodyBytes)
14751479
if ccErr != nil {
@@ -2198,7 +2202,7 @@ func (ps *ProxyServer) logRequest(
21982202
// This ensures that failed sub-group attempts are reflected in health scores,
21992203
// even when the overall aggregate request succeeds via retry to another sub-group.
22002204
if ps.dynamicWeightManager != nil {
2201-
ps.recordDynamicWeightMetrics(c, originalGroup, group, logEntry.IsSuccess)
2205+
ps.recordDynamicWeightMetrics(c, originalGroup, group, logEntry.IsSuccess, statusCode)
22022206
}
22032207
}
22042208

@@ -2209,23 +2213,30 @@ func (ps *ProxyServer) logRequest(
22092213
// or unavailable, it can add tail latency or reduce throughput. The current implementation
22102214
// prioritizes simplicity and correctness. For production deployments with strict latency SLAs,
22112215
// consider async/buffering and ensure strict client timeouts in the store implementation.
2212-
func (ps *ProxyServer) recordDynamicWeightMetrics(c *gin.Context, originalGroup, group *models.Group, isSuccess bool) {
2216+
func (ps *ProxyServer) recordDynamicWeightMetrics(c *gin.Context, originalGroup, group *models.Group, isSuccess bool, statusCode int) {
22132217
if ps.dynamicWeightManager == nil {
22142218
return
22152219
}
22162220

2221+
// Determine if this is a rate limit error (429)
2222+
// Rate limit errors receive lighter penalties as they indicate temporary throttling
2223+
// rather than service unavailability
2224+
isRateLimit := !isSuccess && statusCode == 429
2225+
22172226
// Record sub-group metrics for aggregate groups
22182227
if originalGroup != nil && originalGroup.GroupType == "aggregate" && originalGroup.ID != group.ID {
22192228
if isSuccess {
22202229
ps.dynamicWeightManager.RecordSubGroupSuccess(originalGroup.ID, group.ID)
22212230
} else {
2222-
ps.dynamicWeightManager.RecordSubGroupFailure(originalGroup.ID, group.ID)
2231+
ps.dynamicWeightManager.RecordSubGroupFailure(originalGroup.ID, group.ID, isRateLimit)
22232232
}
22242233

22252234
logrus.WithFields(logrus.Fields{
22262235
"aggregate_group_id": originalGroup.ID,
22272236
"sub_group_id": group.ID,
22282237
"is_success": isSuccess,
2238+
"is_rate_limit": isRateLimit,
2239+
"status_code": statusCode,
22292240
}).Debug("Recorded dynamic weight metrics for sub-group")
22302241
}
22312242

@@ -2235,12 +2246,14 @@ func (ps *ProxyServer) recordDynamicWeightMetrics(c *gin.Context, originalGroup,
22352246
if isSuccess {
22362247
ps.dynamicWeightManager.RecordGroupSuccess(group.ID)
22372248
} else {
2238-
ps.dynamicWeightManager.RecordGroupFailure(group.ID)
2249+
ps.dynamicWeightManager.RecordGroupFailure(group.ID, isRateLimit)
22392250
}
22402251

22412252
logrus.WithFields(logrus.Fields{
2242-
"group_id": group.ID,
2243-
"is_success": isSuccess,
2253+
"group_id": group.ID,
2254+
"is_success": isSuccess,
2255+
"is_rate_limit": isRateLimit,
2256+
"status_code": statusCode,
22442257
}).Debug("Recorded dynamic weight metrics for standard group")
22452258
}
22462259

@@ -2264,15 +2277,17 @@ func (ps *ProxyServer) recordDynamicWeightMetrics(c *gin.Context, originalGroup,
22642277
if isSuccess {
22652278
ps.dynamicWeightManager.RecordModelRedirectSuccess(group.ID, originalModelStr, targetModel)
22662279
} else {
2267-
ps.dynamicWeightManager.RecordModelRedirectFailure(group.ID, originalModelStr, targetModel)
2280+
ps.dynamicWeightManager.RecordModelRedirectFailure(group.ID, originalModelStr, targetModel, isRateLimit)
22682281
}
22692282

22702283
logrus.WithFields(logrus.Fields{
2271-
"group_id": group.ID,
2272-
"source_model": originalModelStr,
2273-
"target_model": targetModel,
2274-
"target_index": targetIdxInt,
2275-
"is_success": isSuccess,
2284+
"group_id": group.ID,
2285+
"source_model": originalModelStr,
2286+
"target_model": targetModel,
2287+
"target_index": targetIdxInt,
2288+
"is_success": isSuccess,
2289+
"is_rate_limit": isRateLimit,
2290+
"status_code": statusCode,
22762291
}).Debug("Recorded dynamic weight metrics for model redirect")
22772292
}
22782293
}

0 commit comments

Comments
 (0)