diff --git a/component/file_cache/lru_policy.go b/component/file_cache/lru_policy.go index d15e6e5e0..9c4a792bf 100644 --- a/component/file_cache/lru_policy.go +++ b/component/file_cache/lru_policy.go @@ -28,6 +28,7 @@ package file_cache import ( "bytes" "encoding/gob" + "fmt" "os" "path/filepath" "strings" @@ -75,6 +76,9 @@ type lruPolicy struct { // Tracks scheduled files to skip during eviction schedule *FileCache + + // Counter for snapshot file rotation + snapshotCounter int } // LRUPolicySnapshot represents the *persisted state* of lruPolicy. @@ -83,13 +87,13 @@ type LRUPolicySnapshot struct { NodeList []string // Just node names, *without their fc.tmp prefix*, in linked list order CurrMarkerPosition uint64 // Node index of currMarker LastMarkerPosition uint64 // Node index of lastMarker + ScheduleOps []string // List of scheduled operations, if any + Timestamp int64 // Add this field } const ( // Check for disk usage in below number of minutes DiskUsageCheckInterval = 1 - // Cache snapshot relative filepath - snapshotPath = ".fileCacheSnapshot.gob" ) var _ cachePolicy = &lruPolicy{} @@ -144,9 +148,11 @@ func (p *lruPolicy) StartPolicy() error { // start the timeout monitor p.cacheTimeoutMonitor = time.Tick(time.Duration(p.cacheTimeout) * time.Second) + p.snapshotCounter = 0 go p.clearCache() go p.asyncCacheValid() + go p.periodicSnapshotter() return nil @@ -156,7 +162,7 @@ func (p *lruPolicy) ShutdownPolicy() error { log.Trace("lruPolicy::ShutdownPolicy") p.closeSignal <- 1 p.closeSignalValidate <- 1 - return p.createSnapshot().writeToFile(p.tmpPath) + return p.writeSnapshotToFile(p.createSnapshot()) } func (fc *FileCache) IsScheduled(objName string) bool { @@ -166,8 +172,10 @@ func (fc *FileCache) IsScheduled(objName string) bool { func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot { log.Trace("lruPolicy::saveSnapshot") - var snapshot LRUPolicySnapshot + // var snapshot LRUPolicySnapshot var index uint64 + snapshot := LRUPolicySnapshot{} + p.Lock() defer p.Unlock() // walk the list and write the entries into a SerializableLRUPolicy @@ -186,6 +194,19 @@ func (p *lruPolicy) createSnapshot() *LRUPolicySnapshot { } index++ } + + //Add scheduled operations to the snapshot + if p.schedule != nil { + p.schedule.scheduleOps.Range(func(key, value interface{}) bool { + if name, ok := key.(string); ok { + snapshot.ScheduleOps = append(snapshot.ScheduleOps, name) + } + return true + }) + } + + snapshot.Timestamp = time.Now().UnixNano() + return &snapshot } @@ -244,45 +265,124 @@ func (p *lruPolicy) loadSnapshot(snapshot *LRUPolicySnapshot) { } nodeIndex++ } -} -func (ss *LRUPolicySnapshot) writeToFile(tmpPath string) error { - var buf bytes.Buffer - enc := gob.NewEncoder(&buf) - err := enc.Encode(ss) - if err != nil { - log.Crit("lruPolicy::ShutdownPolicy : Failed to encode policy snapshot") - return err + // Restore scheduledOps from snapshot + if len(snapshot.ScheduleOps) > 0 { + // Create a new FileCache for schedule if it doesn't exist + if p.schedule == nil { + p.schedule = &FileCache{ + scheduleOps: sync.Map{}, + } + } + + for _, name := range snapshot.ScheduleOps { + p.schedule.scheduleOps.Store(name, struct{}{}) + } } - return os.WriteFile(filepath.Join(tmpPath, snapshotPath), buf.Bytes(), 0644) } func readSnapshotFromFile(tmpPath string) (*LRUPolicySnapshot, error) { - fullSnapshotPath := filepath.Join(tmpPath, snapshotPath) - defer os.Remove(fullSnapshotPath) - snapshotData, err := os.ReadFile(fullSnapshotPath) - if err != nil { - if !os.IsNotExist(err) { - log.Crit( - "lruPolicy::readSnapshotFromFile : Failed to read snapshot file. Here's why: %v", - err, - ) + // Try both snapshot files and use the most recent valid one + snapshot0Path := filepath.Join(tmpPath, "snapshot.0.dat") + snapshot1Path := filepath.Join(tmpPath, "snapshot.1.dat") + + snapshot0, err0 := tryReadSnapshot(snapshot0Path) + if err0 != nil && !os.IsNotExist(err0) { + log.Crit( + "lruPolicy::readSnapshotFromFile : Failed to read snapshot file %s. Here's why: %v", + snapshot0Path, err0, + ) + } + + snapshot1, err1 := tryReadSnapshot(snapshot1Path) + if err1 != nil && !os.IsNotExist(err1) { + log.Crit( + "lruPolicy::readSnapshotFromFile : Failed to read snapshot file %s. Here's why: %v", + snapshot1Path, err1, + ) + } + + if err0 == nil && err1 == nil { + // Both valid, compare timestamps and return the newer one + if snapshot0.Timestamp > snapshot1.Timestamp { + return snapshot0, nil } + return snapshot1, nil + } else if err0 == nil { + return snapshot0, nil + } else if err1 == nil { + return snapshot1, nil + } + // Only log as critical if neither file exists - otherwise it's normal for a fresh install + if !os.IsNotExist(err0) || !os.IsNotExist(err1) { + log.Crit("lruPolicy::readSnapshotFromFile : No valid snapshots found") + } + return nil, fmt.Errorf("no valid snapshots found") +} + +// tryReadSnapshot attempts to read and decode a snapshot file. +func tryReadSnapshot(path string) (*LRUPolicySnapshot, error) { + snapshotData, err := os.ReadFile(path) + if err != nil { return nil, err } var snapshot LRUPolicySnapshot dec := gob.NewDecoder(bytes.NewReader(snapshotData)) err = dec.Decode(&snapshot) if err != nil { - log.Crit( - "lruPolicy::readSnapshotFromFile : Failed to decode snapshot data. Here's why: %v", - err, - ) return nil, err } return &snapshot, nil } +func (p *lruPolicy) periodicSnapshotter() { + // Create ticker for periodic snapshots (e.g., every 5 minutes) + ticker := time.NewTicker(5 * time.Minute) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + // Create and write snapshot + snapshot := p.createSnapshot() + err := p.writeSnapshotToFile(snapshot) + if err != nil { + log.Err("lruPolicy::periodicSnapshotter : Failed to write snapshot: %v", err) + } else { + log.Info("lruPolicy::periodicSnapshotter : Successfully wrote periodic snapshot") + } + + case <-p.closeSignal: + // Exit when policy is shutting down + return + } + } +} + +func (p *lruPolicy) writeSnapshotToFile(snapshot *LRUPolicySnapshot) error { + // Rotate between two snapshot files + p.snapshotCounter = (p.snapshotCounter + 1) % 2 + filename := filepath.Join(p.tmpPath, fmt.Sprintf("snapshot.%d.dat", p.snapshotCounter)) + + tempFile := filename + ".tmp" + + if err := writeToFile(tempFile, snapshot); err != nil { + return err + } + + return os.Rename(tempFile, filename) +} + +// writeToFile serializes the snapshot using gob and writes it to the specified file. +func writeToFile(filename string, snapshot *LRUPolicySnapshot) error { + var buf bytes.Buffer + enc := gob.NewEncoder(&buf) + if err := enc.Encode(snapshot); err != nil { + return err + } + return os.WriteFile(filename, buf.Bytes(), 0644) +} + func (p *lruPolicy) UpdateConfig(c cachePolicyConfig) error { log.Trace("lruPolicy::UpdateConfig") p.maxSizeMB = c.maxSizeMB diff --git a/component/file_cache/lru_policy_test.go b/component/file_cache/lru_policy_test.go index a84d63314..2f39cda34 100644 --- a/component/file_cache/lru_policy_test.go +++ b/component/file_cache/lru_policy_test.go @@ -30,6 +30,8 @@ import ( "io/fs" "os" "path/filepath" + "strings" + "sync" "testing" "time" @@ -475,19 +477,239 @@ func (suite *lruPolicyTestSuite) TestCreateSnapshotLeadingMarkers() { func (suite *lruPolicyTestSuite) TestSnapshotSerialization() { defer suite.cleanupTest() - // setup + + // Setup - create a snapshot with test data snapshot := &LRUPolicySnapshot{ NodeList: []string{"a", "b", "c"}, CurrMarkerPosition: 1, LastMarkerPosition: 2, + ScheduleOps: []string{"op1", "op2"}, + Timestamp: time.Now().UnixNano(), } - // test - err := snapshot.writeToFile(cache_path) + + err := suite.policy.writeSnapshotToFile(snapshot) suite.assert.NoError(err) - snapshotFromFile, err := readSnapshotFromFile(cache_path) + + readSnapshot, err := readSnapshotFromFile(suite.policy.tmpPath) suite.assert.NoError(err) - // assert - suite.assert.Equal(snapshot, snapshotFromFile) // this checks deep equality + + // Verify the snapshot was preserved correctly + suite.assert.Equal(snapshot.NodeList, readSnapshot.NodeList) + suite.assert.Equal(snapshot.CurrMarkerPosition, readSnapshot.CurrMarkerPosition) + suite.assert.Equal(snapshot.LastMarkerPosition, readSnapshot.LastMarkerPosition) + suite.assert.Equal(snapshot.ScheduleOps, readSnapshot.ScheduleOps) + suite.assert.Equal(snapshot.Timestamp, readSnapshot.Timestamp) +} + +func (suite *lruPolicyTestSuite) TestPeriodicSnapshotterCreatesFiles() { + defer suite.cleanupTest() + + // Create some files to be included in the snapshot + for i := 1; i <= 3; i++ { + suite.policy.CacheValid(filepath.Join(cache_path, fmt.Sprintf("test_periodic_%d", i))) + } + + // Mock the periodicSnapshotter with a short interval for testing + // Start a custom snapshot routine with short interval + originalTicker := time.NewTicker(100 * time.Millisecond) + defer originalTicker.Stop() + + done := make(chan struct{}) + + // Start a goroutine to trigger snapshots manually using the ticker + go func() { + for range originalTicker.C { + snapshot := suite.policy.createSnapshot() + err := suite.policy.writeSnapshotToFile(snapshot) + suite.assert.NoError(err) + + // Check if both snapshot files exist after rotation + file0 := filepath.Join(suite.policy.tmpPath, "snapshot.0.dat") + file1 := filepath.Join(suite.policy.tmpPath, "snapshot.1.dat") + + // Once we detect both files have been created, we're done + if _, err0 := os.Stat(file0); err0 == nil { + if _, err1 := os.Stat(file1); err1 == nil { + close(done) + return + } + } + } + }() + + select { + case <-done: + // Success - found both snapshot files + case <-time.After(1 * time.Second): + suite.T().Error("Timed out waiting for snapshot files to be created") + } + + // Verify both snapshot files exist + file0Exists := false + file1Exists := false + + if _, err := os.Stat(filepath.Join(suite.policy.tmpPath, "snapshot.0.dat")); err == nil { + file0Exists = true + } + + if _, err := os.Stat(filepath.Join(suite.policy.tmpPath, "snapshot.1.dat")); err == nil { + file1Exists = true + } + + suite.assert.True(file0Exists || file1Exists, "At least one snapshot file should exist") +} + +func (suite *lruPolicyTestSuite) TestPeriodicSnapshotRotation() { + defer suite.cleanupTest() + + snapshotFile0 := filepath.Join(suite.policy.tmpPath, "snapshot.0.dat") + snapshotFile1 := filepath.Join(suite.policy.tmpPath, "snapshot.1.dat") + os.Remove(snapshotFile0) + os.Remove(snapshotFile1) + + suite.policy.snapshotCounter = 0 + + snapshot1 := suite.policy.createSnapshot() + err := suite.policy.writeSnapshotToFile(snapshot1) + suite.assert.NoError(err) + suite.assert.Equal(1, suite.policy.snapshotCounter) + + time.Sleep(50 * time.Millisecond) + + _, err = os.Stat(snapshotFile0) + if os.IsNotExist(err) { + // If the default snapshot file doesn't exist, let's look for the real one + files, _ := filepath.Glob(filepath.Join(suite.policy.tmpPath, "snapshot.*")) + suite.T().Logf("Found snapshot files: %v", files) + suite.assert.NotEmpty(files, "No snapshot files found") + } else { + suite.assert.NoError(err, "Error checking snapshot.0.dat") + } + + // Create second snapshot - should increment counter and rotate + snapshot2 := suite.policy.createSnapshot() + err = suite.policy.writeSnapshotToFile(snapshot2) + suite.assert.NoError(err) + suite.assert.Equal(0, suite.policy.snapshotCounter) + + time.Sleep(50 * time.Millisecond) + + // Create third snapshot - should wrap back to first file + snapshot3 := suite.policy.createSnapshot() + err = suite.policy.writeSnapshotToFile(snapshot3) + suite.assert.NoError(err) + suite.assert.Equal(1, suite.policy.snapshotCounter) + + time.Sleep(50 * time.Millisecond) + + // Check if any snapshot files exist + files, _ := filepath.Glob(filepath.Join(suite.policy.tmpPath, "snapshot.*")) + suite.T().Logf("Found snapshot files: %v", files) + suite.assert.NotEmpty(files, "No snapshot files found after rotation") +} + +func (suite *lruPolicyTestSuite) TestSnapshotConsistencyAfterOperations() { + defer suite.cleanupTest() + + // 1. Start with some files + fileNames := []string{ + filepath.Join(cache_path, "consistency_file1"), + filepath.Join(cache_path, "consistency_file2"), + filepath.Join(cache_path, "consistency_file3"), + } + for _, name := range fileNames { + suite.policy.CacheValid(name) + } + + snapshot1 := suite.policy.createSnapshot() + err := suite.policy.writeSnapshotToFile(snapshot1) + suite.assert.NoError(err) + + suite.policy.CacheValid(filepath.Join(cache_path, "consistency_file4")) + suite.policy.CachePurge(fileNames[1]) // Remove the second file + + snapshot2 := suite.policy.createSnapshot() + err = suite.policy.writeSnapshotToFile(snapshot2) + suite.assert.NoError(err) + + readSnapshot, err := readSnapshotFromFile(suite.policy.tmpPath) + suite.assert.NoError(err) + + // Should contain file1, file3, file4 but not file2 + snapshotContainsFile := make(map[string]bool) + for _, nodeName := range readSnapshot.NodeList { + snapshotContainsFile[nodeName] = true + } + + // Normalize the paths for comparison + file1 := strings.TrimPrefix(fileNames[0], cache_path) + file2 := strings.TrimPrefix(fileNames[1], cache_path) + file3 := strings.TrimPrefix(fileNames[2], cache_path) + file4 := strings.TrimPrefix(filepath.Join(cache_path, "consistency_file4"), cache_path) + + suite.assert.True(snapshotContainsFile[file1], "Snapshot should contain file1") + suite.assert.False( + snapshotContainsFile[file2], + "Snapshot should not contain file2 which was purged", + ) + suite.assert.True(snapshotContainsFile[file3], "Snapshot should contain file3") + suite.assert.True(snapshotContainsFile[file4], "Snapshot should contain file4 which was added") +} + +func (suite *lruPolicyTestSuite) TestPeriodicSnapshotWithEmptyCache() { + defer suite.cleanupTest() + + nodeMap := sync.Map{} + suite.policy.nodeMap = nodeMap + suite.policy.head = suite.policy.currMarker + suite.policy.currMarker.next = suite.policy.lastMarker + suite.policy.lastMarker.prev = suite.policy.currMarker + + // Create a snapshot with empty cache + snapshot := suite.policy.createSnapshot() + err := suite.policy.writeSnapshotToFile(snapshot) + suite.assert.NoError(err) + + // Read back the snapshot + readSnapshot, err := readSnapshotFromFile(suite.policy.tmpPath) + suite.assert.NoError(err) + + // Verify the snapshot is empty but valid + suite.assert.Empty(readSnapshot.NodeList) + suite.assert.Equal(uint64(0), readSnapshot.CurrMarkerPosition) + suite.assert.Equal(uint64(1), readSnapshot.LastMarkerPosition) +} + +func (suite *lruPolicyTestSuite) TestPeriodicSnapshotWithScheduledOperations() { + defer suite.cleanupTest() + + // Setup scheduled operations + fakeSchedule := &FileCache{} + fakeSchedule.scheduleOps.Store("operation1", struct{}{}) + fakeSchedule.scheduleOps.Store("operation2", struct{}{}) + suite.policy.schedule = fakeSchedule + + snapshot := suite.policy.createSnapshot() + err := suite.policy.writeSnapshotToFile(snapshot) + suite.assert.NoError(err) + + readSnapshot, err := readSnapshotFromFile(suite.policy.tmpPath) + suite.assert.NoError(err) + + containsOp1 := false + containsOp2 := false + + for _, op := range readSnapshot.ScheduleOps { + if op == "operation1" { + containsOp1 = true + } + if op == "operation2" { + containsOp2 = true + } + } + + suite.assert.True(containsOp1, "Snapshot should preserve scheduled operation1") + suite.assert.True(containsOp2, "Snapshot should preserve scheduled operation2") } func (suite *lruPolicyTestSuite) TestNoEvictionIfInScheduleOps() { @@ -537,6 +759,69 @@ func (suite *lruPolicyTestSuite) TestEvictionRespectsScheduleOps() { ) } +func (suite *lruPolicyTestSuite) TestSnapshotPreservesScheduleOps() { + defer suite.cleanupTest() + + // Setup test files + fileNames := []string{ + filepath.Join(cache_path, "snapshot_file1"), + filepath.Join(cache_path, "snapshot_file2"), + filepath.Join(cache_path, "snapshot_file3"), + filepath.Join(cache_path, "snapshot_file4"), + } + for _, name := range fileNames { + suite.policy.CacheValid(name) + } + + fakeSchedule := &FileCache{} + fakeSchedule.scheduleOps.Store(common.NormalizeObjectName("snapshot_file2"), struct{}{}) + fakeSchedule.scheduleOps.Store(common.NormalizeObjectName("snapshot_file4"), struct{}{}) + suite.policy.schedule = fakeSchedule + + originalPolicy := suite.policy + snapshot := suite.policy.createSnapshot() + + suite.assert.NotNil(snapshot) + suite.assert.Contains(snapshot.ScheduleOps, common.NormalizeObjectName("snapshot_file2")) + suite.assert.Contains(snapshot.ScheduleOps, common.NormalizeObjectName("snapshot_file4")) + + suite.cleanupTest() + suite.setupTestHelper(originalPolicy.cachePolicyConfig) + suite.policy.loadSnapshot(snapshot) + + for _, name := range fileNames { + suite.assert.True( + suite.policy.IsCached(name), + name+" should be cached after loading snapshot", + ) + } + + scheduledOpsExist := false + if suite.policy.schedule != nil { + _, found2 := suite.policy.schedule.scheduleOps.Load( + common.NormalizeObjectName("snapshot_file2"), + ) + _, found4 := suite.policy.schedule.scheduleOps.Load( + common.NormalizeObjectName("snapshot_file4"), + ) + scheduledOpsExist = found2 && found4 + } + suite.assert.True(scheduledOpsExist, "scheduledOps should be restored from snapshot") + + time.Sleep(3 * time.Second) + + suite.assert.False(suite.policy.IsCached(fileNames[0]), "file1 should be evicted after timeout") + suite.assert.True( + suite.policy.IsCached(fileNames[1]), + "file2 should NOT be evicted (restored from scheduledOps in snapshot)", + ) + suite.assert.False(suite.policy.IsCached(fileNames[2]), "file3 should be evicted after timeout") + suite.assert.True( + suite.policy.IsCached(fileNames[3]), + "file4 should NOT be evicted (restored from scheduledOps in snapshot)", + ) +} + func TestLRUPolicyTestSuite(t *testing.T) { suite.Run(t, new(lruPolicyTestSuite)) }