Skip to content

Commit a7bdef0

Browse files
committed
fix compress job
1 parent 28fd37f commit a7bdef0

File tree

2 files changed

+110
-111
lines changed

2 files changed

+110
-111
lines changed

pkg/dispatcher/compress.go

Lines changed: 107 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -152,136 +152,135 @@ func DecompressFileWithProgress(inputPath string, progressChan chan ProgressInfo
152152
return nil
153153
}
154154

155-
func CompressJob() error {
156-
// TODO separate compress jobs for each agent
157-
utils.LogDebug(fmt.Sprintf("Compress jobs started to run at %s", time.Now().Format(time.RFC3339)))
158-
for {
159-
if utils.AppConfiguration.ArchiveInterval == "" {
160-
utils.LogError("Archive interval is not set, waiting for config to be updated")
161-
time.Sleep(3 * time.Second)
162-
continue
163-
}
164-
utils.LogDebug(fmt.Sprintf("Archive interval is set to %s", utils.AppConfiguration.ArchiveInterval))
165-
break
166-
}
167-
interval, err := utils.ParseDuration(utils.AppConfiguration.ArchiveInterval)
155+
func processFiles(agentId string, interval time.Duration, archiveDir string) error {
156+
// Get all files in the snapshot directory
157+
snapshotDir := filepath.Join(utils.AppConfiguration.DataDir, "snapshot")
158+
files, err := os.ReadDir(snapshotDir)
168159
if err != nil {
169-
utils.LogError(fmt.Sprintf("Failed to parse archive interval: %v", err))
170-
return err
160+
return fmt.Errorf("failed to read snapshot directory: %v", err)
171161
}
172-
go func() {
173-
ctx, cancel := context.WithCancel(context.Background())
174-
defer cancel()
175-
176-
// Create archive directory if it doesn't exist
177-
archiveDir := filepath.Join(utils.AppConfiguration.DataDir, "archive")
178-
if err := os.MkdirAll(archiveDir, 0755); err != nil {
179-
utils.LogError(fmt.Sprintf("Failed to create archive directory: %v", err))
180-
return
181-
}
182162

183-
// Function to process files that need compression
184-
processFiles := func() error {
185-
// Get all files in the snapshot directory
186-
snapshotDir := filepath.Join(utils.AppConfiguration.DataDir, "snapshot")
187-
files, err := os.ReadDir(snapshotDir)
188-
if err != nil {
189-
return fmt.Errorf("failed to read snapshot directory: %v", err)
190-
}
191-
192-
for _, file := range files {
193-
if file.IsDir() {
194-
continue
195-
}
163+
for _, file := range files {
164+
if file.IsDir() {
165+
continue
166+
}
196167

197-
// Skip already compressed files, log files, and non-img files
198-
if !strings.HasSuffix(file.Name(), ".img") ||
199-
strings.HasSuffix(file.Name(), ".zst") {
200-
continue
201-
}
168+
// Skip already compressed files, log files, not img files, and not currentagent files
169+
if !strings.HasSuffix(file.Name(), ".img") ||
170+
strings.HasSuffix(file.Name(), ".zst") ||
171+
!strings.Contains(file.Name(), agentId) {
172+
continue
173+
}
202174

203-
filePath := filepath.Join(snapshotDir, file.Name())
204-
fileInfo, err := file.Info()
205-
if err != nil {
206-
utils.LogError(fmt.Sprintf("Failed to get file info for %s: %v", filePath, err))
207-
continue
208-
}
175+
filePath := filepath.Join(snapshotDir, file.Name())
176+
fileInfo, err := file.Info()
177+
if err != nil {
178+
utils.LogError(fmt.Sprintf("Failed to get file info for %s: %v", filePath, err))
179+
continue
180+
}
209181

210-
// Check if file is older than archive interval
211-
if time.Since(fileInfo.ModTime()) > interval {
212-
utils.LogDebug(fmt.Sprintf("Processing file for archival: %s", filePath))
182+
// Check if file is older than archive interval
183+
if time.Since(fileInfo.ModTime()) > interval {
184+
utils.LogDebug(fmt.Sprintf("Processing file for archival: %s", filePath))
213185

214-
// Create progress channel
215-
progressChan := make(chan ProgressInfo)
186+
// Create progress channel
187+
progressChan := make(chan ProgressInfo)
216188

217-
// Start progress monitoring goroutine
218-
actionId := strings.TrimSuffix(file.Name(), ".img")
219-
archiveId := utils.AppConfiguration.DataDir + "/archive/" + file.Name() + ".zst"
220-
go UpdateCompressProgress(progressChan, actionId, archiveId)
189+
// Start progress monitoring goroutine
190+
actionId := strings.TrimSuffix(file.Name(), ".img")
191+
archiveId := utils.AppConfiguration.DataDir + "/archive/" + file.Name() + ".zst"
192+
go UpdateCompressProgress(progressChan, actionId, archiveId)
221193

222-
// Compress the file
223-
if err := CompressFileWithProgress(filePath, progressChan); err != nil {
224-
utils.LogError(fmt.Sprintf("Failed to compress %s: %v", filePath, err))
225-
close(progressChan)
226-
continue
227-
}
228-
close(progressChan)
194+
// Compress the file
195+
if err := CompressFileWithProgress(filePath, progressChan); err != nil {
196+
utils.LogError(fmt.Sprintf("Failed to compress %s: %v", filePath, err))
197+
close(progressChan)
198+
continue
199+
}
200+
close(progressChan)
229201

230-
// Move compressed file to archive directory
231-
compressedPath := filePath + ".zst"
232-
archivePath := filepath.Join(archiveDir, file.Name()+".zst")
202+
// Move compressed file to archive directory
203+
compressedPath := filePath + ".zst"
204+
archivePath := filepath.Join(archiveDir, file.Name()+".zst")
233205

234-
if err := os.Rename(compressedPath, archivePath); err != nil {
235-
utils.LogError(fmt.Sprintf("Failed to move compressed file to archive: %v", err))
236-
continue
237-
}
206+
if err := os.Rename(compressedPath, archivePath); err != nil {
207+
utils.LogError(fmt.Sprintf("Failed to move compressed file to archive: %v", err))
208+
continue
209+
}
238210

239-
// Move corresponding log file if it exists
240-
logPath := strings.TrimSuffix(filePath, ".img") + ".log"
241-
archiveLogPath := filepath.Join(archiveDir, strings.TrimSuffix(file.Name(), ".img")+".log")
242-
243-
// Check if log file exists before trying to move it
244-
if _, err := os.Stat(logPath); err == nil {
245-
if err := os.Rename(logPath, archiveLogPath); err != nil {
246-
utils.LogError(fmt.Sprintf("Failed to move log file to archive: %v", err))
247-
}
248-
} else {
249-
utils.LogDebug(fmt.Sprintf("No log file found for %s", file.Name()))
250-
}
211+
// Move corresponding log file if it exists
212+
logPath := strings.TrimSuffix(filePath, ".img") + ".log"
213+
archiveLogPath := filepath.Join(archiveDir, strings.TrimSuffix(file.Name(), ".img")+".log")
251214

252-
// Remove original file after successful compression and move
253-
if err := os.Remove(filePath); err != nil {
254-
utils.LogError(fmt.Sprintf("Failed to remove original file: %v", err))
255-
} else {
256-
utils.LogDebug(fmt.Sprintf("Successfully compressed and archived %s", file.Name()))
257-
}
215+
// Check if log file exists before trying to move it
216+
if _, err := os.Stat(logPath); err == nil {
217+
if err := os.Rename(logPath, archiveLogPath); err != nil {
218+
utils.LogError(fmt.Sprintf("Failed to move log file to archive: %v", err))
258219
}
220+
} else {
221+
utils.LogDebug(fmt.Sprintf("No log file found for %s", file.Name()))
222+
}
223+
224+
// Remove original file after successful compression and move
225+
if err := os.Remove(filePath); err != nil {
226+
utils.LogError(fmt.Sprintf("Failed to remove original file: %v", err))
227+
} else {
228+
utils.LogDebug(fmt.Sprintf("Successfully compressed and archived %s", file.Name()))
259229
}
260-
return nil
261230
}
231+
}
232+
return nil
233+
}
262234

263-
// Run initial compression for all agents
264-
if err := processFiles(); err != nil {
265-
utils.LogError(fmt.Sprintf("Initial compression for all agents failed: %v", err))
235+
func CompressJob() error {
236+
agents, err := service.GetAllAgentsMap(-1)
237+
if err != nil {
238+
utils.LogError(fmt.Sprintf("Failed to get agents: %v", err))
239+
return err
240+
}
241+
utils.LogDebug(fmt.Sprintf("Compress jobs started to run at %s for %d agents", time.Now().Format(time.RFC3339), len(agents)))
242+
for _, agent := range agents {
243+
currentAgent := agent
244+
utils.LogDebug(fmt.Sprintf("Compress jobs started to run at %s for agent %s", time.Now().Format(time.RFC3339), currentAgent.AgentId))
245+
utils.LogDebug(fmt.Sprintf("Agent: %+v", currentAgent))
246+
interval, err := utils.ParseDuration(currentAgent.ArchiveInterval)
247+
if err != nil {
248+
utils.LogError(fmt.Sprintf("Failed to parse archive interval: %v", err))
249+
return err
266250
}
251+
go func(agent utils.Agent, interval time.Duration) {
252+
ctx, cancel := context.WithCancel(context.Background())
253+
defer cancel()
254+
255+
// Create archive directory if it doesn't exist
256+
archiveDir := filepath.Join(utils.AppConfiguration.DataDir, "archive")
257+
if err := os.MkdirAll(archiveDir, 0755); err != nil {
258+
utils.LogError(fmt.Sprintf("Failed to create archive directory: %v", err))
259+
return
260+
}
267261

268-
ticker := time.NewTicker(interval)
269-
defer ticker.Stop()
262+
// Run initial compression for this agent
263+
if err := processFiles(agent.AgentId, interval, archiveDir); err != nil {
264+
utils.LogError(fmt.Sprintf("Initial compression for agent %s failed: %v", agent.AgentId, err))
265+
}
270266

271-
// Then run periodic compression
272-
for {
273-
select {
274-
case <-ctx.Done():
275-
utils.LogDebug("Compress jobs stopped")
276-
return
277-
case <-ticker.C:
278-
if err := processFiles(); err != nil {
279-
utils.LogError(fmt.Sprintf("Periodic compression failed: %v", err))
267+
ticker := time.NewTicker(interval)
268+
defer ticker.Stop()
269+
270+
// Then run periodic compression
271+
for {
272+
select {
273+
case <-ctx.Done():
274+
utils.LogDebug("Compress jobs stopped")
275+
return
276+
case <-ticker.C:
277+
if err := processFiles(agent.AgentId, interval, archiveDir); err != nil {
278+
utils.LogError(fmt.Sprintf("Periodic compression for agent %s failed: %v", agent.AgentId, err))
279+
}
280280
}
281281
}
282-
}
283-
}()
284-
282+
}(currentAgent, interval)
283+
}
285284
return nil
286285
}
287286

plans/testplan.yml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
name: "production-backup-policy"
22
description: "Backup policy for production servers"
3-
archive_interval: 24
4-
snapshot_frequency: 4
3+
archive_interval: 24h
4+
snapshot_frequency: "daily"
55
snapshot_time: "00:00"
66
bandwidth_limit: 50
77
snapshot_retention: 7
8-
live_sync_frequency: 1
8+
live_sync_frequency: 1m
99
transition_after_days: 30
1010
delete_after_days: 90
1111

0 commit comments

Comments
 (0)