Skip to content

Commit 9eeccb5

Browse files
committed
[功能] 添加数据库观测数据清理功能,支持手动和自动清理策略
1 parent a1b3204 commit 9eeccb5

File tree

16 files changed

+795
-18
lines changed

16 files changed

+795
-18
lines changed

docs/app-config.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,12 +57,19 @@ go run . --port 3000 --log-dir ./logs
5757
| `NodeOfflineThreshold` | 节点离线阈值(毫秒) | `120000` |
5858
| `AgentUpdateRepo` | Agent 自更新仓库 | `Rain-kl/OpenFlare` |
5959
| `GeoIPProvider` | 节点/IP 归属解析方式 | `ipinfo` |
60+
| `DatabaseAutoCleanupEnabled` | 是否启用每日自动清理观测数据 | `false` |
61+
| `DatabaseAutoCleanupRetentionDays` | 自动清理保留天数(至少 1 天) | `30` |
6062
| `GlobalApiRateLimitNum` / `GlobalApiRateLimitDuration` | 全局 API 限流次数 / 时间窗口 | `300` / `180` |
6163
| `GlobalWebRateLimitNum` / `GlobalWebRateLimitDuration` | 全局 Web 限流次数 / 时间窗口 | `300` / `180` |
6264
| `UploadRateLimitNum` / `UploadRateLimitDuration` | 上传接口限流次数 / 时间窗口 | `50` / `60` |
6365
| `DownloadRateLimitNum` / `DownloadRateLimitDuration` | 下载接口限流次数 / 时间窗口 | `50` / `60` |
6466
| `CriticalRateLimitNum` / `CriticalRateLimitDuration` | 敏感接口限流次数 / 时间窗口 | `100` / `1200` |
6567

68+
说明:
69+
70+
* `DatabaseAutoCleanupEnabled` 开启后,Server 会在每天凌晨 3 点自动清理 `node_access_logs``node_metric_snapshots``node_request_reports` 三类观测数据
71+
* `DatabaseAutoCleanupRetentionDays` 为统一保留天数,必须大于等于 1;管理端支持手动清理时留空保留天数,以直接删除对应数据集的全部历史记录
72+
6673
### 1.4 OpenResty 参数
6774

6875
OpenResty 性能参数与缓存参数继续统一保存在 `Option` 表。当前常用项包括:

openflare_server/common/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ var NodeOfflineThreshold = 2 * time.Minute
5555
var AgentHeartbeatInterval = 10000 // milliseconds
5656
var AgentUpdateRepo = "Rain-kl/OpenFlare"
5757
var GeoIPProvider = "ipinfo"
58+
var DatabaseAutoCleanupEnabled = false
59+
var DatabaseAutoCleanupRetentionDays = 30
5860

5961
// V5 OpenResty performance settings (hot-reloadable via Option table)
6062
var OpenRestyWorkerProcesses = "auto"
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package controller
2+
3+
import (
4+
"net/http"
5+
"openflare/service"
6+
7+
"github.com/gin-gonic/gin"
8+
)
9+
10+
// CleanupDatabaseObservability godoc
11+
// @Summary Cleanup observability tables
12+
// @Tags Options
13+
// @Accept json
14+
// @Produce json
15+
// @Security BearerAuth
16+
// @Success 200 {object} map[string]interface{}
17+
// @Router /api/option/database/cleanup [post]
18+
func CleanupDatabaseObservability(c *gin.Context) {
19+
var input service.DatabaseCleanupInput
20+
if err := decodeOptionalJSONBody(c.Request.Body, &input); err != nil {
21+
c.JSON(http.StatusBadRequest, gin.H{
22+
"success": false,
23+
"message": "参数错误",
24+
"error": err.Error(),
25+
})
26+
return
27+
}
28+
result, err := service.CleanupDatabaseObservability(input)
29+
if err != nil {
30+
respondFailure(c, err.Error())
31+
return
32+
}
33+
respondSuccess(c, result)
34+
}

openflare_server/controller/option.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,21 @@ func validateGeoIPOption(key string, value string) error {
7373
return nil
7474
}
7575

76+
func validateDatabaseCleanupOption(key string, value string) error {
77+
switch key {
78+
case "DatabaseAutoCleanupEnabled":
79+
return validateBooleanOption(key, value)
80+
case "DatabaseAutoCleanupRetentionDays":
81+
intValue, err := strconv.Atoi(value)
82+
if err != nil || intValue < 1 {
83+
return fmt.Errorf("%s 必须为大于等于 1 的整数天", key)
84+
}
85+
return nil
86+
default:
87+
return nil
88+
}
89+
}
90+
7691
func validateOpenRestyOption(key string, value string) error {
7792
trimmed := strings.TrimSpace(value)
7893

@@ -271,6 +286,13 @@ func UpdateOption(c *gin.Context) {
271286
})
272287
return
273288
}
289+
if err = validateDatabaseCleanupOption(option.Key, option.Value); err != nil {
290+
c.JSON(http.StatusOK, gin.H{
291+
"success": false,
292+
"message": err.Error(),
293+
})
294+
return
295+
}
274296
err = model.UpdateOption(option.Key, option.Value)
275297
if err != nil {
276298
c.JSON(http.StatusOK, gin.H{

openflare_server/main.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package main
22

33
import (
4+
"context"
45
"embed"
56
"fmt"
67
"github.com/gin-contrib/sessions"
@@ -13,6 +14,7 @@ import (
1314
"openflare/middleware"
1415
"openflare/model"
1516
"openflare/router"
17+
"openflare/service"
1618
"openflare/utils/geoip"
1719
"os"
1820
"strconv"
@@ -67,6 +69,9 @@ func main() {
6769
// Initialize options
6870
model.InitOptionMap()
6971
geoip.InitGeoIP()
72+
backgroundCtx, cancelBackgroundTasks := context.WithCancel(context.Background())
73+
defer cancelBackgroundTasks()
74+
service.StartDatabaseAutoCleanupScheduler(backgroundCtx)
7075

7176
// Initialize HTTP server
7277
server := gin.Default()

openflare_server/model/node_access_log.go

Lines changed: 10 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -240,14 +240,13 @@ func ListNodeAccessLogIPTrend(query NodeAccessLogIPTrendQuery) (items []*NodeAcc
240240
}
241241

242242
func DeleteNodeAccessLogsBefore(before time.Time) (deleted int64, err error) {
243-
for _, table := range observabilityShardTables("node_access_logs") {
244-
result := DB.Table(table).Where("logged_at < ?", before).Delete(&NodeAccessLog{})
245-
if result.Error != nil {
246-
return deleted, result.Error
247-
}
248-
deleted += result.RowsAffected
249-
}
250-
return deleted, nil
243+
return deleteAcrossShards(DB, "node_access_logs", &NodeAccessLog{}, func(tx *gorm.DB) *gorm.DB {
244+
return tx.Where("logged_at < ?", before)
245+
})
246+
}
247+
248+
func DeleteAllNodeAccessLogs(db *gorm.DB) (deleted int64, err error) {
249+
return deleteAcrossShards(db, "node_access_logs", &NodeAccessLog{}, nil)
251250
}
252251

253252
func NodeAccessLogExists(db *gorm.DB, record *NodeAccessLog) (bool, error) {
@@ -279,15 +278,9 @@ func NodeAccessLogExists(db *gorm.DB, record *NodeAccessLog) (bool, error) {
279278
}
280279

281280
func DeleteNodeAccessLogsByNodeBefore(db *gorm.DB, nodeID string, before time.Time) (deleted int64, err error) {
282-
db = normalizeShardedDB(db)
283-
for _, table := range observabilityShardTables("node_access_logs") {
284-
result := db.Table(table).Where("node_id = ? AND logged_at < ?", nodeID, before).Delete(&NodeAccessLog{})
285-
if result.Error != nil {
286-
return deleted, result.Error
287-
}
288-
deleted += result.RowsAffected
289-
}
290-
return deleted, nil
281+
return deleteAcrossShards(db, "node_access_logs", &NodeAccessLog{}, func(tx *gorm.DB) *gorm.DB {
282+
return tx.Where("node_id = ? AND logged_at < ?", nodeID, before)
283+
})
291284
}
292285

293286
func buildNodeAccessLogQuery(db *gorm.DB, query NodeAccessLogQuery) *gorm.DB {

openflare_server/model/node_metric_snapshot.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,3 +104,13 @@ func NodeMetricSnapshotExists(db *gorm.DB, nodeID string, capturedAt time.Time)
104104
}
105105
return false, nil
106106
}
107+
108+
func DeleteNodeMetricSnapshotsBefore(db *gorm.DB, before time.Time) (int64, error) {
109+
return deleteAcrossShards(db, "node_metric_snapshots", &NodeMetricSnapshot{}, func(tx *gorm.DB) *gorm.DB {
110+
return tx.Where("captured_at < ?", before)
111+
})
112+
}
113+
114+
func DeleteAllNodeMetricSnapshots(db *gorm.DB) (int64, error) {
115+
return deleteAcrossShards(db, "node_metric_snapshots", &NodeMetricSnapshot{}, nil)
116+
}

openflare_server/model/node_request_report.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,3 +99,13 @@ func NodeRequestReportExists(db *gorm.DB, nodeID string, windowStartedAt time.Ti
9999
}
100100
return false, nil
101101
}
102+
103+
func DeleteNodeRequestReportsBefore(db *gorm.DB, before time.Time) (int64, error) {
104+
return deleteAcrossShards(db, "node_request_reports", &NodeRequestReport{}, func(tx *gorm.DB) *gorm.DB {
105+
return tx.Where("window_ended_at < ?", before)
106+
})
107+
}
108+
109+
func DeleteAllNodeRequestReports(db *gorm.DB) (int64, error) {
110+
return deleteAcrossShards(db, "node_request_reports", &NodeRequestReport{}, nil)
111+
}

openflare_server/model/option.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ func InitOptionMap() {
5656
common.OptionMap["NodeOfflineThreshold"] = strconv.Itoa(int(common.NodeOfflineThreshold.Milliseconds()))
5757
common.OptionMap["AgentUpdateRepo"] = common.AgentUpdateRepo
5858
common.OptionMap["GeoIPProvider"] = common.GeoIPProvider
59+
common.OptionMap["DatabaseAutoCleanupEnabled"] = strconv.FormatBool(common.DatabaseAutoCleanupEnabled)
60+
common.OptionMap["DatabaseAutoCleanupRetentionDays"] = strconv.Itoa(common.DatabaseAutoCleanupRetentionDays)
5961
common.OptionMap["OpenRestyWorkerProcesses"] = common.OpenRestyWorkerProcesses
6062
common.OptionMap["OpenRestyWorkerConnections"] = strconv.Itoa(common.OpenRestyWorkerConnections)
6163
common.OptionMap["OpenRestyWorkerRlimitNofile"] = strconv.Itoa(common.OpenRestyWorkerRlimitNofile)
@@ -219,6 +221,12 @@ func updateOptionMap(key string, value string) {
219221
common.GeoIPProvider = value
220222
shouldRefreshGeoIP = true
221223
}
224+
case "DatabaseAutoCleanupEnabled":
225+
common.DatabaseAutoCleanupEnabled = value == "true"
226+
case "DatabaseAutoCleanupRetentionDays":
227+
if v, err := strconv.Atoi(value); err == nil && v >= 1 {
228+
common.DatabaseAutoCleanupRetentionDays = v
229+
}
222230
case "OpenRestyWorkerProcesses":
223231
if strings.TrimSpace(value) != "" {
224232
common.OpenRestyWorkerProcesses = value

openflare_server/model/sharding.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -205,6 +205,25 @@ func queryAcrossShardsWithDB[T any](db *gorm.DB, baseTable string, query func(tx
205205
return items, nil
206206
}
207207

208+
func deleteAcrossShards(db *gorm.DB, baseTable string, model any, apply func(tx *gorm.DB) *gorm.DB) (int64, error) {
209+
db = normalizeShardedDB(db)
210+
var deleted int64
211+
for _, table := range observabilityShardTables(baseTable) {
212+
tx := db.Table(table)
213+
if apply != nil {
214+
tx = apply(tx)
215+
} else {
216+
tx = tx.Session(&gorm.Session{AllowGlobalUpdate: true})
217+
}
218+
result := tx.Delete(model)
219+
if result.Error != nil {
220+
return deleted, result.Error
221+
}
222+
deleted += result.RowsAffected
223+
}
224+
return deleted, nil
225+
}
226+
208227
func sortShardRows[T any](items []T, less func(left T, right T) bool) {
209228
sort.Slice(items, func(i int, j int) bool {
210229
return less(items[i], items[j])

0 commit comments

Comments
 (0)