Skip to content

Commit fc63189

Browse files
committed
refactor: 重构集群主从节点模式
1 parent ee400af commit fc63189

File tree

12 files changed

+59
-364
lines changed

12 files changed

+59
-364
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
# 服务器配置
22
PORT=3001
33
HOST=0.0.0.0
4+
IS_SLAVE=false
45

56
# 服务器读取、写入和空闲连接的超时时间(秒)
67
SERVER_READ_TIMEOUT=120

internal/app/app.go

Lines changed: 41 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ type App struct {
3333
requestLogService *services.RequestLogService
3434
cronChecker *keypool.CronChecker
3535
keyPoolProvider *keypool.KeyProvider
36-
leaderLock *store.LeaderLock
3736
proxyServer *proxy.ProxyServer
3837
storage store.Store
3938
db *gorm.DB
@@ -51,7 +50,6 @@ type AppParams struct {
5150
RequestLogService *services.RequestLogService
5251
CronChecker *keypool.CronChecker
5352
KeyPoolProvider *keypool.KeyProvider
54-
LeaderLock *store.LeaderLock
5553
ProxyServer *proxy.ProxyServer
5654
Storage store.Store
5755
DB *gorm.DB
@@ -68,7 +66,6 @@ func NewApp(params AppParams) *App {
6866
requestLogService: params.RequestLogService,
6967
cronChecker: params.CronChecker,
7068
keyPoolProvider: params.KeyPoolProvider,
71-
leaderLock: params.LeaderLock,
7269
proxyServer: params.ProxyServer,
7370
storage: params.Storage,
7471
db: params.DB,
@@ -77,70 +74,50 @@ func NewApp(params AppParams) *App {
7774

7875
// Start runs the application; it is a non-blocking call.
7976
func (a *App) Start() error {
77+
// Master 节点执行初始化
78+
if a.configManager.IsMaster() {
79+
logrus.Info("Starting as Master Node.")
80+
81+
// 数据库迁移
82+
if err := a.db.AutoMigrate(
83+
&models.SystemSetting{},
84+
&models.Group{},
85+
&models.APIKey{},
86+
&models.RequestLog{},
87+
&models.GroupHourlyStat{},
88+
); err != nil {
89+
return fmt.Errorf("database auto-migration failed: %w", err)
90+
}
91+
logrus.Info("Database auto-migration completed.")
8092

81-
// 启动 Leader Lock 服务并等待选举结果
82-
if err := a.leaderLock.Start(); err != nil {
83-
return fmt.Errorf("leader service failed to start: %w", err)
84-
}
85-
86-
// Leader 节点执行初始化,Follower 节点等待
87-
if a.leaderLock.IsLeader() {
88-
logrus.Info("Leader mode. Performing initial one-time tasks...")
89-
acquired, err := a.leaderLock.AcquireInitializingLock()
90-
if err != nil {
91-
return fmt.Errorf("failed to acquire initializing lock: %w", err)
93+
// 初始化系统设置
94+
if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
95+
return fmt.Errorf("failed to initialize system settings: %w", err)
9296
}
93-
if !acquired {
94-
logrus.Warn("Could not acquire initializing lock, another leader might be active. Switching to follower mode for initialization.")
95-
if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
96-
return fmt.Errorf("failed to wait for initialization as a fallback follower: %w", err)
97-
}
98-
} else {
99-
defer a.leaderLock.ReleaseInitializingLock()
100-
101-
// 数据库迁移
102-
if err := a.db.AutoMigrate(
103-
&models.SystemSetting{},
104-
&models.Group{},
105-
&models.APIKey{},
106-
&models.RequestLog{},
107-
&models.GroupHourlyStat{},
108-
); err != nil {
109-
return fmt.Errorf("database auto-migration failed: %w", err)
110-
}
111-
logrus.Info("Database auto-migration completed.")
112-
113-
// 初始化系统设置
114-
if err := a.settingsManager.EnsureSettingsInitialized(); err != nil {
115-
return fmt.Errorf("failed to initialize system settings: %w", err)
116-
}
117-
logrus.Info("System settings initialized in DB.")
118-
119-
a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
120-
121-
// 从数据库加载密钥到 Redis
122-
if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
123-
return fmt.Errorf("failed to load keys into key pool: %w", err)
124-
}
125-
logrus.Debug("API keys loaded into Redis cache by leader.")
97+
logrus.Info("System settings initialized in DB.")
98+
99+
a.settingsManager.Initialize(a.storage, a.groupManager, a.configManager.IsMaster())
100+
101+
// 从数据库加载密钥到 Redis
102+
if err := a.keyPoolProvider.LoadKeysFromDB(); err != nil {
103+
return fmt.Errorf("failed to load keys into key pool: %w", err)
126104
}
105+
logrus.Debug("API keys loaded into Redis cache by master.")
106+
107+
// 仅 Master 节点启动的服务
108+
a.requestLogService.Start()
109+
a.logCleanupService.Start()
110+
a.cronChecker.Start()
127111
} else {
128-
logrus.Info("Follower Mode. Waiting for leader to complete initialization.")
129-
if err := a.leaderLock.WaitForInitializationToComplete(); err != nil {
130-
return fmt.Errorf("follower failed to start: %w", err)
131-
}
132-
a.settingsManager.Initialize(a.storage, a.groupManager, a.leaderLock)
112+
logrus.Info("Starting as Slave Node.")
113+
a.settingsManager.Initialize(a.storage, a.groupManager, a.configManager.IsMaster())
133114
}
134115

135116
// 显示配置并启动所有后台服务
136117
a.configManager.DisplayServerConfig()
137118

138119
a.groupManager.Initialize()
139120

140-
a.requestLogService.Start()
141-
a.logCleanupService.Start()
142-
a.cronChecker.Start()
143-
144121
// Create HTTP server
145122
serverConfig := a.configManager.GetEffectiveServerConfig()
146123
a.httpServer = &http.Server{
@@ -174,8 +151,6 @@ func (a *App) Stop(ctx context.Context) {
174151

175152
// 动态计算 HTTP 关机超时时间,为后台服务固定预留 5 秒
176153
httpShutdownTimeout := totalTimeout - 5*time.Second
177-
178-
// 为 HTTP 服务器的优雅关闭创建一个独立的 context
179154
httpShutdownCtx, cancelHttpShutdown := context.WithTimeout(context.Background(), httpShutdownTimeout)
180155
defer cancelHttpShutdown()
181156

@@ -190,14 +165,18 @@ func (a *App) Stop(ctx context.Context) {
190165

191166
// 使用原始的总超时 context 继续关闭其他后台服务
192167
stoppableServices := []func(context.Context){
193-
a.cronChecker.Stop,
194-
a.leaderLock.Stop,
195-
a.logCleanupService.Stop,
196-
a.requestLogService.Stop,
197168
a.groupManager.Stop,
198169
a.settingsManager.Stop,
199170
}
200171

172+
if serverConfig.IsMaster {
173+
stoppableServices = append(stoppableServices,
174+
a.cronChecker.Stop,
175+
a.logCleanupService.Stop,
176+
a.requestLogService.Stop,
177+
)
178+
}
179+
201180
var wg sync.WaitGroup
202181
wg.Add(len(stoppableServices))
203182

@@ -221,7 +200,6 @@ func (a *App) Stop(ctx context.Context) {
221200
logrus.Warn("Shutdown timed out, some services may not have stopped gracefully.")
222201
}
223202

224-
// Step 3: Close storage connection last.
225203
if a.storage != nil {
226204
a.storage.Close()
227205
}

internal/config/manager.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ func (m *Manager) ReloadConfig() error {
7070

7171
config := &Config{
7272
Server: types.ServerConfig{
73+
IsMaster: !utils.ParseBoolean(os.Getenv("IS_SLAVE"), true),
7374
Port: utils.ParseInteger(os.Getenv("PORT"), 3001),
7475
Host: utils.GetEnvOrDefault("HOST", "0.0.0.0"),
7576
ReadTimeout: utils.ParseInteger(os.Getenv("SERVER_READ_TIMEOUT"), 120),
@@ -111,6 +112,11 @@ func (m *Manager) ReloadConfig() error {
111112
return nil
112113
}
113114

115+
// IsMaster reports whether the loaded server configuration marks this node as the master (Server.IsMaster).
116+
func (m *Manager) IsMaster() bool {
117+
return m.config.Server.IsMaster
118+
}
119+
114120
// GetAuthConfig returns authentication configuration
115121
func (m *Manager) GetAuthConfig() types.AuthConfig {
116122
return m.config.Auth

internal/config/system_settings.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,8 @@ type groupManager interface {
3636
Invalidate() error
3737
}
3838

39-
type leaderLock interface {
40-
IsLeader() bool
41-
}
42-
4339
// Initialize initializes the SystemSettingsManager with database and store dependencies.
44-
func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, leaderLock leaderLock) error {
40+
func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager, isMaster bool) error {
4541
settingsLoader := func() (types.SystemSettings, error) {
4642
var dbSettings []models.SystemSetting
4743
if err := db.DB.Find(&dbSettings).Error; err != nil {
@@ -83,12 +79,10 @@ func (sm *SystemSettingsManager) Initialize(store store.Store, gm groupManager,
8379
}
8480

8581
afterLoader := func(newData types.SystemSettings) {
86-
if !leaderLock.IsLeader() {
82+
if !isMaster {
8783
return
8884
}
89-
if err := gm.Invalidate(); err != nil {
90-
logrus.Debugf("Failed to invalidate group manager cache after settings update: %v", err)
91-
}
85+
gm.Invalidate()
9286
}
9387

9488
syncer, err := syncer.NewCacheSyncer(

internal/container/container.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,6 @@ func BuildContainer() (*dig.Container, error) {
3434
if err := container.Provide(store.NewStore); err != nil {
3535
return nil, err
3636
}
37-
if err := container.Provide(store.NewLeaderLock); err != nil {
38-
return nil, err
39-
}
4037
if err := container.Provide(httpclient.NewHTTPClientManager); err != nil {
4138
return nil, err
4239
}

internal/keypool/cron_checker.go

Lines changed: 3 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"gpt-load/internal/config"
66
"gpt-load/internal/models"
7-
"gpt-load/internal/store"
87
"sync"
98
"sync/atomic"
109
"time"
@@ -18,7 +17,6 @@ type CronChecker struct {
1817
DB *gorm.DB
1918
SettingsManager *config.SystemSettingsManager
2019
Validator *KeyValidator
21-
LeaderLock *store.LeaderLock
2220
stopChan chan struct{}
2321
wg sync.WaitGroup
2422
}
@@ -28,13 +26,11 @@ func NewCronChecker(
2826
db *gorm.DB,
2927
settingsManager *config.SystemSettingsManager,
3028
validator *KeyValidator,
31-
leaderLock *store.LeaderLock,
3229
) *CronChecker {
3330
return &CronChecker{
3431
DB: db,
3532
SettingsManager: settingsManager,
3633
Validator: validator,
37-
LeaderLock: leaderLock,
3834
stopChan: make(chan struct{}),
3935
}
4036
}
@@ -68,22 +64,16 @@ func (s *CronChecker) Stop(ctx context.Context) {
6864
func (s *CronChecker) runLoop() {
6965
defer s.wg.Done()
7066

71-
if s.LeaderLock.IsLeader() {
72-
s.submitValidationJobs()
73-
}
67+
s.submitValidationJobs()
7468

7569
ticker := time.NewTicker(5 * time.Minute)
7670
defer ticker.Stop()
7771

7872
for {
7973
select {
8074
case <-ticker.C:
81-
if s.LeaderLock.IsLeader() {
82-
logrus.Debug("CronChecker: Running as leader, submitting validation jobs.")
83-
s.submitValidationJobs()
84-
} else {
85-
logrus.Debug("CronChecker: Not the leader. Standing by.")
86-
}
75+
logrus.Debug("CronChecker: Running as Master, submitting validation jobs.")
76+
s.submitValidationJobs()
8777
case <-s.stopChan:
8878
return
8979
}

internal/services/log_cleanup_service.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"gpt-load/internal/config"
66
"gpt-load/internal/models"
7-
"gpt-load/internal/store"
87
"sync"
98
"time"
109

@@ -16,17 +15,15 @@ import (
1615
type LogCleanupService struct {
1716
db *gorm.DB
1817
settingsManager *config.SystemSettingsManager
19-
leaderLock *store.LeaderLock
2018
stopCh chan struct{}
2119
wg sync.WaitGroup
2220
}
2321

2422
// NewLogCleanupService 创建新的日志清理服务
25-
func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager, leaderLock *store.LeaderLock) *LogCleanupService {
23+
func NewLogCleanupService(db *gorm.DB, settingsManager *config.SystemSettingsManager) *LogCleanupService {
2624
return &LogCleanupService{
2725
db: db,
2826
settingsManager: settingsManager,
29-
leaderLock: leaderLock,
3027
stopCh: make(chan struct{}),
3128
}
3229
}
@@ -77,11 +74,6 @@ func (s *LogCleanupService) run() {
7774

7875
// cleanupExpiredLogs 清理过期的请求日志
7976
func (s *LogCleanupService) cleanupExpiredLogs() {
80-
if !s.leaderLock.IsLeader() {
81-
logrus.Debug("Not the leader, skipping log cleanup.")
82-
return
83-
}
84-
8577
// 获取日志保留天数配置
8678
settings := s.settingsManager.GetSettings()
8779
retentionDays := settings.RequestLogRetentionDays

internal/services/request_log_service.go

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -28,19 +28,17 @@ type RequestLogService struct {
2828
db *gorm.DB
2929
store store.Store
3030
settingsManager *config.SystemSettingsManager
31-
leaderLock *store.LeaderLock
3231
stopChan chan struct{}
3332
wg sync.WaitGroup
3433
ticker *time.Ticker
3534
}
3635

3736
// NewRequestLogService creates a new RequestLogService instance
38-
func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager, ls *store.LeaderLock) *RequestLogService {
37+
func NewRequestLogService(db *gorm.DB, store store.Store, sm *config.SystemSettingsManager) *RequestLogService {
3938
return &RequestLogService{
4039
db: db,
4140
store: store,
4241
settingsManager: sm,
43-
leaderLock: ls,
4442
stopChan: make(chan struct{}),
4543
}
4644
}
@@ -133,12 +131,7 @@ func (s *RequestLogService) flush() {
133131
return
134132
}
135133

136-
if !s.leaderLock.IsLeader() {
137-
logrus.Debug("Not a leader, skipping log flush.")
138-
return
139-
}
140-
141-
logrus.Debug("Leader starting to flush request logs...")
134+
logrus.Debug("Master starting to flush request logs...")
142135

143136
for {
144137
keys, err := s.store.SPopN(PendingLogKeysSet, DefaultLogFlushBatchSize)

0 commit comments

Comments
 (0)