Skip to content

Commit ae751eb

Browse files
committed
feat: 优雅退出
1 parent 99fd68d commit ae751eb

File tree

8 files changed

+142
-54
lines changed

8 files changed

+142
-54
lines changed

internal/app/app.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"context"
66
"fmt"
77
"net/http"
8+
"sync"
89
"time"
910

1011
"gpt-load/internal/config"
@@ -168,19 +169,47 @@ func (a *App) Start() error {
168169
func (a *App) Stop(ctx context.Context) {
169170
logrus.Info("Shutting down server...")
170171

171-
// Shutdown http server
172172
if err := a.httpServer.Shutdown(ctx); err != nil {
173173
logrus.Errorf("Server forced to shutdown: %v", err)
174174
}
175175

176-
// Stop background services
177-
a.cronChecker.Stop()
178-
a.leaderLock.Stop()
179-
a.logCleanupService.Stop()
180-
a.requestLogService.Stop()
181-
a.groupManager.Stop()
182-
a.settingsManager.Stop()
183-
a.storage.Close()
176+
stoppableServices := []func(context.Context){
177+
a.cronChecker.Stop,
178+
a.leaderLock.Stop,
179+
a.logCleanupService.Stop,
180+
a.requestLogService.Stop,
181+
a.groupManager.Stop,
182+
a.settingsManager.Stop,
183+
}
184+
185+
var wg sync.WaitGroup
186+
wg.Add(len(stoppableServices))
187+
188+
for _, stopFunc := range stoppableServices {
189+
go func(stop func(context.Context)) {
190+
defer wg.Done()
191+
stop(ctx)
192+
}(stopFunc)
193+
}
194+
195+
// Wait for all services to stop, or for the context to be done.
196+
done := make(chan struct{})
197+
go func() {
198+
wg.Wait()
199+
close(done)
200+
}()
201+
202+
select {
203+
case <-done:
204+
logrus.Info("All background services stopped.")
205+
case <-ctx.Done():
206+
logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.")
207+
}
208+
209+
// Step 3: Close storage connection last.
210+
if a.storage != nil {
211+
a.storage.Close()
212+
}
184213

185214
logrus.Info("Server exited gracefully")
186215
}

internal/config/system_settings.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package config
22

33
import (
4+
"context"
45
"encoding/json"
56
"fmt"
67
"gpt-load/internal/db"
@@ -106,7 +107,7 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager,
106107
}
107108

108109
// Stop gracefully stops the SystemSettingsManager's background syncer.
109-
func (sm *SystemSettingsManager) Stop() {
110+
func (sm *SystemSettingsManager) Stop(ctx context.Context) {
110111
if sm.syncer != nil {
111112
sm.syncer.Stop()
112113
}

internal/keypool/cron_checker.go

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package keypool
22

33
import (
4+
"context"
45
"gpt-load/internal/config"
56
"gpt-load/internal/models"
67
"gpt-load/internal/store"
@@ -44,12 +45,23 @@ func (s *CronChecker) Start() {
4445
go s.runLoop()
4546
}
4647

47-
// Stop stops the cron job.
48-
func (s *CronChecker) Stop() {
49-
logrus.Info("Stopping CronChecker...")
48+
// Stop stops the cron job, respecting the context for shutdown timeout.
49+
func (s *CronChecker) Stop(ctx context.Context) {
5050
close(s.stopChan)
51-
s.wg.Wait()
52-
logrus.Info("CronChecker stopped.")
51+
52+
// Wait for the goroutine to finish, or for the shutdown to time out.
53+
done := make(chan struct{})
54+
go func() {
55+
s.wg.Wait()
56+
close(done)
57+
}()
58+
59+
select {
60+
case <-done:
61+
logrus.Info("CronChecker stopped gracefully.")
62+
case <-ctx.Done():
63+
logrus.Warn("CronChecker stop timed out.")
64+
}
5365
}
5466

5567
func (s *CronChecker) runLoop() {
@@ -104,8 +116,12 @@ func (s *CronChecker) submitValidationJobs() {
104116
validatedCount := len(invalidKeys)
105117
becameValidCount := 0
106118
if validatedCount > 0 {
107-
logrus.Debugf("CronChecker: Found %d invalid keys to validate for group %s.", validatedCount, group.Name)
108119
for j := range invalidKeys {
120+
select {
121+
case <-s.stopChan:
122+
return
123+
default:
124+
}
109125
key := &invalidKeys[j]
110126
isValid, _ := s.Validator.ValidateSingleKey(key, group)
111127

internal/services/group_manager.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package services
22

33
import (
4+
"context"
45
"fmt"
56
"gpt-load/internal/config"
67
"gpt-load/internal/models"
@@ -93,7 +94,7 @@ func (gm *GroupManager) Invalidate() error {
9394
}
9495

9596
// Stop gracefully stops the GroupManager's background syncer.
96-
func (gm *GroupManager) Stop() {
97+
func (gm *GroupManager) Stop(ctx context.Context) {
9798
if gm.syncer != nil {
9899
gm.syncer.Stop()
99100
}

internal/services/log_cleanup_service.go

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package services
22

33
import (
4+
"context"
45
"gpt-load/internal/config"
56
"gpt-load/internal/models"
67
"gpt-load/internal/store"
8+
"sync"
79
"time"
810

911
"github.com/sirupsen/logrus"
@@ -16,6 +18,7 @@ type LogCleanupService struct {
1618
settingsManager *config.SystemSettingsManager
1719
leaderLock *store.LeaderLock
1820
stopCh chan struct{}
21+
wg sync.WaitGroup
1922
}
2023

2124
// NewLogCleanupService 创建新的日志清理服务
@@ -30,18 +33,32 @@ func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsMan
3033

3134
// Start 启动日志清理服务
3235
func (s *LogCleanupService) Start() {
36+
s.wg.Add(1)
3337
go s.run()
3438
logrus.Debug("Log cleanup service started")
3539
}
3640

3741
// Stop 停止日志清理服务
38-
func (s *LogCleanupService) Stop() {
42+
func (s *LogCleanupService) Stop(ctx context.Context) {
3943
close(s.stopCh)
40-
logrus.Info("Log cleanup service stopped")
44+
45+
done := make(chan struct{})
46+
go func() {
47+
s.wg.Wait()
48+
close(done)
49+
}()
50+
51+
select {
52+
case <-done:
53+
logrus.Info("LogCleanupService stopped gracefully.")
54+
case <-ctx.Done():
55+
logrus.Warn("LogCleanupService stop timed out.")
56+
}
4157
}
4258

4359
// run 运行日志清理的主循环
4460
func (s *LogCleanupService) run() {
61+
defer s.wg.Done()
4562
ticker := time.NewTicker(2 * time.Hour)
4663
defer ticker.Stop()
4764

internal/services/request_log_service.go

Lines changed: 45 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"gpt-load/internal/models"
99
"gpt-load/internal/store"
1010
"strings"
11+
"sync"
1112
"time"
1213

1314
"github.com/google/uuid"
@@ -28,61 +29,77 @@ type RequestLogService struct {
2829
store store.Store
2930
settingsManager *config.SystemSettingsManager
3031
leaderLock *store.LeaderLock
31-
ctx context.Context
32-
cancel context.CancelFunc
32+
stopChan chan struct{}
33+
wg sync.WaitGroup
3334
ticker *time.Ticker
3435
}
3536

3637
// NewRequestLogService creates a new RequestLogService instance
3738
func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService {
38-
ctx, cancel := context.WithCancel(context.Background())
3939
return &RequestLogService{
4040
db: db,
4141
store: store,
4242
settingsManager: sm,
4343
leaderLock: ls,
44-
ctx: ctx,
45-
cancel: cancel,
44+
stopChan: make(chan struct{}),
4645
}
4746
}
4847

4948
// Start initializes the service and starts the periodic flush routine
5049
func (s *RequestLogService) Start() {
51-
go s.flush()
50+
s.wg.Add(1)
51+
go s.runLoop()
52+
}
53+
54+
func (s *RequestLogService) runLoop() {
55+
defer s.wg.Done()
56+
57+
// Initial flush on start
58+
s.flush()
5259

5360
interval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
5461
if interval <= 0 {
5562
interval = time.Minute
5663
}
5764
s.ticker = time.NewTicker(interval)
65+
defer s.ticker.Stop()
5866

59-
go func() {
60-
for {
61-
select {
62-
case <-s.ticker.C:
63-
newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
64-
if newInterval <= 0 {
65-
newInterval = time.Minute
66-
}
67-
if newInterval != interval {
68-
s.ticker.Reset(newInterval)
69-
interval = newInterval
70-
logrus.Debugf("Request log write interval updated to: %v", interval)
71-
}
72-
s.flush()
73-
case <-s.ctx.Done():
74-
s.ticker.Stop()
75-
logrus.Info("RequestLogService stopped.")
76-
return
67+
for {
68+
select {
69+
case <-s.ticker.C:
70+
newInterval := time.Duration(s.settingsManager.GetSettings().RequestLogWriteIntervalMinutes) * time.Minute
71+
if newInterval <= 0 {
72+
newInterval = time.Minute
73+
}
74+
if newInterval != interval {
75+
s.ticker.Reset(newInterval)
76+
interval = newInterval
77+
logrus.Debugf("Request log write interval updated to: %v", interval)
7778
}
79+
s.flush()
80+
case <-s.stopChan:
81+
return
7882
}
79-
}()
83+
}
8084
}
8185

8286
// Stop gracefully stops the RequestLogService
83-
func (s *RequestLogService) Stop() {
84-
s.flush()
85-
s.cancel()
87+
func (s *RequestLogService) Stop(ctx context.Context) {
88+
close(s.stopChan)
89+
90+
done := make(chan struct{})
91+
go func() {
92+
s.wg.Wait()
93+
close(done)
94+
}()
95+
96+
select {
97+
case <-done:
98+
s.flush()
99+
logrus.Info("RequestLogService stopped gracefully.")
100+
case <-ctx.Done():
101+
logrus.Warn("RequestLogService stop timed out.")
102+
}
86103
}
87104

88105
// Record logs a request to the database and cache

internal/store/leader_lock.go

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,18 +79,28 @@ func (s *LeaderLock) Start() error {
7979
}
8080

8181
// Stop gracefully stops the leadership maintenance process.
82-
func (s *LeaderLock) Stop() {
82+
func (s *LeaderLock) Stop(ctx context.Context) {
8383
if s.isSingleNode {
8484
return
8585
}
86-
logrus.Info("Stopping leadership maintenance process...")
8786
close(s.stopChan)
88-
s.wg.Wait()
87+
88+
done := make(chan struct{})
89+
go func() {
90+
s.wg.Wait()
91+
close(done)
92+
}()
93+
94+
select {
95+
case <-done:
96+
logrus.Info("Leadership maintenance process stopped gracefully.")
97+
case <-ctx.Done():
98+
logrus.Warn("Leadership maintenance process stop timed out.")
99+
}
89100

90101
if s.isLeader.Load() {
91102
s.releaseLock()
92103
}
93-
logrus.Info("Leadership maintenance process stopped.")
94104
}
95105

96106
// IsLeader returns true if the current node is the leader.
@@ -176,7 +186,6 @@ func (s *LeaderLock) maintainLeadershipLoop() {
176186
logrus.WithError(err).Warn("Error during leadership maintenance cycle.")
177187
}
178188
case <-s.stopChan:
179-
logrus.Info("Leadership maintenance loop stopping.")
180189
return
181190
}
182191
}

internal/syncer/cache_syncer.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ func (s *CacheSyncer[T]) Invalidate() error {
6868

6969
// Stop gracefully shuts down the syncer's background goroutine.
7070
func (s *CacheSyncer[T]) Stop() {
71-
s.logger.Debug("stopping cache syncer...")
7271
close(s.stopChan)
7372
s.wg.Wait()
7473
s.logger.Info("cache syncer stopped.")
@@ -139,7 +138,6 @@ func (s *CacheSyncer[T]) listenForUpdates() {
139138
s.logger.Errorf("failed to reload cache after notification: %v", err)
140139
}
141140
case <-s.stopChan:
142-
s.logger.Info("received stop signal, exiting subscriber loop.")
143141
if err := subscription.Close(); err != nil {
144142
s.logger.Errorf("failed to close subscription: %v", err)
145143
}

0 commit comments

Comments
 (0)