Skip to content

Commit 74ec3a7

Browse files
committed
fix: 修复多个代码质量和稳定性问题
本次提交根据AI代码审核意见,修复了多个潜在的bug和改进了代码质量: 1. 缓冲池内存管理优化 - 修复 buffer_pool.go 中缓冲池重置问题 - 在 Grow 前先 Reset,避免非空缓冲池导致的过度分配 - 确保返回的缓冲区处于干净状态 2. 状态更新通道监控增强 - 为 keypool/provider.go 的 UpdateStatus 方法添加详细的监控日志 - 记录通道溢出事件,包含 key_id、group_id 等关键信息 - 添加注释说明同步回退可能增加客户端延迟的影响 3. 请求日志计数器一致性修复 - 修复 request_log_service.go 中 pendingCount 递减逻辑不一致问题 - 确保无论 Del 操作是否成功,都递减计数器 - 防止计数器漂移导致的内存压力检测失效 4. 数据库迁移健壮性提升 - 为 v1_22_0_UpdatePrioritySemantics 迁移添加表存在性检查 - 避免在部分安装环境中迁移失败 5. 接口文档完善 - 为 store.Store 接口的 SCard 方法添加语义文档 - 明确缺失键返回 0 的行为,保持实现一致性 6. 通道兼容性注释优化 - 为 channel_compatibility.go 的 fallback 逻辑添加详细注释 - 说明未知格式的处理策略 7. Hub服务批量更新设计说明 - 为 BatchUpdateModelGroupPriorities 添加设计注释 - 说明部分成功策略和未来改进方向 8. 测试覆盖率提升 - 为 group_service_test.go 添加默认优先级和无效优先级测试用例 - 验证 Sort 字段的默认值和范围验证逻辑 所有修改已通过单元测试验证,无编译错误和 lint 警告。
1 parent 75d344b commit 74ec3a7

File tree

8 files changed

+73
-15
lines changed

8 files changed

+73
-15
lines changed

internal/centralizedmgmt/channel_compatibility.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,9 @@ var channelCompatibilityMap = map[types.RelayFormat]ChannelCompatibility{
9191
func GetCompatibleChannels(format types.RelayFormat) []string {
9292
compat, exists := channelCompatibilityMap[format]
9393
if !exists {
94-
// Fallback to OpenAI for truly unknown formats not in the map
94+
// Fallback to OpenAI for formats not defined in the map
95+
// This handles edge cases like typos or future formats not yet added to the map
96+
// The defensive fallback ensures requests with unknown formats can still be routed
9597
return []string{"openai"}
9698
}
9799

internal/centralizedmgmt/hub_service.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1104,6 +1104,12 @@ func (s *HubService) UpdateModelGroupPriority(ctx context.Context, modelName str
11041104
// allowing the batch operation to partially succeed rather than failing entirely.
11051105
// This design choice enables resilient batch operations where some updates may have
11061106
// validation issues while others can proceed successfully.
1107+
//
1108+
// Design Note: Callers receive no indication of which updates were skipped.
1109+
// This is intentional to maintain API simplicity and backward compatibility.
1110+
// Skipped updates are logged with logrus.Warn for operational monitoring.
1111+
// If detailed feedback is needed in the future, consider returning a summary
1112+
// struct (e.g., {updated: N, skipped: M, skippedItems: []...}) instead of error.
11071113
func (s *HubService) BatchUpdateModelGroupPriorities(ctx context.Context, updates []UpdateModelGroupPriorityParams) error {
11081114
return s.db.WithContext(ctx).Transaction(func(tx *gorm.DB) error {
11091115
for _, update := range updates {

internal/db/migrations/v1_22_0_UpdatePrioritySemantics.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@ import (
1717
func V1_22_0_UpdatePrioritySemantics(db *gorm.DB) error {
1818
logrus.Info("Starting migration: Update priority semantics (0→1000 for disabled)")
1919

20+
// Check if table exists to avoid migration failures on partial installs
21+
if !db.Migrator().HasTable("hub_model_group_priorities") {
22+
logrus.Info("Table hub_model_group_priorities does not exist, skipping priority semantics update")
23+
return nil
24+
}
25+
2026
// Update hub_model_group_priorities table
2127
// Change all priority=0 to priority=1000
2228
result := db.Exec(`

internal/keypool/provider.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,10 @@ func (p *KeyProvider) SelectKey(groupID uint) (*models.APIKey, error) {
205205

206206
// UpdateStatus submits a key status update task to the worker pool.
207207
// Uses bounded concurrency to prevent resource exhaustion.
208+
// Logs a warning when channel is full to enable monitoring of backpressure.
209+
// Note: Synchronous fallback blocks callers (proxy error handlers) on store operations,
210+
// which may increase client response latency under sustained load. This is acceptable
211+
// as it provides backpressure to prevent unbounded goroutine creation.
208212
func (p *KeyProvider) UpdateStatus(apiKey *models.APIKey, group *models.Group, isSuccess bool, errorMessage string) {
209213
task := statusUpdateTask{
210214
apiKey: apiKey,
@@ -220,7 +224,12 @@ func (p *KeyProvider) UpdateStatus(apiKey *models.APIKey, group *models.Group, i
220224
// Channel full, process synchronously to avoid data loss
221225
// Note: Using sync processing instead of spawning goroutine to prevent
222226
// unbounded goroutine creation when channel is persistently full
223-
logrus.Warn("Status update channel full, processing synchronously")
227+
// Log warning to enable monitoring of channel overflow events
228+
logrus.WithFields(logrus.Fields{
229+
"key_id": apiKey.ID,
230+
"group_id": group.ID,
231+
"is_success": isSuccess,
232+
}).Warn("Status update channel full (1000 capacity), processing synchronously - may increase client latency")
224233
p.processStatusUpdate(task)
225234
}
226235
}

internal/services/group_service_test.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,32 @@ func TestCreateGroup(t *testing.T) {
157157
},
158158
expectError: false,
159159
},
160+
{
161+
name: "default sort when omitted",
162+
params: GroupCreateParams{
163+
Name: "default-sort-group",
164+
GroupType: "standard",
165+
Upstreams: json.RawMessage(`[{"url":"https://api.openai.com","weight":100}]`),
166+
ChannelType: "openai",
167+
TestModel: "gpt-3.5-turbo",
168+
ValidationEndpoint: "/v1/chat/completions",
169+
// Sort omitted -> expect default 100
170+
},
171+
expectError: false,
172+
},
173+
{
174+
name: "invalid sort range",
175+
params: GroupCreateParams{
176+
Name: "invalid-sort",
177+
GroupType: "standard",
178+
Upstreams: json.RawMessage(`[{"url":"https://api.openai.com","weight":100}]`),
179+
ChannelType: "openai",
180+
Sort: 1000,
181+
TestModel: "gpt-3.5-turbo",
182+
ValidationEndpoint: "/v1/chat/completions",
183+
},
184+
expectError: true,
185+
},
160186
{
161187
name: "invalid group name",
162188
params: GroupCreateParams{
@@ -216,6 +242,10 @@ func TestCreateGroup(t *testing.T) {
216242
assert.NotNil(t, group)
217243
assert.NotZero(t, group.ID)
218244
assert.Equal(t, tt.params.Name, group.Name)
245+
// Verify default sort value when omitted
246+
if tt.params.Sort == 0 {
247+
assert.Equal(t, 100, group.Sort, "Expected default sort value of 100 when omitted")
248+
}
219249
}
220250
})
221251
}

internal/services/request_log_service.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -270,14 +270,14 @@ func (s *RequestLogService) flush() {
270270
}
271271

272272
if len(logs) == 0 {
273-
// Delete corrupted keys only if deletion succeeds, then decrement counter
273+
// Decrement pendingCount regardless of Del success since keys are already popped from set
274+
// This prevents counter drift when Del fails but keys are already removed from tracking set
274275
if len(badKeys) > 0 {
275276
if err := s.store.Del(badKeys...); err != nil {
276277
logrus.WithError(err).Error("Failed to delete corrupted log bodies from store")
277-
} else {
278-
// Only decrement counter after successful deletion to maintain accuracy
279-
atomic.AddInt64(&s.pendingCount, -int64(len(badKeys)))
280278
}
279+
// Decrement regardless of Del success since keys are already popped from set
280+
atomic.AddInt64(&s.pendingCount, -int64(len(badKeys)))
281281
}
282282
if len(retryKeys) > 0 {
283283
args := make([]any, len(retryKeys))
@@ -311,14 +311,14 @@ func (s *RequestLogService) flush() {
311311
logrus.Errorf("CRITICAL: Failed to re-add failed log keys to set: %v", saddErr)
312312
}
313313
}
314-
// Delete corrupted keys only if deletion succeeds, then decrement counter
314+
// Decrement pendingCount regardless of Del success since keys are already popped from set
315+
// This prevents counter drift when Del fails but keys are already removed from tracking set
315316
if len(badKeys) > 0 {
316317
if delErr := s.store.Del(badKeys...); delErr != nil {
317318
logrus.WithError(delErr).Error("Failed to delete corrupted log bodies from store")
318-
} else {
319-
// Only decrement counter after successful deletion to maintain accuracy
320-
atomic.AddInt64(&s.pendingCount, -int64(len(badKeys)))
321319
}
320+
// Decrement regardless of Del success since keys are already popped from set
321+
atomic.AddInt64(&s.pendingCount, -int64(len(badKeys)))
322322
}
323323
// Decrement pendingCount for missing keys to prevent counter drift
324324
if missingCount > 0 {

internal/store/store.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ type Store interface {
5454
// SET operations
5555
SAdd(key string, members ...any) error
5656
SPopN(key string, count int64) ([]string, error)
57+
// SCard returns the set cardinality (number of members).
58+
// Returns 0 with nil error for missing keys to maintain consistency across implementations.
5759
SCard(key string) (int64, error)
5860

5961
// Close closes the store and releases any underlying resources.

internal/utils/buffer_pool.go

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -91,12 +91,15 @@ func GetBufferWithCapacity(capacity int) *bytes.Buffer {
9191
return buf
9292
}
9393

94+
// Reset buffer to ensure clean state and correct length for Grow calculation
95+
// This prevents over-allocation when non-empty buffers slip into the pool
96+
buf.Reset()
97+
9498
// Ensure the buffer has at least the requested capacity to avoid reallocation
95-
// This is critical for performance when the caller knows the required size
96-
// bytes.Buffer.Grow(n) ensures space for n more bytes relative to current length
97-
// So we need to grow by (capacity - current_length) when capacity > current_capacity
98-
if buf.Cap() < capacity {
99-
buf.Grow(capacity)
99+
// bytes.Buffer.Grow(n) reserves space for n more bytes relative to current length
100+
// After Reset(), length is 0, so we grow by the delta from current capacity
101+
if capacity > 0 && buf.Cap() < capacity {
102+
buf.Grow(capacity - buf.Len())
100103
}
101104

102105
return buf

0 commit comments

Comments
 (0)