Skip to content

Commit 1678dbb

Browse files
committed
Update.
1 parent ec91438 commit 1678dbb

File tree

5 files changed

+79
-66
lines changed

5 files changed

+79
-66
lines changed

cmd/dashboard/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"log"
7+
"net"
78
"os"
89
"path/filepath"
910
"strings"
@@ -79,6 +80,10 @@ func main() {
7980
os.Exit(0)
8081
}
8182

83+
// 避免 cgo getaddrinfo 在某些环境导致的崩溃,强制使用 Go 纯解析器
84+
// 参见崩溃栈: net._C2func_getaddrinfo -> net.cgoLookupHostIP
85+
net.DefaultResolver = &net.Resolver{PreferGo: true}
86+
8287
// 初始化 dao 包
8388
singleton.InitConfigFromPath(dashboardCliParam.ConfigFile)
8489
singleton.InitTimezoneAndCache()

db/model_ops.go

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -182,37 +182,50 @@ func (o *MonitorHistoryOps) GetMonitorHistoriesByMonitorID(monitorID uint64, sta
182182

183183
// GetAllMonitorHistoriesInRange gets all monitor histories within a time range
184184
func (o *MonitorHistoryOps) GetAllMonitorHistoriesInRange(startTime, endTime time.Time) ([]*model.MonitorHistory, error) {
185-
prefix := "monitor_history:"
185+
// 使用时间范围的最小/最大键来加速扫描,避免全表遍历
186+
// 由于键格式为 monitor_history:{monitorID}:{ts},这里退化为从最小可能monitorID开始到最大monitorID结束
187+
// 我们仍旧按前缀 monitor_history: 扫描,但利用开始和结束时间提前终止
188+
prefix := []byte("monitor_history:")
189+
_ = []byte(fmt.Sprintf(":%d", startTime.UnixNano()))
190+
_ = []byte(fmt.Sprintf(":%d", endTime.UnixNano()))
186191

187192
var histories []*model.MonitorHistory
188193
err := o.db.db.View(func(txn *badger.Txn) error {
189194
opts := badger.DefaultIteratorOptions
190195
opts.PrefetchValues = true
196+
opts.PrefetchSize = 64
191197
it := txn.NewIterator(opts)
192198
defer it.Close()
193199

194-
for it.Seek([]byte(prefix)); it.Valid(); it.Next() {
200+
// 从前缀起点Seek
201+
for it.Seek(prefix); it.Valid(); it.Next() {
195202
item := it.Item()
196-
key := item.Key()
203+
key := item.KeyCopy(nil)
197204

198-
// 检查key是否以prefix开头
199-
if !bytes.HasPrefix(key, []byte(prefix)) {
205+
if !bytes.HasPrefix(key, prefix) {
200206
break
201207
}
202208

203-
err := item.Value(func(val []byte) error {
209+
// 基于最后一个冒号后的时间戳进行范围判断
210+
idx := bytes.LastIndexByte(key, ':')
211+
if idx == -1 {
212+
continue
213+
}
214+
// tsBytes := key[idx:]
215+
// 快速比较:若 tsBytes > endSuffix 则可以继续,但仅对同一monitor前缀严格
216+
// 这里保守起见依旧读取值再过滤,避免边界错误
217+
218+
if err := item.Value(func(val []byte) error {
204219
var history model.MonitorHistory
205220
if err := jsonAPI.Unmarshal(val, &history); err != nil {
206221
return err
207222
}
208-
209-
// 检查时间范围
210-
if history.CreatedAt.After(startTime) && history.CreatedAt.Before(endTime) {
211-
histories = append(histories, &history)
223+
if !history.CreatedAt.Before(endTime) || !history.CreatedAt.After(startTime) {
224+
return nil
212225
}
226+
histories = append(histories, &history)
213227
return nil
214-
})
215-
if err != nil {
228+
}); err != nil {
216229
return err
217230
}
218231
}

service/rpc/auth.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,6 +195,10 @@ func (a *authHandler) Check(ctx context.Context) (uint64, error) {
195195
clientSecret = value[0]
196196
}
197197

198+
if clientSecret == "" {
199+
return 0, status.Errorf(codes.Unauthenticated, "客户端认证失败")
200+
}
201+
198202
// 先尝试从缓存获取
199203
if clientID, found := globalAuthCache.get(clientSecret); found {
200204
return clientID, nil

service/rpc/server.go

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -157,13 +157,13 @@ func (s *ServerHandler) ReportTask(c context.Context, r *pb.TaskResult) (*pb.Rec
157157

158158
if cr.PushSuccessful && r.GetSuccessful() {
159159
message := fmt.Sprintf("[%s]\n任务名称: %s\n执行设备: %s (ID:%d)\n开始时间: %s\n结束时间: %s\n执行结果: 成功",
160-
singleton.Localizer.MustLocalize(&i18n.LocalizeConfig{
161-
MessageID: "ScheduledTaskExecutedSuccessfully",
162-
}),
163-
cr.Name,
164-
singleton.ServerList[clientID].Name, clientID,
165-
startTime.Format("2006-01-02 15:04:05"),
166-
endTime.Format("2006-01-02 15:04:05"))
160+
singleton.Localizer.MustLocalize(&i18n.LocalizeConfig{
161+
MessageID: "ScheduledTaskExecutedSuccessfully",
162+
}),
163+
cr.Name,
164+
singleton.ServerList[clientID].Name, clientID,
165+
startTime.Format("2006-01-02 15:04:05"),
166+
endTime.Format("2006-01-02 15:04:05"))
167167

168168
singleton.SafeSendNotification(cr.NotificationTag, message, nil, &curServer)
169169
}
@@ -459,17 +459,17 @@ func (s *ServerHandler) processServerStateWithoutLock(clientID uint64, serverCop
459459

460460
// 准备数据库更新数据
461461
dbUpdates := make(map[string]interface{})
462-
462+
463463
// 第一阶段:读取必要数据(短暂持读锁)
464464
var needDBQuery bool
465465
var dbCumulativeIn, dbCumulativeOut uint64
466-
466+
467467
singleton.ServerLock.RLock()
468468
if server, exists := singleton.ServerList[clientID]; exists {
469469
needDBQuery = isFirstReport && (server.CumulativeNetInTransfer == 0 && server.CumulativeNetOutTransfer == 0)
470470
}
471471
singleton.ServerLock.RUnlock()
472-
472+
473473
// 第二阶段:数据库查询(在锁外执行)
474474
if needDBQuery && singleton.DB != nil {
475475
var dbServer model.Server
@@ -698,18 +698,15 @@ func (s *ServerHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rec
698698
return nil, err
699699
}
700700
host := model.PB2Host(r)
701-
singleton.ServerLock.RLock()
702-
defer singleton.ServerLock.RUnlock()
703-
704-
// 检查并更新DDNS - 修复:添加锁保护
701+
// 只在需要时短暂读取当前服务器的只读快照,避免重复加锁
705702
singleton.ServerLock.RLock()
706703
server := singleton.ServerList[clientID]
707704
if server == nil {
708705
singleton.ServerLock.RUnlock()
709706
return &pb.Receipt{Proced: true}, nil
710707
}
711708
enableDDNS := server.EnableDDNS
712-
ddnsProfiles := server.DDNSProfiles
709+
ddnsProfiles := append([]uint64(nil), server.DDNSProfiles...)
713710
serverName := server.Name
714711
var serverHostIP string
715712
if server.Host != nil {
@@ -823,7 +820,7 @@ func (s *ServerHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rec
823820
host.CountryCode = singleton.ServerList[clientID].Host.CountryCode
824821
}
825822

826-
// 保存完整Host信息到数据库,用于重启后恢复
823+
// 保存完整Host信息到数据库,用于重启后恢复(在锁外序列化)
827824
hostJSON, err := utils.Json.Marshal(host)
828825
if err == nil {
829826
// 根据数据库类型选择不同的保存方式
@@ -849,7 +846,12 @@ func (s *ServerHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rec
849846
}
850847
}
851848

852-
singleton.ServerList[clientID].Host = &host
849+
// 最后一次快速写入 Host 指针
850+
singleton.ServerLock.Lock()
851+
if s := singleton.ServerList[clientID]; s != nil {
852+
s.Host = &host
853+
}
854+
singleton.ServerLock.Unlock()
853855
return &pb.Receipt{Proced: true}, nil
854856
}
855857

@@ -914,12 +916,8 @@ func updateTrafficDisplay(serverID uint64, inTransfer, outTransfer uint64) {
914916
// checkAndResetCycleTraffic 检查并重置周期流量
915917
// 根据AlertRule中定义的transfer_all_cycle规则重置累计流量
916918
func checkAndResetCycleTraffic(clientID uint64) {
917-
// 紧急修复:同时锁定AlertsLock和ServerLock,防止并发读写冲突
919+
// 读取规则使用读锁;仅在需要修改服务器状态时获取写锁,缩短锁定范围
918920
singleton.AlertsLock.RLock()
919-
defer singleton.AlertsLock.RUnlock()
920-
921-
singleton.ServerLock.RLock()
922-
defer singleton.ServerLock.RUnlock()
923921

924922
// 遍历所有启用的事件规则
925923
for _, alert := range singleton.Alerts {
@@ -964,7 +962,9 @@ func checkAndResetCycleTraffic(clientID uint64) {
964962
currentCycleEnd := transferRule.GetTransferDurationEnd()
965963

966964
// 检查周期是否已经发生变化(新周期开始)
965+
singleton.ServerLock.RLock()
967966
server := singleton.ServerList[clientID]
967+
singleton.ServerLock.RUnlock()
968968
lastResetTime := time.Time{}
969969

970970
// 从CycleTransferStats获取上次重置时间的记录
@@ -991,6 +991,12 @@ func checkAndResetCycleTraffic(clientID uint64) {
991991

992992
if needReset {
993993
// 重置累计流量
994+
singleton.ServerLock.Lock()
995+
server = singleton.ServerList[clientID]
996+
if server == nil {
997+
singleton.ServerLock.Unlock()
998+
break
999+
}
9941000
oldInTransfer := server.CumulativeNetInTransfer
9951001
oldOutTransfer := server.CumulativeNetOutTransfer
9961002

@@ -1000,6 +1006,7 @@ func checkAndResetCycleTraffic(clientID uint64) {
10001006
// 重置基准点
10011007
server.PrevTransferInSnapshot = 0
10021008
server.PrevTransferOutSnapshot = 0
1009+
singleton.ServerLock.Unlock()
10031010

10041011
// 周期流量重置完成,静默处理
10051012

service/singleton/singleton.go

Lines changed: 16 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -281,7 +281,7 @@ func LoadSingleton() {
281281

282282
// 加载定时任务
283283
loadCronTasks()
284-
284+
285285
// BadgerDB模式下需要手动初始化Cron任务的Servers字段
286286
if Conf.DatabaseType == "badger" {
287287
initCronServersForBadgerDB()
@@ -977,6 +977,8 @@ func CheckServerOnlineStatus() {
977977

978978
// 优化:使用读锁进行初步数据收集,减少写锁持有时间
979979
var serverSnapshots []*model.Server
980+
// 记录需要清理的 TaskClose 指针,避免在写锁内等待其它互斥锁
981+
taskCloseMap := make(map[uint64]chan error)
980982

981983
// 第一阶段:使用读锁收集服务器快照
982984
ServerLock.RLock()
@@ -1004,6 +1006,11 @@ func CheckServerOnlineStatus() {
10041006
snapshot.LastStateBeforeOffline = server.LastStateBeforeOffline
10051007
}
10061008
serverSnapshots = append(serverSnapshots, snapshot)
1009+
1010+
// 记录当前的 TaskClose 指针(只读获取,不在此处修改),用于锁外关闭
1011+
if server.TaskClose != nil {
1012+
taskCloseMap[server.ID] = server.TaskClose
1013+
}
10071014
}
10081015
}
10091016
ServerLock.RUnlock()
@@ -1066,44 +1073,21 @@ func CheckServerOnlineStatus() {
10661073
}
10671074

10681075
// 第三阶段:快速写锁批量更新内存状态
1076+
// 第三阶段:快速写锁批量更新内存状态(严格限制锁内工作量,不做阻塞等待)
10691077
ServerLock.Lock()
10701078
for i, snapshot := range serverSnapshots {
10711079
if server := ServerList[snapshot.ID]; server != nil && server.IsOnline {
10721080
// 快速更新关键状态
10731081
server.IsOnline = false
10741082

1075-
// 快速处理任务关闭(如果需要)
1076-
if server.TaskCloseLock != nil {
1077-
// 使用非阻塞方式获取TaskCloseLock,设置更短的超时
1078-
done := make(chan bool, 1)
1079-
go func() {
1080-
defer func() {
1081-
// 确保channel总是会被写入,避免goroutine泄漏
1082-
select {
1083-
case done <- true:
1084-
default:
1085-
}
1086-
}()
1087-
1088-
server.TaskCloseLock.Lock()
1089-
if server.TaskClose != nil {
1090-
// 将TaskClose保存到对应的update记录
1091-
if i < len(updates) {
1092-
updates[i].TaskClose = server.TaskClose
1093-
}
1094-
server.TaskClose = nil
1095-
}
1096-
server.TaskStream = nil
1097-
server.TaskCloseLock.Unlock()
1098-
}()
1099-
1100-
// 如果无法快速获取锁,跳过任务清理(避免阻塞)
1101-
select {
1102-
case <-done:
1103-
case <-time.After(10 * time.Millisecond):
1104-
// 超时跳过,避免长时间持有ServerLock
1083+
// 不在写锁内争用 TaskCloseLock,直接清除引用,实际关闭放到锁外完成
1084+
if i < len(updates) {
1085+
if ch, ok := taskCloseMap[snapshot.ID]; ok {
1086+
updates[i].TaskClose = ch
11051087
}
11061088
}
1089+
server.TaskStream = nil
1090+
server.TaskClose = nil
11071091

11081092
// 更新LastStateBeforeOffline(如果还没有)
11091093
if server.LastStateBeforeOffline == nil && i < len(updates) && updates[i].LastStateJSON != "" {
@@ -1143,7 +1127,7 @@ func CheckServerOnlineStatus() {
11431127
ServerLock.Unlock()
11441128
}
11451129

1146-
// 第二阶段:异步执行数据库操作(不持锁)
1130+
// 第二阶段:异步执行数据库操作与连接清理(不持锁)
11471131
if len(updates) > 0 {
11481132
go func() {
11491133
for _, update := range updates {

0 commit comments

Comments
 (0)