|
6 | 6 | "log" |
7 | 7 | "sort" |
8 | 8 | "strconv" |
9 | | - "strings" |
10 | 9 | "sync" |
11 | 10 | "time" |
12 | 11 |
|
@@ -373,47 +372,84 @@ func (o *MonitorHistoryOps) GetMonitorHistoriesByServerAndMonitorRangeReverseLim |
373 | 372 | // CleanupOldMonitorHistories removes monitor histories older than maxAge |
374 | 373 | func (o *MonitorHistoryOps) CleanupOldMonitorHistories(maxAge time.Duration) (int, error) { |
375 | 374 | cutoffNano := time.Now().Add(-maxAge).UnixNano() |
376 | | - keys, err := o.db.GetKeysWithPrefix("monitor_history:") |
377 | | - if err != nil { |
378 | | - return 0, err |
379 | | - } |
| 375 | + const scanBatchSize = 5000 |
| 376 | + const deleteBatchSize = 1000 |
| 377 | + prefix := []byte("monitor_history:") |
380 | 378 |
|
381 | | - keysToDelete := make([]string, 0, len(keys)/2) |
382 | | - for _, key := range keys { |
383 | | - idx := strings.LastIndexByte(key, ':') |
384 | | - if idx <= 0 || idx+1 >= len(key) { |
385 | | - continue |
386 | | - } |
| 379 | + deleted := 0 |
| 380 | + startKey := append([]byte{}, prefix...) |
| 381 | + |
| 382 | + for { |
| 383 | + keysToDelete := make([]string, 0, deleteBatchSize) |
| 384 | + hasMore := false |
| 385 | + nextStartKey := make([]byte, 0) |
| 386 | + |
| 387 | + o.db.rwMutex.RLock() |
| 388 | + err := o.db.db.View(func(txn *badger.Txn) error { |
| 389 | + opts := badger.DefaultIteratorOptions |
| 390 | + opts.PrefetchValues = false |
| 391 | + it := txn.NewIterator(opts) |
| 392 | + defer it.Close() |
| 393 | + |
| 394 | + scanned := 0 |
| 395 | + for it.Seek(startKey); it.ValidForPrefix(prefix); it.Next() { |
| 396 | + scanned++ |
| 397 | + |
| 398 | + key := it.Item().KeyCopy(nil) |
| 399 | + idx := bytes.LastIndexByte(key, ':') |
| 400 | + if idx > 0 && idx+1 < len(key) { |
| 401 | + ts, parseErr := strconv.ParseInt(string(key[idx+1:]), 10, 64) |
| 402 | + if parseErr == nil && ts <= cutoffNano { |
| 403 | + keysToDelete = append(keysToDelete, string(key)) |
| 404 | + } |
| 405 | + } |
387 | 406 |
|
388 | | - ts, parseErr := strconv.ParseInt(key[idx+1:], 10, 64) |
389 | | - if parseErr != nil { |
390 | | - continue |
391 | | - } |
| 407 | + if scanned >= scanBatchSize { |
| 408 | + nextStartKey = append([]byte{}, key...) |
| 409 | + nextStartKey = append(nextStartKey, 0) |
| 410 | + hasMore = true |
| 411 | + break |
| 412 | + } |
| 413 | + } |
392 | 414 |
|
393 | | - if ts <= cutoffNano { |
394 | | - keysToDelete = append(keysToDelete, key) |
| 415 | + return nil |
| 416 | + }) |
| 417 | + o.db.rwMutex.RUnlock() |
| 418 | + if err != nil { |
| 419 | + return deleted, err |
395 | 420 | } |
396 | | - } |
397 | 421 |
|
398 | | - if len(keysToDelete) == 0 { |
399 | | - return 0, nil |
400 | | - } |
| 422 | + for start := 0; start < len(keysToDelete); start += deleteBatchSize { |
| 423 | + end := start + deleteBatchSize |
| 424 | + if end > len(keysToDelete) { |
| 425 | + end = len(keysToDelete) |
| 426 | + } |
401 | 427 |
|
402 | | - // Batch delete in one write transaction to reduce lock and txn overhead. |
403 | | - o.db.rwMutex.Lock() |
404 | | - defer o.db.rwMutex.Unlock() |
405 | | - if err := o.db.db.Update(func(txn *badger.Txn) error { |
406 | | - for _, key := range keysToDelete { |
407 | | - if delErr := txn.Delete([]byte(key)); delErr != nil { |
408 | | - return delErr |
| 428 | + batch := keysToDelete[start:end] |
| 429 | + o.db.rwMutex.Lock() |
| 430 | + err = o.db.db.Update(func(txn *badger.Txn) error { |
| 431 | + for _, key := range batch { |
| 432 | + if delErr := txn.Delete([]byte(key)); delErr != nil { |
| 433 | + return delErr |
| 434 | + } |
| 435 | + } |
| 436 | + return nil |
| 437 | + }) |
| 438 | + o.db.rwMutex.Unlock() |
| 439 | + if err != nil { |
| 440 | + return deleted, err |
409 | 441 | } |
| 442 | + |
| 443 | + deleted += len(batch) |
410 | 444 | } |
411 | | - return nil |
412 | | - }); err != nil { |
413 | | - return 0, err |
| 445 | + |
| 446 | + if !hasMore { |
| 447 | + break |
| 448 | + } |
| 449 | + startKey = append([]byte{}, nextStartKey...) |
414 | 450 | } |
415 | 451 |
|
416 | | - return len(keysToDelete), nil |
| 452 | + return deleted, nil |
417 | 453 | } |
418 | 454 |
|
419 | 455 | // UserOps provides specialized operations for users |
|
0 commit comments