Skip to content

Commit 1fe3117

Browse files
committed
refactor: 优化优雅关机和完善失败请求处理
- 使用 channel 通信替代定时轮询,请求完成时实时打印日志
- 标记失败请求时设置 end_time 和计算 duration_ms
- 同步处理 upstream_attempt 表的失败状态
1 parent 7838872 commit 1fe3117

File tree

7 files changed

+87
-17
lines changed

7 files changed

+87
-17
lines changed

cmd/maxx/main.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,12 @@ func main() {
121121
} else if count > 0 {
122122
log.Printf("Marked %d stale requests as failed", count)
123123
}
124+
// Also mark stale upstream attempts as failed
125+
if count, err := attemptRepo.MarkStaleAttemptsFailed(); err != nil {
126+
log.Printf("Warning: Failed to mark stale attempts: %v", err)
127+
} else if count > 0 {
128+
log.Printf("Marked %d stale upstream attempts as failed", count)
129+
}
124130

125131
// Create cached repositories
126132
cachedProviderRepo := cached.NewProviderRepository(providerRepo)
@@ -344,15 +350,15 @@ func main() {
344350
activeCount := requestTracker.ActiveCount()
345351
if activeCount > 0 {
346352
log.Printf("Waiting for %d active proxy requests to complete...", activeCount)
347-
completed := requestTracker.GracefulShutdown(core.GracefulShutdownTimeout, core.GracefulShutdownCheckInterval)
353+
completed := requestTracker.GracefulShutdown(core.GracefulShutdownTimeout)
348354
if !completed {
349355
log.Printf("Graceful shutdown timeout, some requests may be interrupted")
350356
} else {
351357
log.Printf("All proxy requests completed successfully")
352358
}
353359
} else {
354360
// Mark as shutting down to reject new requests
355-
requestTracker.GracefulShutdown(0, time.Second)
361+
requestTracker.GracefulShutdown(0)
356362
log.Printf("No active proxy requests")
357363
}
358364

internal/core/database.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,12 @@ func InitializeServerComponents(
172172
} else if count > 0 {
173173
log.Printf("[Core] Marked %d stale requests as failed", count)
174174
}
175+
// Also mark stale upstream attempts as failed
176+
if count, err := repos.AttemptRepo.MarkStaleAttemptsFailed(); err != nil {
177+
log.Printf("[Core] Warning: Failed to mark stale attempts: %v", err)
178+
} else if count > 0 {
179+
log.Printf("[Core] Marked %d stale upstream attempts as failed", count)
180+
}
175181

176182
log.Printf("[Core] Loading cached data")
177183
if err := repos.CachedProviderRepo.Load(); err != nil {

internal/core/request_tracker.go

Lines changed: 33 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,9 @@ type RequestTracker struct {
1414
wg sync.WaitGroup
1515
shutdownCh chan struct{}
1616
isShutdown atomic.Bool
17+
// notifyCh is used to notify when a request completes during shutdown
18+
notifyCh chan struct{}
19+
notifyMu sync.Mutex
1720
}
1821

1922
// NewRequestTracker creates a new request tracker
@@ -36,8 +39,23 @@ func (t *RequestTracker) Add() bool {
3639

3740
// Done decrements the active request count
3841
func (t *RequestTracker) Done() {
39-
atomic.AddInt64(&t.activeCount, -1)
42+
remaining := atomic.AddInt64(&t.activeCount, -1)
4043
t.wg.Done()
44+
45+
// Notify shutdown goroutine if shutting down
46+
if t.isShutdown.Load() {
47+
t.notifyMu.Lock()
48+
ch := t.notifyCh
49+
t.notifyMu.Unlock()
50+
if ch != nil {
51+
select {
52+
case ch <- struct{}{}:
53+
default:
54+
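// Non-blocking send: if the buffer is full the notification is dropped. notifyCh must never be closed, because a send on a closed channel panics even with a default case.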
55+
}
56+
}
57+
log.Printf("[RequestTracker] Request completed, %d remaining", remaining)
58+
}
4159
}
4260

4361
// ActiveCount returns the current number of active requests
@@ -97,8 +115,12 @@ func (t *RequestTracker) ShutdownCh() <-chan struct{} {
97115

98116
// GracefulShutdown initiates graceful shutdown and waits for requests to complete
99117
// maxWait: maximum time to wait for requests to complete
100-
// checkInterval: how often to log remaining requests
101-
func (t *RequestTracker) GracefulShutdown(maxWait time.Duration, checkInterval time.Duration) bool {
118+
func (t *RequestTracker) GracefulShutdown(maxWait time.Duration) bool {
119+
// Set up the notify channel before marking shutdown, so that a Done() call that observes isShutdown also finds a non-nil notifyCh
120+
t.notifyMu.Lock()
121+
t.notifyCh = make(chan struct{}, 100) // Buffered to avoid blocking Done()
122+
t.notifyMu.Unlock()
123+
102124
t.isShutdown.Store(true)
103125
close(t.shutdownCh)
104126

@@ -116,19 +138,21 @@ func (t *RequestTracker) GracefulShutdown(maxWait time.Duration, checkInterval t
116138
close(done)
117139
}()
118140

119-
ticker := time.NewTicker(checkInterval)
120-
defer ticker.Stop()
121-
122141
deadline := time.After(maxWait)
123142

124143
for {
125144
select {
126145
case <-done:
127146
log.Printf("[RequestTracker] All requests completed, shutdown clean")
128147
return true
129-
case <-ticker.C:
130-
remaining := t.ActiveCount()
131-
log.Printf("[RequestTracker] Waiting for %d active requests to complete...", remaining)
148+
case <-t.notifyCh:
149+
// Request completed notification received, log is printed in Done()
150+
// Check if all done
151+
if t.ActiveCount() == 0 {
152+
<-done // Wait for wg.Wait() to complete
153+
log.Printf("[RequestTracker] All requests completed, shutdown clean")
154+
return true
155+
}
132156
case <-deadline:
133157
remaining := t.ActiveCount()
134158
log.Printf("[RequestTracker] Timeout reached, %d requests still active, forcing shutdown", remaining)

internal/core/server.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,6 @@ import (
1313
const (
1414
// GracefulShutdownTimeout is the maximum time to wait for active requests
1515
GracefulShutdownTimeout = 2 * time.Minute
16-
// GracefulShutdownCheckInterval is how often to log remaining requests
17-
GracefulShutdownCheckInterval = 5 * time.Second
1816
// HTTPShutdownTimeout is the timeout for HTTP server shutdown after requests complete
1917
HTTPShutdownTimeout = 5 * time.Second
2018
)
@@ -136,16 +134,15 @@ func (s *ManagedServer) Stop(ctx context.Context) error {
136134
if activeCount > 0 {
137135
log.Printf("[Server] Waiting for %d active proxy requests to complete...", activeCount)
138136

139-
// Use GracefulShutdown with logging
140-
completed := tracker.GracefulShutdown(GracefulShutdownTimeout, GracefulShutdownCheckInterval)
137+
completed := tracker.GracefulShutdown(GracefulShutdownTimeout)
141138
if !completed {
142139
log.Printf("[Server] Graceful shutdown timeout, some requests may be interrupted")
143140
} else {
144141
log.Printf("[Server] All proxy requests completed successfully")
145142
}
146143
} else {
147144
// Mark as shutting down to reject new requests
148-
tracker.GracefulShutdown(0, time.Second)
145+
tracker.GracefulShutdown(0)
149146
log.Printf("[Server] No active proxy requests")
150147
}
151148
}

internal/repository/interfaces.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,8 @@ type ProxyUpstreamAttemptRepository interface {
116116
UpdateCost(id uint64, cost uint64) error
117117
// BatchUpdateCosts updates costs for multiple attempts in a single transaction
118118
BatchUpdateCosts(updates map[uint64]uint64) error
119+
// MarkStaleAttemptsFailed marks stale attempts as failed with proper end_time and duration
120+
MarkStaleAttemptsFailed() (int64, error)
119121
}
120122

121123
type SystemSettingRepository interface {

internal/repository/sqlite/proxy_request.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,25 +150,32 @@ func (r *ProxyRequestRepository) CountWithFilter(filter *repository.ProxyRequest
150150

151151
// MarkStaleAsFailed marks all IN_PROGRESS/PENDING requests from other instances as FAILED
152152
// Also marks PENDING/IN_PROGRESS requests whose start_time is more than 30 minutes old as timed out
153+
// Sets proper end_time and duration_ms for complete failure handling
153154
func (r *ProxyRequestRepository) MarkStaleAsFailed(currentInstanceID string) (int64, error) {
154155
timeoutThreshold := time.Now().Add(-30 * time.Minute).UnixMilli()
155156
now := time.Now().UnixMilli()
156157

157158
// Use raw SQL for complex CASE expression
159+
// Sets end_time = now and calculates duration_ms = now - start_time
158160
result := r.db.gorm.Exec(`
159161
UPDATE proxy_requests
160162
SET status = 'FAILED',
161163
error = CASE
162164
WHEN instance_id IS NULL OR instance_id != ? THEN 'Server restarted'
163165
ELSE 'Request timed out (stuck in progress)'
164166
END,
167+
end_time = ?,
168+
duration_ms = CASE
169+
WHEN start_time > 0 THEN ? - start_time
170+
ELSE 0
171+
END,
165172
updated_at = ?
166173
WHERE status IN ('PENDING', 'IN_PROGRESS')
167174
AND (
168175
(instance_id IS NULL OR instance_id != ?)
169176
OR (start_time < ? AND start_time > 0)
170177
)`,
171-
currentInstanceID, now, currentInstanceID, timeoutThreshold,
178+
currentInstanceID, now, now, now, currentInstanceID, timeoutThreshold,
172179
)
173180
if result.Error != nil {
174181
return 0, result.Error

internal/repository/sqlite/proxy_upstream_attempt.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,34 @@ func (r *ProxyUpstreamAttemptRepository) UpdateCost(id uint64, cost uint64) erro
133133
return r.db.gorm.Model(&ProxyUpstreamAttempt{}).Where("id = ?", id).Update("cost", cost).Error
134134
}
135135

136+
// MarkStaleAttemptsFailed marks every PENDING/IN_PROGRESS attempt whose parent proxy request already has status FAILED as FAILED
137+
// This should be called after MarkStaleAsFailed on proxy_requests to clean up orphaned attempts
138+
// Sets proper end_time and duration_ms for complete failure handling
139+
func (r *ProxyUpstreamAttemptRepository) MarkStaleAttemptsFailed() (int64, error) {
140+
now := time.Now().UnixMilli()
141+
142+
// Update attempts that belong to FAILED requests but are still in progress
143+
result := r.db.gorm.Exec(`
144+
UPDATE proxy_upstream_attempts
145+
SET status = 'FAILED',
146+
end_time = ?,
147+
duration_ms = CASE
148+
WHEN start_time > 0 THEN ? - start_time
149+
ELSE 0
150+
END,
151+
updated_at = ?
152+
WHERE status IN ('PENDING', 'IN_PROGRESS')
153+
AND proxy_request_id IN (
154+
SELECT id FROM proxy_requests WHERE status = 'FAILED'
155+
)`,
156+
now, now, now,
157+
)
158+
if result.Error != nil {
159+
return 0, result.Error
160+
}
161+
return result.RowsAffected, nil
162+
}
163+
136164
// BatchUpdateCosts updates costs for multiple attempts in a single transaction
137165
func (r *ProxyUpstreamAttemptRepository) BatchUpdateCosts(updates map[uint64]uint64) error {
138166
if len(updates) == 0 {

0 commit comments

Comments
 (0)