Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions component/file_cache/file_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/Seagate/cloudfuse/internal"
"github.com/Seagate/cloudfuse/internal/handlemap"
"github.com/Seagate/cloudfuse/internal/stats_manager"
cron "github.com/netresearch/go-cron"
)

// Common structure for Component
Expand Down Expand Up @@ -84,6 +85,7 @@ type FileCache struct {
activeWindows int
activeWindowsMutex *sync.Mutex
closeWindowCh chan struct{}
cronScheduler *cron.Cron
}

// Structure defining your config parameters
Expand Down Expand Up @@ -181,6 +183,7 @@ func (fc *FileCache) Start(ctx context.Context) error {
log.Debug("Starting file cache stats collector")

fc.uploadNotifyCh = make(chan struct{}, 1)
fc.stopAsyncUpload = make(chan struct{})
err = fc.SetupScheduler()
if err != nil {
log.Warn("FileCache::Start : Failed to setup scheduler [%s]", err.Error())
Expand All @@ -193,6 +196,18 @@ func (fc *FileCache) Start(ctx context.Context) error {
func (fc *FileCache) Stop() error {
log.Trace("Stopping component : %s", fc.Name())

// Signal active upload windows to stop
if fc.stopAsyncUpload != nil {
close(fc.stopAsyncUpload)
}

// Stop the cron scheduler and wait for running jobs to complete
if fc.cronScheduler != nil {
log.Info("FileCache::Stop : Stopping cron scheduler")
<-fc.cronScheduler.Stop().Done()
log.Info("FileCache::Stop : Cron scheduler stopped")
}

// Wait for all async upload to complete if any
if fc.lazyWrite {
log.Info("FileCache::Stop : Waiting for async close to complete")
Expand Down
41 changes: 28 additions & 13 deletions component/file_cache/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ func (fc *FileCache) SetupScheduler() error {
cronScheduler := cron.New(cron.WithSeconds())
fc.scheduleUploads(cronScheduler, fc.schedule)
cronScheduler.Start()
fc.cronScheduler = cronScheduler

log.Info("FileCache::SetupScheduler : Scheduler started successfully")
return nil
Expand All @@ -86,6 +87,11 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
log.Info("FileCache::SetupScheduler : Upload window ended")
close(fc.closeWindowCh)
}

parser := cron.MustNewParser(
cron.Second | cron.Minute | cron.Hour | cron.Dom | cron.Month | cron.Dow | cron.Descriptor,
)

// start up the schedules
for _, config := range sched {
windowName := config.Name
Expand All @@ -94,9 +100,29 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
log.Info("[%s] Invalid duration '%s': %v\n", windowName, config.Duration, err)
continue
}

// Determine if we're joining a window that's already active by
// finding the most recent scheduled start via Prev().
now := time.Now()
var initialWindowEndTime time.Time
var jobOpts []cron.JobOption

schedule, _ := parser.Parse(config.CronExpr)
if sp, ok := schedule.(cron.ScheduleWithPrev); ok {
prevStart := sp.Prev(now)
if !prevStart.IsZero() && prevStart.Add(duration).After(now) {
// We're inside an active window that started at prevStart.
initialWindowEndTime = prevStart.Add(duration)
// Run immediately to join the in-progress window with shortened duration.
jobOpts = append(jobOpts, cron.WithRunImmediately())
log.Info(
"FileCache::scheduleUploads : [%s] joining active window (started %s, ends %s)",
windowName, prevStart.Format(time.Kitchen), initialWindowEndTime.Format(time.Kitchen),
)
}
}

cronEntryId, err := c.AddFunc(config.CronExpr, func() {
_, err = c.AddFunc(config.CronExpr, func() {
// Start a new window and track it
fc.activeWindowsMutex.Lock()
isFirstWindow := fc.activeWindows == 0
Expand Down Expand Up @@ -155,23 +181,12 @@ func (fc *FileCache) scheduleUploads(c *cron.Cron, sched WeeklySchedule) {
fc.servicePendingOps()
}
}
})
}, jobOpts...)
if err != nil {
log.Err("[%s] Failed to schedule cron job with expression '%s': %v\n",
windowName, config.CronExpr, err)
continue
}

// check if this schedule should already be active
// did this schedule have a start time within the last duration?
schedule := c.Entry(cronEntryId)
now := time.Now()
for t := schedule.Schedule.Next(now.Add(-duration)); now.After(t); t = schedule.Schedule.Next(t) {
initialWindowEndTime = t.Add(duration)
}
if !initialWindowEndTime.IsZero() {
go schedule.Job.Run()
}
}
}

Expand Down