Skip to content

Commit 0d565f1

Browse files
committed
Update.
1 parent 1678dbb commit 0d565f1

File tree

1 file changed

+124
-134
lines changed

1 file changed

+124
-134
lines changed

service/rpc/server.go

Lines changed: 124 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,10 @@ var (
108108
// goroutine清理控制变量
109109
lastGoroutineCleanupTime time.Time
110110
lastCleanupMutex sync.Mutex
111+
112+
// 周期流量检测节流:避免在高频上报路径中频繁获取 AlertsLock
113+
cycleCheckMu sync.Mutex
114+
lastCycleCheck = make(map[uint64]time.Time)
111115
)
112116

113117
type ServerHandler struct {
@@ -784,9 +788,9 @@ func (s *ServerHandler) ReportSystemInfo(c context.Context, r *pb.Host) (*pb.Rec
784788
oldBootTime := singleton.ServerList[clientID].Host.BootTime
785789
bootTimeDiff := host.BootTime - oldBootTime
786790

787-
// 只有在 BootTime 显著增加(超过1小时)或减少时才认为是重启
788-
// 这样可以避免频繁重启或时间同步问题导致的误判
789-
if bootTimeDiff > 3600 || bootTimeDiff < 0 {
791+
// 只有在 BootTime 显著增加(超过1小时)或出现回退时才认为是重启
792+
// 注意:bootTimeDiff 为无符号整数,bootTimeDiff < 0 恒为 false(回退时减法会回绕为极大值),因此回退用 host.BootTime < oldBootTime 判断
793+
if bootTimeDiff > 3600 || host.BootTime < oldBootTime {
790794
// 真正的重启:保持累计流量不变,只重置上次记录点
791795
singleton.ServerList[clientID].PrevTransferInSnapshot = 0
792796
singleton.ServerList[clientID].PrevTransferOutSnapshot = 0
@@ -916,173 +920,159 @@ func updateTrafficDisplay(serverID uint64, inTransfer, outTransfer uint64) {
916920
// checkAndResetCycleTraffic 检查并重置周期流量
917921
// 根据AlertRule中定义的transfer_all_cycle规则重置累计流量
918922
func checkAndResetCycleTraffic(clientID uint64) {
919-
// 读取规则使用读锁;仅在需要修改服务器状态时获取写锁,缩短锁定范围
920-
singleton.AlertsLock.RLock()
923+
// 节流:同一服务器30秒内只检查一次,避免高频上报导致锁竞争
924+
cycleCheckMu.Lock()
925+
if last, ok := lastCycleCheck[clientID]; ok {
926+
if time.Since(last) < 30*time.Second {
927+
cycleCheckMu.Unlock()
928+
return
929+
}
930+
}
931+
lastCycleCheck[clientID] = time.Now()
932+
cycleCheckMu.Unlock()
921933

922-
// 遍历所有启用的事件规则
934+
// 1) 快照读取 Alerts 与匹配的规则(读锁极短持有)
935+
singleton.AlertsLock.RLock()
936+
var matchingAlert *model.AlertRule
937+
var transferRule *model.Rule
923938
for _, alert := range singleton.Alerts {
924939
if !alert.Enabled() {
925940
continue
926941
}
927-
928-
// 检查规则是否包含此服务器
929-
shouldMonitorServer := false
930-
var transferRule *model.Rule
931-
932942
for i := range alert.Rules {
933943
rule := &alert.Rules[i]
934944
if !rule.IsTransferDurationRule() {
935945
continue
936946
}
937-
938-
// 检查规则覆盖范围
947+
// 覆盖范围匹配
939948
if rule.Cover == model.RuleCoverAll {
940-
// 监控全部服务器,但排除了此服务器
941949
if rule.Ignore[clientID] {
942950
continue
943951
}
944952
} else if rule.Cover == model.RuleCoverIgnoreAll {
945-
// 忽略全部服务器,但指定监控了此服务器
946953
if !rule.Ignore[clientID] {
947954
continue
948955
}
949956
}
950-
951-
shouldMonitorServer = true
957+
matchingAlert = alert
952958
transferRule = rule
953959
break
954960
}
955-
956-
if !shouldMonitorServer || transferRule == nil {
957-
continue
958-
}
959-
960-
// 获取当前周期的开始时间
961-
currentCycleStart := transferRule.GetTransferDurationStart()
962-
currentCycleEnd := transferRule.GetTransferDurationEnd()
963-
964-
// 检查周期是否已经发生变化(新周期开始)
965-
singleton.ServerLock.RLock()
966-
server := singleton.ServerList[clientID]
967-
singleton.ServerLock.RUnlock()
968-
lastResetTime := time.Time{}
969-
970-
// 从CycleTransferStats获取上次重置时间的记录
971-
if stats, exists := singleton.AlertsCycleTransferStatsStore[alert.ID]; exists {
972-
if nextUpdate, hasUpdate := stats.NextUpdate[clientID]; hasUpdate {
973-
// 使用NextUpdate时间作为参考,判断是否进入新周期
974-
if nextUpdate.Before(currentCycleStart) {
975-
lastResetTime = nextUpdate
976-
}
977-
}
961+
if matchingAlert != nil {
962+
break
978963
}
964+
}
979965

980-
// 检查是否需要重置:当前时间已经进入新周期,且之前没有在这个周期重置过
981-
needReset := false
982-
now := time.Now()
966+
// 若无匹配规则,尽早释放锁并返回
967+
if matchingAlert == nil || transferRule == nil {
968+
singleton.AlertsLock.RUnlock()
969+
return
970+
}
983971

984-
if lastResetTime.IsZero() {
985-
// 第一次运行,不需要重置,只记录时间
986-
// 首次检查周期流量,静默处理
987-
} else if now.After(currentCycleStart) && lastResetTime.Before(currentCycleStart) {
988-
// 当前时间已过周期开始时间,且上次重置在当前周期开始之前
989-
needReset = true
990-
}
972+
currentCycleStart := transferRule.GetTransferDurationStart()
973+
currentCycleEnd := transferRule.GetTransferDurationEnd()
991974

992-
if needReset {
993-
// 重置累计流量
994-
singleton.ServerLock.Lock()
995-
server = singleton.ServerList[clientID]
996-
if server == nil {
997-
singleton.ServerLock.Unlock()
998-
break
999-
}
1000-
oldInTransfer := server.CumulativeNetInTransfer
1001-
oldOutTransfer := server.CumulativeNetOutTransfer
1002-
1003-
server.CumulativeNetInTransfer = 0
1004-
server.CumulativeNetOutTransfer = 0
1005-
1006-
// 重置基准点
1007-
server.PrevTransferInSnapshot = 0
1008-
server.PrevTransferOutSnapshot = 0
1009-
singleton.ServerLock.Unlock()
1010-
1011-
// 周期流量重置完成,静默处理
1012-
1013-
// 立即保存到数据库
1014-
if singleton.Conf.DatabaseType == "badger" {
1015-
// 使用BadgerDB保存流量重置
1016-
if db.DB != nil {
1017-
serverOps := db.NewServerOps(db.DB)
1018-
if dbServer, err := serverOps.GetServer(clientID); err == nil && dbServer != nil {
1019-
dbServer.CumulativeNetInTransfer = 0
1020-
dbServer.CumulativeNetOutTransfer = 0
1021-
if err := serverOps.SaveServer(dbServer); err != nil {
1022-
log.Printf("保存服务器 %s 周期重置流量到BadgerDB失败: %v", server.Name, err)
1023-
}
1024-
}
1025-
}
1026-
} else {
1027-
// 使用SQLite保存流量重置
1028-
if singleton.DB != nil {
1029-
updateSQL := "UPDATE servers SET cumulative_net_in_transfer = ?, cumulative_net_out_transfer = ? WHERE id = ?"
1030-
if err := singleton.DB.Exec(updateSQL, 0, 0, clientID).Error; err != nil {
1031-
log.Printf("保存服务器 %s 周期重置流量到数据库失败: %v", server.Name, err)
1032-
}
1033-
}
975+
// 读取上次重置参考时间(仍在读锁下,随后立即释放)
976+
lastResetTime := time.Time{}
977+
if stats, exists := singleton.AlertsCycleTransferStatsStore[matchingAlert.ID]; exists && stats != nil {
978+
if nextUpdate, has := stats.NextUpdate[clientID]; has {
979+
if nextUpdate.Before(currentCycleStart) {
980+
lastResetTime = nextUpdate
1034981
}
982+
}
983+
}
984+
singleton.AlertsLock.RUnlock()
1035985

1036-
// 更新AlertsCycleTransferStatsStore中的重置时间记录
1037-
if stats, exists := singleton.AlertsCycleTransferStatsStore[alert.ID]; exists {
1038-
stats.NextUpdate[clientID] = now
1039-
stats.Transfer[clientID] = 0 // 重置显示的流量
986+
// 2) 判断是否需要重置(锁外计算)
987+
needReset := false
988+
now := time.Now()
989+
if !lastResetTime.IsZero() && now.After(currentCycleStart) && lastResetTime.Before(currentCycleStart) {
990+
needReset = true
991+
}
992+
if !needReset {
993+
return
994+
}
1040995

1041-
// 更新周期时间信息
1042-
stats.From = currentCycleStart
1043-
stats.To = currentCycleEnd
996+
// 3) 重置累计流量(写锁仅包裹修改内存状态)
997+
singleton.ServerLock.Lock()
998+
server := singleton.ServerList[clientID]
999+
if server == nil {
1000+
singleton.ServerLock.Unlock()
1001+
return
1002+
}
1003+
oldInTransfer := server.CumulativeNetInTransfer
1004+
oldOutTransfer := server.CumulativeNetOutTransfer
1005+
serverName := server.Name
1006+
serverIP := ""
1007+
if server.Host != nil {
1008+
serverIP = server.Host.IP
1009+
}
10441010

1045-
// 已更新AlertsCycleTransferStatsStore中的重置记录
1046-
}
1011+
server.CumulativeNetInTransfer = 0
1012+
server.CumulativeNetOutTransfer = 0
1013+
server.PrevTransferInSnapshot = 0
1014+
server.PrevTransferOutSnapshot = 0
1015+
singleton.ServerLock.Unlock()
10471016

1048-
// 发送流量重置通知
1049-
// 格式化流量为人性化显示
1050-
formatTraffic := func(bytes uint64) string {
1051-
const unit = 1024
1052-
if bytes < unit {
1053-
return fmt.Sprintf("%d B", bytes)
1054-
}
1055-
div, exp := uint64(unit), 0
1056-
for n := bytes / unit; n >= unit; n /= unit {
1057-
div *= unit
1058-
exp++
1059-
}
1060-
return fmt.Sprintf("%.2f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
1017+
// 4) 持久化到数据库(锁外)
1018+
if singleton.Conf.DatabaseType == "badger" {
1019+
if db.DB != nil {
1020+
serverOps := db.NewServerOps(db.DB)
1021+
if dbServer, err := serverOps.GetServer(clientID); err == nil && dbServer != nil {
1022+
dbServer.CumulativeNetInTransfer = 0
1023+
dbServer.CumulativeNetOutTransfer = 0
1024+
_ = serverOps.SaveServer(dbServer) // 静默处理错误,避免打扰热路径
10611025
}
1026+
}
1027+
} else {
1028+
if singleton.DB != nil {
1029+
_ = singleton.DB.Exec("UPDATE servers SET cumulative_net_in_transfer = 0, cumulative_net_out_transfer = 0 WHERE id = ?", clientID).Error
1030+
}
1031+
}
10621032

1063-
// 计算上个周期累计流量
1064-
totalOldTraffic := oldInTransfer + oldOutTransfer
1065-
1066-
resetMessage := fmt.Sprintf("流量重置通知\n服务器 %s [%s] 的周期流量已重置\n上个周期累计流量: %s (入站=%s, 出站=%s)\n新周期: %s 到 %s\n事件规则: %s",
1067-
server.Name,
1068-
singleton.IPDesensitize(server.Host.IP),
1069-
formatTraffic(totalOldTraffic),
1070-
formatTraffic(oldInTransfer),
1071-
formatTraffic(oldOutTransfer),
1072-
currentCycleStart.Format("2006-01-02 15:04:05"),
1073-
currentCycleEnd.Format("2006-01-02 15:04:05"),
1074-
alert.Name)
1075-
1076-
// 创建流量重置通知的静音标签,避免短时间内重复发送
1077-
resetMuteLabel := fmt.Sprintf("traffic-reset-%d-%d", alert.ID, clientID)
1078-
1079-
// 使用安全的通知发送方式,防止Goroutine泄漏
1080-
singleton.SafeSendNotification(alert.NotificationTag, resetMessage, &resetMuteLabel, server)
1033+
// 5) 更新周期统计存储(需要写锁)
1034+
singleton.AlertsLock.Lock()
1035+
if stats, exists := singleton.AlertsCycleTransferStatsStore[matchingAlert.ID]; exists && stats != nil {
1036+
if stats.NextUpdate == nil {
1037+
stats.NextUpdate = make(map[uint64]time.Time)
10811038
}
1039+
if stats.Transfer == nil {
1040+
stats.Transfer = make(map[uint64]uint64)
1041+
}
1042+
stats.NextUpdate[clientID] = now
1043+
stats.Transfer[clientID] = 0
1044+
stats.From = currentCycleStart
1045+
stats.To = currentCycleEnd
1046+
}
1047+
singleton.AlertsLock.Unlock()
10821048

1083-
// 只处理第一个匹配的规则
1084-
break
1049+
// 6) 发送通知(锁外)
1050+
formatTraffic := func(bytes uint64) string {
1051+
const unit = 1024
1052+
if bytes < unit {
1053+
return fmt.Sprintf("%d B", bytes)
1054+
}
1055+
div, exp := uint64(unit), 0
1056+
for n := bytes / unit; n >= unit; n /= unit {
1057+
div *= unit
1058+
exp++
1059+
}
1060+
return fmt.Sprintf("%.2f %cB", float64(bytes)/float64(div), "KMGTPE"[exp])
10851061
}
1062+
totalOldTraffic := oldInTransfer + oldOutTransfer
1063+
resetMessage := fmt.Sprintf(
1064+
"流量重置通知\n服务器 %s [%s] 的周期流量已重置\n上个周期累计流量: %s (入站=%s, 出站=%s)\n新周期: %s 到 %s\n事件规则: %s",
1065+
serverName,
1066+
singleton.IPDesensitize(serverIP),
1067+
formatTraffic(totalOldTraffic),
1068+
formatTraffic(oldInTransfer),
1069+
formatTraffic(oldOutTransfer),
1070+
currentCycleStart.Format("2006-01-02 15:04:05"),
1071+
currentCycleEnd.Format("2006-01-02 15:04:05"),
1072+
matchingAlert.Name,
1073+
)
1074+
resetMuteLabel := fmt.Sprintf("traffic-reset-%d-%d", matchingAlert.ID, clientID)
1075+
singleton.SafeSendNotification(matchingAlert.NotificationTag, resetMessage, &resetMuteLabel, nil)
10861076
}
10871077

10881078
// GetConnectionStats 获取连接统计信息

0 commit comments

Comments
 (0)