Skip to content

Commit 65ff38d

Browse files
committed
Update.
1 parent ba08880 commit 65ff38d

File tree

3 files changed

+152
-52
lines changed

3 files changed

+152
-52
lines changed

model/monitor_history.go

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,14 +8,19 @@ import (
88

99
// MonitorHistory 历史监控记录
1010
type MonitorHistory struct {
11-
ID uint64 `gorm:"primaryKey"`
11+
ID uint64 `gorm:"primaryKey;column:id;autoIncrement"`
1212
CreatedAt time.Time `gorm:"index;<-:create;index:idx_server_id_created_at_monitor_id_avg_delay"`
1313
UpdatedAt time.Time `gorm:"autoUpdateTime"`
1414
DeletedAt gorm.DeletedAt `gorm:"index"`
15-
MonitorID uint64 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay"`
16-
ServerID uint64 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay"`
17-
AvgDelay float32 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay"` // 平均延迟,毫秒
18-
Up uint64 // 检查状态良好计数
19-
Down uint64 // 检查状态异常计数
20-
Data string
15+
MonitorID uint64 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay;column:monitor_id"`
16+
ServerID uint64 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay;column:server_id"`
17+
AvgDelay float32 `gorm:"index:idx_server_id_created_at_monitor_id_avg_delay;column:avg_delay"` // 平均延迟,毫秒
18+
Up uint64 `gorm:"column:up"` // 检查状态良好计数
19+
Down uint64 `gorm:"column:down"` // 检查状态异常计数
20+
Data string `gorm:"column:data"`
21+
}
22+
23+
// TableName 显式指定表名,确保GORM不会自动添加@id字段
24+
func (MonitorHistory) TableName() string {
25+
return "monitor_histories"
2126
}

service/singleton/servicesentinel.go

Lines changed: 37 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -220,28 +220,28 @@ func (ss *ServiceSentinel) loadMonitorHistory() {
220220

221221
// 加载服务监控历史记录,优化查询性能
222222
var mhs []model.MonitorHistory
223-
223+
224224
// 添加查询优化和超时控制
225225
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
226226
defer cancel()
227-
227+
228228
startTime := time.Now()
229-
229+
230230
// 直接查询月度数据,系统启动时需要快速加载
231231
fromDate := today.AddDate(0, 0, -29)
232232
toDate := today
233-
233+
234234
err = DB.WithContext(ctx).
235235
Where("created_at > ? AND created_at < ?", fromDate, toDate).
236236
Order("created_at DESC").
237237
Find(&mhs).Error
238-
238+
239239
if err != nil {
240240
log.Printf("加载月度监控数据失败: %v", err)
241241
return
242242
}
243243
queryDuration := time.Since(startTime)
244-
244+
245245
if queryDuration > 500*time.Millisecond {
246246
log.Printf("慢SQL查询警告: 分批加载月度数据耗时 %v,返回 %d 条记录", queryDuration, len(mhs))
247247
} else {
@@ -376,7 +376,7 @@ func (ss *ServiceSentinel) worker() {
376376

377377
// 内存压力计数器
378378
memoryPressureCounter := 0
379-
379+
380380
// 添加内存监控
381381
memoryCheckTicker := time.NewTicker(1 * time.Minute) // 每分钟检查内存
382382
defer memoryCheckTicker.Stop()
@@ -400,12 +400,12 @@ func (ss *ServiceSentinel) worker() {
400400
var m runtime.MemStats
401401
runtime.ReadMemStats(&m)
402402
currentMemMB := m.Alloc / 1024 / 1024
403-
403+
404404
if currentMemMB > 500 { // 如果内存超过500MB(从300MB提高)
405405
log.Printf("ServiceSentinel检测到高内存使用: %dMB,执行清理", currentMemMB)
406406
ss.limitDataSize()
407407
ss.cleanupOldData()
408-
408+
409409
// 如果内存仍然很高,强制GC
410410
if currentMemMB > 800 { // 从400MB提高到800MB
411411
runtime.GC()
@@ -441,7 +441,7 @@ func (ss *ServiceSentinel) handleServiceReport(r ReportData) {
441441
}
442442

443443
mh := r.Data
444-
444+
445445
// 添加边界检查,防止panic
446446
if mh.GetId() == 0 {
447447
log.Printf("NG>> 无效的监控ID: %+v", r)
@@ -464,15 +464,15 @@ func (ss *ServiceSentinel) handleServiceReport(r ReportData) {
464464
ts.ping = float32(Conf.MaxTCPPingValue)
465465
}
466466
ts.count = 0
467-
467+
468468
// 使用异步数据库插入队列来保存监控数据,避免并发冲突
469469
monitorData := map[string]interface{}{
470470
"monitor_id": mh.GetId(),
471471
"avg_delay": ts.ping,
472472
"data": mh.Data,
473473
"server_id": r.Reporter,
474474
}
475-
475+
476476
// 使用异步插入避免数据库锁冲突
477477
AsyncDBInsert("monitor_histories", monitorData, func(err error) {
478478
if err != nil {
@@ -518,19 +518,19 @@ func (ss *ServiceSentinel) handleServiceReport(r ReportData) {
518518
ss.serviceCurrentStatusData[mh.GetId()] = make([]*pb.TaskResult, _CurrentStatusSize)
519519
}
520520

521-
// 边界检查:确保索引不会超出当前数组的实际长度
522-
currentArrayLength := len(ss.serviceCurrentStatusData[mh.GetId()])
523-
if ss.serviceCurrentStatusIndex[mh.GetId()].index >= currentArrayLength {
524-
ss.serviceCurrentStatusIndex[mh.GetId()].index = 0
525-
}
521+
// 边界检查:确保索引不会超出当前数组的实际长度
522+
currentArrayLength := len(ss.serviceCurrentStatusData[mh.GetId()])
523+
if ss.serviceCurrentStatusIndex[mh.GetId()].index >= currentArrayLength {
524+
ss.serviceCurrentStatusIndex[mh.GetId()].index = 0
525+
}
526526

527-
ss.serviceCurrentStatusData[mh.GetId()][ss.serviceCurrentStatusIndex[mh.GetId()].index] = mh
528-
ss.serviceCurrentStatusIndex[mh.GetId()].index++
529-
530-
// 立即检查并重置index以防止越界,使用实际数组长度
531-
if ss.serviceCurrentStatusIndex[mh.GetId()].index >= currentArrayLength {
532-
ss.serviceCurrentStatusIndex[mh.GetId()].index = 0
533-
}
527+
ss.serviceCurrentStatusData[mh.GetId()][ss.serviceCurrentStatusIndex[mh.GetId()].index] = mh
528+
ss.serviceCurrentStatusIndex[mh.GetId()].index++
529+
530+
// 立即检查并重置index以防止越界,使用实际数组长度
531+
if ss.serviceCurrentStatusIndex[mh.GetId()].index >= currentArrayLength {
532+
ss.serviceCurrentStatusIndex[mh.GetId()].index = 0
533+
}
534534
}
535535

536536
// 更新当前状态
@@ -566,20 +566,20 @@ func (ss *ServiceSentinel) handleServiceReport(r ReportData) {
566566
// 改为基于时间间隔的保存策略,而不是依赖不可靠的计数器
567567
now := time.Now()
568568
shouldSave := false
569-
569+
570570
// 检查是否需要保存数据
571571
if ss.serviceCurrentStatusIndex[mh.GetId()].lastSaveTime.IsZero() {
572572
// 首次保存
573573
shouldSave = true
574574
} else if now.Sub(ss.serviceCurrentStatusIndex[mh.GetId()].lastSaveTime) >= 15*time.Minute {
575575
// 超过15分钟未保存
576576
shouldSave = true
577-
} else if ss.serviceCurrentStatusIndex[mh.GetId()].index%_CurrentStatusSize == 0 &&
578-
ss.serviceCurrentStatusIndex[mh.GetId()].index > 0 {
577+
} else if ss.serviceCurrentStatusIndex[mh.GetId()].index%_CurrentStatusSize == 0 &&
578+
ss.serviceCurrentStatusIndex[mh.GetId()].index > 0 {
579579
// 当计数器完成一个周期时也保存
580580
shouldSave = true
581581
}
582-
582+
583583
if shouldSave {
584584
// 确保有数据才保存
585585
totalChecks := ss.serviceResponseDataStoreCurrentUp[mh.GetId()] + ss.serviceResponseDataStoreCurrentDown[mh.GetId()]
@@ -592,16 +592,16 @@ func (ss *ServiceSentinel) handleServiceReport(r ReportData) {
592592
"up": ss.serviceResponseDataStoreCurrentUp[mh.GetId()],
593593
"down": ss.serviceResponseDataStoreCurrentDown[mh.GetId()],
594594
}
595-
596-
// 使用异步插入避免数据库锁冲突
597-
AsyncDBInsert("monitor_histories", monitorData, func(err error) {
595+
596+
// 直接使用监控历史记录专用队列,避免@id字段错误
597+
AsyncMonitorHistoryInsert(monitorData, func(err error) {
598598
if err != nil {
599599
log.Printf("NG>> 服务监控数据持久化失败 (MonitorID: %d): %v", mh.GetId(), err)
600600
} else {
601601
ss.serviceCurrentStatusIndex[mh.GetId()].lastSaveTime = now
602-
log.Printf("监控数据已保存 (MonitorID: %d, Up: %d, Down: %d)",
603-
mh.GetId(),
604-
ss.serviceResponseDataStoreCurrentUp[mh.GetId()],
602+
log.Printf("监控数据已保存 (MonitorID: %d, Up: %d, Down: %d)",
603+
mh.GetId(),
604+
ss.serviceResponseDataStoreCurrentUp[mh.GetId()],
605605
ss.serviceResponseDataStoreCurrentDown[mh.GetId()])
606606
}
607607
})
@@ -853,10 +853,10 @@ func (ss *ServiceSentinel) limitDataSize() {
853853
if len(statusData) > maxStatusRecords {
854854
// 只保留最新的记录
855855
ss.serviceCurrentStatusData[monitorID] = statusData[len(statusData)-maxStatusRecords:]
856-
856+
857857
// 重要:当数组被缩减时,必须重置索引以防止越界
858-
if ss.serviceCurrentStatusIndex[monitorID] != nil &&
859-
ss.serviceCurrentStatusIndex[monitorID].index >= maxStatusRecords {
858+
if ss.serviceCurrentStatusIndex[monitorID] != nil &&
859+
ss.serviceCurrentStatusIndex[monitorID].index >= maxStatusRecords {
860860
ss.serviceCurrentStatusIndex[monitorID].index = 0
861861
}
862862
}

service/singleton/singleton.go

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1471,22 +1471,117 @@ func StartMonitorHistoryWorker() {
14711471
delete(req.Data, "@id")
14721472
}
14731473

1474+
// 增加更多字段检查和验证
1475+
var monitorID uint64
1476+
if mid, ok := req.Data["monitor_id"]; ok {
1477+
if midVal, ok := mid.(uint64); ok {
1478+
monitorID = midVal
1479+
}
1480+
}
1481+
14741482
// 简单地重试几次,然后放弃
14751483
var err error
1476-
for retry := 0; retry < 3; retry++ {
1477-
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
1478-
err = DB.WithContext(ctx).Table("monitor_histories").Create(req.Data).Error
1479-
cancel()
1484+
for retry := 0; retry < 5; retry++ { // 增加重试次数
1485+
func() {
1486+
// 使用函数包装以确保cancel被调用
1487+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
1488+
defer cancel()
1489+
1490+
// 不使用事务,直接执行插入
1491+
sqlStr := "INSERT INTO monitor_histories (monitor_id, server_id, avg_delay, data, up, down, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)"
1492+
1493+
// 准备参数
1494+
now := time.Now()
1495+
var serverID uint64
1496+
var avgDelay float32
1497+
var data string
1498+
var up, down uint64
1499+
1500+
if v, ok := req.Data["monitor_id"]; ok {
1501+
if val, ok := v.(uint64); ok {
1502+
monitorID = val
1503+
}
1504+
}
1505+
if v, ok := req.Data["server_id"]; ok {
1506+
if val, ok := v.(uint64); ok {
1507+
serverID = val
1508+
}
1509+
}
1510+
if v, ok := req.Data["avg_delay"]; ok {
1511+
if val, ok := v.(float32); ok {
1512+
avgDelay = val
1513+
} else if val, ok := v.(float64); ok {
1514+
avgDelay = float32(val)
1515+
}
1516+
}
1517+
if v, ok := req.Data["data"]; ok {
1518+
if val, ok := v.(string); ok {
1519+
data = val
1520+
}
1521+
}
1522+
if v, ok := req.Data["up"]; ok {
1523+
if val, ok := v.(uint64); ok {
1524+
up = val
1525+
} else if val, ok := v.(int); ok {
1526+
up = uint64(val)
1527+
}
1528+
}
1529+
if v, ok := req.Data["down"]; ok {
1530+
if val, ok := v.(uint64); ok {
1531+
down = val
1532+
} else if val, ok := v.(int); ok {
1533+
down = uint64(val)
1534+
}
1535+
}
1536+
1537+
// 直接执行SQL,避免GORM的自动化处理
1538+
err = DB.WithContext(ctx).Exec(sqlStr, monitorID, serverID, avgDelay, data, up, down, now, now).Error
1539+
}()
14801540

14811541
if err == nil {
14821542
break
14831543
}
14841544

1485-
if retry < 2 {
1486-
// 使用简单的延迟策略,避免过度复杂的重试逻辑
1487-
delay := time.Duration(50*(retry+1)) * time.Millisecond
1488-
time.Sleep(delay)
1545+
// 只对特定错误进行重试
1546+
if strings.Contains(err.Error(), "database is locked") ||
1547+
strings.Contains(err.Error(), "SQL statements in progress") ||
1548+
strings.Contains(err.Error(), "cannot commit transaction") {
1549+
if retry < 4 {
1550+
// 使用简单的延迟策略,避免过度复杂的重试逻辑
1551+
delay := time.Duration(100*(retry+1)) * time.Millisecond
1552+
time.Sleep(delay)
1553+
log.Printf("监控历史记录插入失败 (MonitorID: %d),重试 %d/5: %v", monitorID, retry+1, err)
1554+
continue
1555+
}
1556+
} else if strings.Contains(err.Error(), "no column named @id") {
1557+
// 如果是@id列错误,记录日志但不重试
1558+
log.Printf("监控历史记录插入遇到@id列错误 (MonitorID: %d): %v", monitorID, err)
1559+
// 尝试直接使用Map创建,避免使用@id
1560+
if retry < 4 {
1561+
// 清理数据
1562+
cleanData := make(map[string]interface{})
1563+
for k, v := range req.Data {
1564+
if k != "@id" {
1565+
cleanData[k] = v
1566+
}
1567+
}
1568+
1569+
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
1570+
err = DB.WithContext(ctx).Table("monitor_histories").Create(cleanData).Error
1571+
cancel()
1572+
1573+
if err == nil {
1574+
break
1575+
}
1576+
1577+
delay := time.Duration(100*(retry+1)) * time.Millisecond
1578+
time.Sleep(delay)
1579+
continue
1580+
}
14891581
}
1582+
1583+
// 其他错误不重试
1584+
break
14901585
}
14911586

14921587
if req.Callback != nil {

0 commit comments

Comments
 (0)