Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 126 additions & 26 deletions component/file_cache/lru_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ package file_cache
import (
"bytes"
"encoding/gob"
"fmt"
"os"
"path/filepath"
"strings"
Expand Down Expand Up @@ -75,6 +76,9 @@ type lruPolicy struct {

// Tracks scheduled files to skip during eviction
schedule *FileCache

// Counter for snapshot file rotation
snapshotCounter int
}

// LRUPolicySnapshot represents the *persisted state* of lruPolicy.
Expand All @@ -83,13 +87,13 @@ type LRUPolicySnapshot struct {
NodeList []string // Just node names, *without their fc.tmp prefix*, in linked list order
CurrMarkerPosition uint64 // Node index of currMarker
LastMarkerPosition uint64 // Node index of lastMarker
ScheduleOps []string // List of scheduled operations, if any
Timestamp int64 // Snapshot creation time in Unix nanoseconds; used to pick the newest snapshot on load
}

// Constants used by the LRU cache policy.
const (
// DiskUsageCheckInterval is the interval, in minutes, between disk usage checks.
DiskUsageCheckInterval = 1
// snapshotPath is the file path of the persisted cache snapshot, relative to
// the policy's tmpPath (it is joined onto tmpPath when the snapshot is read
// or written).
snapshotPath = ".fileCacheSnapshot.gob"
)

var _ cachePolicy = &lruPolicy{}
Expand Down Expand Up @@ -144,9 +148,11 @@ func (p *lruPolicy) StartPolicy() error {

// start the timeout monitor
p.cacheTimeoutMonitor = time.Tick(time.Duration(p.cacheTimeout) * time.Second)
p.snapshotCounter = 0

go p.clearCache()
go p.asyncCacheValid()
go p.periodicSnapshotter()

return nil

Expand All @@ -156,7 +162,7 @@ func (p *lruPolicy) ShutdownPolicy() error {
log.Trace("lruPolicy::ShutdownPolicy")
p.closeSignal <- 1
p.closeSignalValidate <- 1
return p.createSnapshot().writeToFile(p.tmpPath)
return p.writeSnapshotToFile(p.createSnapshot())
}

func (fc *FileCache) IsScheduled(objName string) bool {
Expand All @@ -166,8 +172,10 @@ func (fc *FileCache) IsScheduled(objName string) bool {

func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot {
log.Trace("lruPolicy::saveSnapshot")
var snapshot LRUPolicySnapshot
// var snapshot LRUPolicySnapshot
var index uint64
snapshot := LRUPolicySnapshot{}

p.Lock()
defer p.Unlock()
// walk the list and write the entries into a SerializableLRUPolicy
Expand All @@ -186,6 +194,19 @@ func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot {
}
index++
}

//Add scheduled operations to the snapshot
if p.schedule != nil {
p.schedule.scheduleOps.Range(func(key, value interface{}) bool {
if name, ok := key.(string); ok {
snapshot.ScheduleOps = append(snapshot.ScheduleOps, name)
}
return true
})
}

snapshot.Timestamp = time.Now().UnixNano()

return &snapshot
}

Expand Down Expand Up @@ -244,45 +265,124 @@ func (p *lruPolicy) loadSnapshot(snapshot *LRUPolicySnapshot) {
}
nodeIndex++
}
}

func (ss *LRUPolicySnapshot) writeToFile(tmpPath string) error {
var buf bytes.Buffer
enc := gob.NewEncoder(&buf)
err := enc.Encode(ss)
if err != nil {
log.Crit("lruPolicy::ShutdownPolicy : Failed to encode policy snapshot")
return err
// Restore scheduledOps from snapshot
if len(snapshot.ScheduleOps) > 0 {
// Create a new FileCache for schedule if it doesn't exist
if p.schedule == nil {
p.schedule = &FileCache{
scheduleOps: sync.Map{},
}
}

for _, name := range snapshot.ScheduleOps {
p.schedule.scheduleOps.Store(name, struct{}{})
}
}
return os.WriteFile(filepath.Join(tmpPath, snapshotPath), buf.Bytes(), 0644)
}

func readSnapshotFromFile(tmpPath string) (*LRUPolicySnapshot, error) {
fullSnapshotPath := filepath.Join(tmpPath, snapshotPath)
defer os.Remove(fullSnapshotPath)
snapshotData, err := os.ReadFile(fullSnapshotPath)
if err != nil {
if !os.IsNotExist(err) {
log.Crit(
"lruPolicy::readSnapshotFromFile : Failed to read snapshot file. Here's why: %v",
err,
)
// Try both snapshot files and use the most recent valid one
snapshot0Path := filepath.Join(tmpPath, "snapshot.0.dat")
snapshot1Path := filepath.Join(tmpPath, "snapshot.1.dat")

snapshot0, err0 := tryReadSnapshot(snapshot0Path)
if err0 != nil && !os.IsNotExist(err0) {
log.Crit(
"lruPolicy::readSnapshotFromFile : Failed to read snapshot file %s. Here's why: %v",
snapshot0Path, err0,
)
}

snapshot1, err1 := tryReadSnapshot(snapshot1Path)
if err1 != nil && !os.IsNotExist(err1) {
log.Crit(
"lruPolicy::readSnapshotFromFile : Failed to read snapshot file %s. Here's why: %v",
snapshot1Path, err1,
)
}

if err0 == nil && err1 == nil {
// Both valid, compare timestamps and return the newer one
if snapshot0.Timestamp > snapshot1.Timestamp {
return snapshot0, nil
}
return snapshot1, nil
} else if err0 == nil {
return snapshot0, nil
} else if err1 == nil {
return snapshot1, nil
}
// Only log as critical if neither file exists - otherwise it's normal for a fresh install
if !os.IsNotExist(err0) || !os.IsNotExist(err1) {
log.Crit("lruPolicy::readSnapshotFromFile : No valid snapshots found")
}
return nil, fmt.Errorf("no valid snapshots found")
}

// tryReadSnapshot reads the file at path and gob-decodes its contents into an
// LRUPolicySnapshot.
//
// Errors from reading the file are returned unwrapped so that callers can keep
// classifying them with os.IsNotExist, which does not look through wrapped
// errors. Decode failures are wrapped with the offending path. Nothing is
// logged here: readSnapshotFromFile already logs every failure that is not a
// missing file, so logging in both places reported the same error twice.
//
// On any error the returned snapshot is nil.
func tryReadSnapshot(path string) (*LRUPolicySnapshot, error) {
	data, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}

	snapshot := new(LRUPolicySnapshot)
	if err := gob.NewDecoder(bytes.NewReader(data)).Decode(snapshot); err != nil {
		return nil, fmt.Errorf("decoding snapshot %q: %w", path, err)
	}
	return snapshot, nil
}

// periodicSnapshotter runs until a value is received on p.closeSignal. Every
// five minutes it builds a snapshot of the policy with createSnapshot and
// hands it to writeSnapshotToFile, logging the outcome of each attempt. A
// failed write is logged and the loop simply waits for the next tick.
//
// NOTE(review): this goroutine receives from p.closeSignal, and ShutdownPolicy
// sends a single value on that channel. If another goroutine (presumably
// clearCache) also receives from it, only one of them will see the signal;
// confirm against the rest of the file.
func (p *lruPolicy) periodicSnapshotter() {
	const snapshotInterval = 5 * time.Minute

	tick := time.NewTicker(snapshotInterval)
	defer tick.Stop()

	for {
		select {
		case <-p.closeSignal:
			// The policy is shutting down.
			return

		case <-tick.C:
			if err := p.writeSnapshotToFile(p.createSnapshot()); err != nil {
				log.Err("lruPolicy::periodicSnapshotter : Failed to write snapshot: %v", err)
				continue
			}
			log.Info("lruPolicy::periodicSnapshotter : Successfully wrote periodic snapshot")
		}
	}
}

// writeSnapshotToFile persists snapshot into one of two rotating files,
// "snapshot.0.dat" and "snapshot.1.dat", under p.tmpPath.
//
// The snapshot is first written to "<target>.tmp" and then renamed over the
// target, so the target name is only ever replaced by a completely written
// file. The rotation counter is advanced only after the rename has succeeded:
// a failed attempt is retried against the same slot, and the other slot (the
// one written by the previous successful call, if any) is left untouched. If
// writing or renaming fails, the temporary file is removed on a best-effort
// basis so it does not linger in the cache directory.
//
// NOTE(review): p.snapshotCounter is read and written here without taking the
// policy lock; confirm this method is never invoked concurrently.
//
// It returns the error from writing or renaming, or nil on success.
func (p *lruPolicy) writeSnapshotToFile(snapshot *LRUPolicySnapshot) error {
	slot := (p.snapshotCounter + 1) % 2
	target := filepath.Join(p.tmpPath, fmt.Sprintf("snapshot.%d.dat", slot))
	tmp := target + ".tmp"

	if err := writeToFile(tmp, snapshot); err != nil {
		_ = os.Remove(tmp) // best effort: do not leave a partial temp file behind
		return err
	}
	if err := os.Rename(tmp, target); err != nil {
		_ = os.Remove(tmp) // best effort: the rename did not consume the temp file
		return err
	}

	// Commit the rotation only now that the slot really holds the new snapshot.
	p.snapshotCounter = slot
	return nil
}

// writeToFile gob-encodes snapshot and writes the result to filename, creating
// the file with mode 0644 or truncating it if it already exists.
//
// A nil snapshot is rejected with an error, because gob's Encoder panics when
// handed a nil pointer. The data is flushed with Sync before the file is
// closed, so a caller that renames the file into place afterwards does not
// publish a name whose contents have not reached stable storage yet.
//
// It returns an error if snapshot is nil, if encoding fails, or if the file
// cannot be created, written, synced or closed.
func writeToFile(filename string, snapshot *LRUPolicySnapshot) error {
	if snapshot == nil {
		return fmt.Errorf("writing snapshot %q: nil snapshot", filename)
	}

	var buf bytes.Buffer
	if err := gob.NewEncoder(&buf).Encode(snapshot); err != nil {
		return fmt.Errorf("encoding snapshot for %q: %w", filename, err)
	}

	// Same flags and mode as os.WriteFile; opened by hand so we can Sync.
	f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return err
	}
	if _, err := f.Write(buf.Bytes()); err != nil {
		_ = f.Close() // the write error is the one worth reporting
		return err
	}
	if err := f.Sync(); err != nil {
		_ = f.Close() // the sync error is the one worth reporting
		return err
	}
	return f.Close()
}

func (p *lruPolicy) UpdateConfig(c cachePolicyConfig) error {
log.Trace("lruPolicy::UpdateConfig")
p.maxSizeMB = c.maxSizeMB
Expand Down
Loading
Loading